Skip to content

SYSTEM PRESHUTDOWN command for graceful shutdown swarm node #852

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: antalya-25.3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/en/sql-reference/statements/system.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,12 @@ SYSTEM RELOAD USERS [ON CLUSTER cluster_name]

Normally shuts down ClickHouse (like `service clickhouse-server stop` / `kill {$pid_clickhouse-server}`)

## PRESHUTDOWN {#preshutdown}

<CloudNotSupportedBadge/>

Prepares the node for a graceful shutdown: unregisters it from autodiscovered clusters and stops accepting distributed requests to object storages (s3Cluster, icebergCluster, etc.).

## KILL {#kill}

Aborts ClickHouse process (like `kill -9 {$pid_clickhouse-server}`)
Expand Down
2 changes: 2 additions & 0 deletions programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2689,6 +2689,8 @@ try

is_cancelled = true;

global_context->stopSwarm();

LOG_DEBUG(log, "Waiting for current connections to close.");

size_t current_connections = 0;
Expand Down
1 change: 1 addition & 0 deletions src/Access/Common/AccessType.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ enum class AccessType : uint8_t
M(SYSTEM_TTL_MERGES, "SYSTEM STOP TTL MERGES, SYSTEM START TTL MERGES, STOP TTL MERGES, START TTL MERGES", TABLE, SYSTEM) \
M(SYSTEM_FETCHES, "SYSTEM STOP FETCHES, SYSTEM START FETCHES, STOP FETCHES, START FETCHES", TABLE, SYSTEM) \
M(SYSTEM_MOVES, "SYSTEM STOP MOVES, SYSTEM START MOVES, STOP MOVES, START MOVES", TABLE, SYSTEM) \
M(SYSTEM_SWARM, "SYSTEM STOP SWARM, SYSTEM START SWARM, STOP SWARM, START SWARM", GLOBAL, SYSTEM) \
M(SYSTEM_PULLING_REPLICATION_LOG, "SYSTEM STOP PULLING REPLICATION LOG, SYSTEM START PULLING REPLICATION LOG", TABLE, SYSTEM) \
M(SYSTEM_CLEANUP, "SYSTEM STOP CLEANUP, SYSTEM START CLEANUP", TABLE, SYSTEM) \
M(SYSTEM_VIEWS, "SYSTEM REFRESH VIEW, SYSTEM START VIEWS, SYSTEM STOP VIEWS, SYSTEM START VIEW, SYSTEM STOP VIEW, SYSTEM CANCEL VIEW, REFRESH VIEW, START VIEWS, STOP VIEWS, START VIEW, STOP VIEW, CANCEL VIEW", VIEW, SYSTEM) \
Expand Down
40 changes: 39 additions & 1 deletion src/Interpreters/ClusterDiscovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,9 @@ bool ClusterDiscovery::upsertCluster(ClusterInfo & cluster_info)
return true;
};

if (!cluster_info.current_node_is_observer && !contains(node_uuids, current_node_name))
if (!cluster_info.current_node_is_observer
&& !context->isStopSwarmCalled()
&& !contains(node_uuids, current_node_name))
{
LOG_ERROR(log, "Can't find current node in cluster '{}', will register again", cluster_info.name);
registerInZk(zk, cluster_info);
Expand Down Expand Up @@ -454,12 +456,30 @@ void ClusterDiscovery::registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & inf
return;
}

if (context->isStopSwarmCalled())
{
LOG_DEBUG(log, "STOP SWARM called, skip self-registering current node {} in cluster {}", current_node_name, info.name);
return;
}

LOG_DEBUG(log, "Registering current node {} in cluster {}", current_node_name, info.name);

zk->createOrUpdate(node_path, info.current_node.serialize(), zkutil::CreateMode::Ephemeral);
LOG_DEBUG(log, "Current node {} registered in cluster {}", current_node_name, info.name);
}

/// Removes the ZooKeeper node `<shards list path of the cluster>/<current_node_name>`.
///
/// Does nothing when the current node is only an observer of the cluster.
/// A missing node is not treated as an error: the node may already be gone
/// (for example when the unregistration is requested twice in a row), and
/// unregistering must stay idempotent in that case.
///
/// @param zk   ZooKeeper client used for the cluster.
/// @param info Description of the cluster the current node is removed from.
void ClusterDiscovery::unregisterFromZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info)
{
    if (info.current_node_is_observer)
        return;

    String node_path = getShardsListPath(info.zk_root) / current_node_name;
    LOG_DEBUG(log, "Removing current node {} from cluster {}", current_node_name, info.name);

    /// tryRemove() reports a missing node through its return code instead of throwing,
    /// unlike remove().
    const auto code = zk->tryRemove(node_path);
    if (code == Coordination::Error::ZNONODE)
    {
        LOG_DEBUG(log, "Current node {} is not registered in cluster {}, nothing to remove", current_node_name, info.name);
        return;
    }

    /// Any other non-OK code is still an error, exactly as it was with remove().
    if (code != Coordination::Error::ZOK)
        throw Coordination::Exception::fromPath(code, node_path);

    LOG_DEBUG(log, "Current node {} removed from cluster {}", current_node_name, info.name);
}

void ClusterDiscovery::initialUpdate()
{
LOG_DEBUG(log, "Initializing");
Expand Down Expand Up @@ -505,6 +525,24 @@ void ClusterDiscovery::initialUpdate()
is_initialized = true;
}

void ClusterDiscovery::registerAll()
{
for (auto & [_, info] : clusters_info)
{
auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name);
registerInZk(zk, info);
}
}

void ClusterDiscovery::unregisterAll()
{
for (auto & [_, info] : clusters_info)
{
auto zk = context->getDefaultOrAuxiliaryZooKeeper(info.zk_name);
unregisterFromZk(zk, info);
}
}

void ClusterDiscovery::findDynamicClusters(
std::unordered_map<String, ClusterDiscovery::ClusterInfo> & info,
std::unordered_set<size_t> * unchanged_roots)
Expand Down
4 changes: 4 additions & 0 deletions src/Interpreters/ClusterDiscovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ class ClusterDiscovery

~ClusterDiscovery();

void registerAll();
void unregisterAll();

private:
struct NodeInfo
{
Expand Down Expand Up @@ -125,6 +128,7 @@ class ClusterDiscovery
void initialUpdate();

void registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info);
void unregisterFromZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info);

Strings getNodeNames(zkutil::ZooKeeperPtr & zk,
const String & zk_root,
Expand Down
43 changes: 41 additions & 2 deletions src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ struct ContextSharedPart : boost::noncopyable
std::map<String, UInt16> server_ports;

std::atomic<bool> shutdown_called = false;
std::atomic<bool> stop_swarm_called = false;

Stopwatch uptime_watch TSA_GUARDED_BY(mutex);

Expand Down Expand Up @@ -734,6 +735,7 @@ struct ContextSharedPart : boost::noncopyable
*/
void shutdown() TSA_NO_THREAD_SAFETY_ANALYSIS
{
stop_swarm_called = true;
bool is_shutdown_called = shutdown_called.exchange(true);
if (is_shutdown_called)
return;
Expand Down Expand Up @@ -913,6 +915,16 @@ struct ContextSharedPart : boost::noncopyable
total_memory_tracker.resetPageCache();
}

/// Raises the stop_swarm_called flag.
void stopSwarm()
{
    stop_swarm_called.store(true);
}

/// Clears the stop_swarm_called flag.
void startSwarm()
{
    stop_swarm_called.store(false);
}

bool hasTraceCollector() const
{
return trace_collector.has_value();
Expand Down Expand Up @@ -4481,7 +4493,6 @@ std::shared_ptr<Cluster> Context::getCluster(const std::string & cluster_name) c
throw Exception(ErrorCodes::CLUSTER_DOESNT_EXIST, "Requested cluster '{}' not found", cluster_name);
}


std::shared_ptr<Cluster> Context::tryGetCluster(const std::string & cluster_name) const
{
std::shared_ptr<Cluster> res = nullptr;
Expand All @@ -4500,6 +4511,21 @@ std::shared_ptr<Cluster> Context::tryGetCluster(const std::string & cluster_name
return res;
}

/// Asks the cluster discovery object, when one exists, to unregister the
/// current node from all of its clusters. Runs under clusters_mutex.
void Context::unregisterInDynamicClusters()
{
    std::lock_guard lock(shared->clusters_mutex);
    if (shared->cluster_discovery)
        shared->cluster_discovery->unregisterAll();
}

/// Asks the cluster discovery object, when one exists, to register the
/// current node in all of its clusters. Runs under clusters_mutex.
void Context::registerInDynamicClusters()
{
    std::lock_guard lock(shared->clusters_mutex);
    if (shared->cluster_discovery)
        shared->cluster_discovery->registerAll();
}

void Context::reloadClusterConfig() const
{
Expand Down Expand Up @@ -5350,12 +5376,25 @@ void Context::stopServers(const ServerType & server_type) const
shared->stop_servers_callback(server_type);
}


/// Forwards the shutdown request to the shared part of the context.
/// Thread-safety analysis is switched off for this function by the annotation below.
void Context::shutdown() TSA_NO_THREAD_SAFETY_ANALYSIS
{
shared->shutdown();
}

/// Marks the swarm as stopped for the whole server.
/// Delegates to the shared part, the same way Context::shutdown() does, so the
/// flag is only ever written through ContextSharedPart's own methods.
void Context::stopSwarm()
{
    shared->stopSwarm();
}

/// Clears the "swarm stopped" mark for the whole server.
/// Delegates to the shared part, the same way Context::shutdown() does, so the
/// flag is only ever written through ContextSharedPart's own methods.
void Context::startSwarm()
{
    shared->startSwarm();
}

/// Returns the current value of the shared stop_swarm_called flag.
bool Context::isStopSwarmCalled() const
{
    const bool swarm_stopped = shared->stop_swarm_called.load();
    return swarm_stopped;
}

Context::ApplicationType Context::getApplicationType() const
{
Expand Down
7 changes: 7 additions & 0 deletions src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -1225,6 +1225,8 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
size_t getClustersVersion() const;

void startClusterDiscovery();
void registerInDynamicClusters();
void unregisterInDynamicClusters();

/// Sets custom cluster, but doesn't update configuration
void setCluster(const String & cluster_name, const std::shared_ptr<Cluster> & cluster);
Expand Down Expand Up @@ -1335,6 +1337,11 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>

void shutdown();

/// Stop some of the node's work so that it can be shut down gracefully later
void stopSwarm();
void startSwarm();
bool isStopSwarmCalled() const;

bool isInternalQuery() const { return is_internal_query; }
void setInternalQuery(bool internal) { is_internal_query = internal; }

Expand Down
20 changes: 20 additions & 0 deletions src/Interpreters/InterpreterSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,20 @@ BlockIO InterpreterSystemQuery::execute()
case Type::START_MOVES:
startStopAction(ActionLocks::PartsMove, true);
break;
case Type::STOP_SWARM:
{
getContext()->checkAccess(AccessType::SYSTEM_SWARM);
getContext()->stopSwarm();
getContext()->unregisterInDynamicClusters();
break;
}
case Type::START_SWARM:
{
getContext()->checkAccess(AccessType::SYSTEM_SWARM);
getContext()->startSwarm();
getContext()->registerInDynamicClusters();
break;
}
case Type::STOP_FETCHES:
startStopAction(ActionLocks::PartsFetch, false);
break;
Expand Down Expand Up @@ -1564,6 +1578,12 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
required_access.emplace_back(AccessType::SYSTEM_MOVES, query.getDatabase(), query.getTable());
break;
}
case Type::STOP_SWARM:
case Type::START_SWARM:
{
required_access.emplace_back(AccessType::SYSTEM_SWARM);
break;
}
case Type::STOP_PULLING_REPLICATION_LOG:
case Type::START_PULLING_REPLICATION_LOG:
{
Expand Down
2 changes: 2 additions & 0 deletions src/Parsers/ASTSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,8 @@ void ASTSystemQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & setti
case Type::DROP_PAGE_CACHE:
case Type::STOP_REPLICATED_DDL_QUERIES:
case Type::START_REPLICATED_DDL_QUERIES:
case Type::STOP_SWARM:
case Type::START_SWARM:
break;
case Type::UNKNOWN:
case Type::END:
Expand Down
2 changes: 2 additions & 0 deletions src/Parsers/ASTSystemQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster
START_FETCHES,
STOP_MOVES,
START_MOVES,
STOP_SWARM,
START_SWARM,
STOP_REPLICATED_SENDS,
START_REPLICATED_SENDS,
STOP_REPLICATION_QUEUES,
Expand Down
5 changes: 5 additions & 0 deletions src/QueryPipeline/RemoteQueryExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,11 @@ void RemoteQueryExecutor::setProfileInfoCallback(ProfileInfoCallback callback)
profile_info_callback = std::move(callback);
}

/// Returns the value of the `skip_unavailable_shards` setting of the executor's context.
bool RemoteQueryExecutor::skipUnavailableShards() const
{
    const auto & settings = context->getSettingsRef();
    return settings[Setting::skip_unavailable_shards];
}

bool RemoteQueryExecutor::needToSkipUnavailableShard() const
{
return context->getSettingsRef()[Setting::skip_unavailable_shards] && (0 == connections->size());
Expand Down
2 changes: 2 additions & 0 deletions src/QueryPipeline/RemoteQueryExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ class RemoteQueryExecutor

IConnections & getConnections() { return *connections; }

bool skipUnavailableShards() const;

bool needToSkipUnavailableShard() const;

bool isReplicaUnavailable() const { return extension && extension->parallel_reading_coordinator && connections->size() == 0; }
Expand Down
33 changes: 25 additions & 8 deletions src/QueryPipeline/RemoteQueryExecutorReadContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace ErrorCodes
extern const int CANNOT_READ_FROM_SOCKET;
extern const int CANNOT_OPEN_FILE;
extern const int SOCKET_TIMEOUT;
extern const int ATTEMPT_TO_READ_AFTER_EOF;
}

RemoteQueryExecutorReadContext::RemoteQueryExecutorReadContext(
Expand Down Expand Up @@ -56,16 +57,32 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus

while (true)
{
read_context.has_read_packet_part = PacketPart::None;

if (read_context.read_packet_type_separately)
try
{
read_context.has_read_packet_part = PacketPart::None;

if (read_context.read_packet_type_separately)
{
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
read_context.has_read_packet_part = PacketPart::Type;
suspend_callback();
}
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
read_context.has_read_packet_part = PacketPart::Body;
if (read_context.packet.type == Protocol::Server::Data)
read_context.has_data_packets = true;
}
catch (const Exception & e)
{
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
read_context.has_read_packet_part = PacketPart::Type;
suspend_callback();
if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF
&& !read_context.has_data_packets.load() && read_context.executor.skipUnavailableShards())
{
read_context.has_read_packet_part = PacketPart::None;
}
else
throw;
}
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
read_context.has_read_packet_part = PacketPart::Body;

suspend_callback();
}
}
Expand Down
1 change: 1 addition & 0 deletions src/QueryPipeline/RemoteQueryExecutorReadContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
/// None -> Type -> Body -> None
/// None -> Body -> None
std::atomic<PacketPart> has_read_packet_part = PacketPart::None;
std::atomic_bool has_data_packets = false;
Packet packet;

RemoteQueryExecutor & executor;
Expand Down
22 changes: 19 additions & 3 deletions src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,10 @@ std::shared_ptr<IObjectIterator> StorageObjectStorageSource::createFileIterator(

if (distributed_processing)
{
auto distributed_iterator = std::make_unique<ReadTaskIterator>(local_context->getReadTaskCallback(), local_context->getSettingsRef()[Setting::max_threads]);
auto distributed_iterator = std::make_unique<ReadTaskIterator>(
local_context->getReadTaskCallback(),
local_context->getSettingsRef()[Setting::max_threads],
local_context);

if (is_archive)
return std::make_shared<ArchiveIterator>(object_storage, configuration, std::move(distributed_iterator), local_context, nullptr);
Expand Down Expand Up @@ -937,9 +940,16 @@ StorageObjectStorageSource::ReaderHolder::operator=(ReaderHolder && other) noexc
}

StorageObjectStorageSource::ReadTaskIterator::ReadTaskIterator(
const ReadTaskCallback & callback_, size_t max_threads_count)
: callback(callback_)
const ReadTaskCallback & callback_, size_t max_threads_count, ContextPtr context_)
: WithContext(context_)
, callback(callback_)
{
if (getContext()->isStopSwarmCalled())
{
LOG_DEBUG(getLogger("StorageObjectStorageSource"), "STOP SWARM called, stop getting new tasks");
return;
}

ThreadPool pool(
CurrentMetrics::StorageObjectStorageThreads,
CurrentMetrics::StorageObjectStorageThreadsActive,
Expand Down Expand Up @@ -969,6 +979,12 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::ReadTaskIterator
size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
if (current_index >= buffer.size())
{
if (getContext()->isStopSwarmCalled())
{
LOG_DEBUG(getLogger("StorageObjectStorageSource"), "STOP SWARM called, stop getting new tasks");
return nullptr;
}

auto key = callback();
if (key.empty())
return nullptr;
Expand Down
Loading
Loading