From ad969de7843a172901ccf9e5408fb7f0213752f1 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 24 Apr 2025 12:29:48 +0200 Subject: [PATCH 1/2] Fix addresses_with_failover in cluster requests --- src/Interpreters/Cluster.cpp | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/Interpreters/Cluster.cpp b/src/Interpreters/Cluster.cpp index 7366efa37642..0b85946dbd95 100644 --- a/src/Interpreters/Cluster.cpp +++ b/src/Interpreters/Cluster.cpp @@ -775,6 +775,7 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti UInt32 shard_num = 0; std::set> unique_hosts; + Addresses all_addresses; for (size_t shard_index : collections::range(0, from.shards_info.size())) { auto create_shards_from_replicas = [&](std::span replicas) @@ -790,6 +791,7 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti if (address.is_local) info.local_addresses.push_back(address); + all_addresses.push_back(address); auto pool = ConnectionPoolFactory::instance().get( static_cast(settings[Setting::distributed_connections_pool_size]), @@ -838,14 +840,21 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti std::shuffle(shards_info.begin(), shards_info.end(), gen); shards_info.resize(max_hosts); + Addresses all_addresses_; + shard_num = 0; for (auto & shard_info : shards_info) + { + all_addresses_.push_back(all_addresses[shard_info.shard_num - 1]); shard_info.shard_num = ++shard_num; + } + + all_addresses.swap(all_addresses_); } for (size_t i = 0; i < shards_info.size(); ++i) { - addresses_with_failover.emplace_back(shards_info[i].local_addresses); + addresses_with_failover.emplace_back(Addresses({all_addresses[shards_info[i].shard_num - 1]})); slot_to_shard.insert(std::end(slot_to_shard), shards_info[i].weight, i); } From 79557963d14a3f23582ce13ec1047e078d0be9c2 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 25 Apr 2025 11:11:27 +0200 Subject: [PATCH 2/2] Move cluster cut logic in separate method --- src/Interpreters/Cluster.cpp | 42 +++++++++++++++++++----------------- src/Interpreters/Cluster.h | 3 +++ 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/src/Interpreters/Cluster.cpp b/src/Interpreters/Cluster.cpp index 0b85946dbd95..5e90249d0b6e 100644 --- a/src/Interpreters/Cluster.cpp +++ b/src/Interpreters/Cluster.cpp @@ -775,7 +775,6 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti UInt32 shard_num = 0; std::set> unique_hosts; - Addresses all_addresses; for (size_t shard_index : collections::range(0, from.shards_info.size())) { auto create_shards_from_replicas = [&](std::span replicas) @@ -791,7 +790,7 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti if (address.is_local) info.local_addresses.push_back(address); - all_addresses.push_back(address); + addresses_with_failover.emplace_back(Addresses({address})); auto pool = ConnectionPoolFactory::instance().get( static_cast(settings[Setting::distributed_connections_pool_size]), @@ -834,31 +833,34 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti secret = from.secret; name = from.name; - if (max_hosts > 0 && shards_info.size() > max_hosts) - { - pcg64_fast gen{randomSeed()}; - std::shuffle(shards_info.begin(), shards_info.end(), gen); - shards_info.resize(max_hosts); + constrainShardInfoAndAddressesToMaxHosts(max_hosts); - Addresses all_addresses_; + for (size_t i = 0; i < shards_info.size(); ++i) + slot_to_shard.insert(std::end(slot_to_shard), shards_info[i].weight, i); - shard_num = 0; - for (auto & shard_info : shards_info) - { - all_addresses_.push_back(all_addresses[shard_info.shard_num - 1]); - shard_info.shard_num = ++shard_num; - } + initMisc(); +} - all_addresses.swap(all_addresses_); - } - for (size_t i = 0; i < shards_info.size(); ++i) +void Cluster::constrainShardInfoAndAddressesToMaxHosts(size_t max_hosts) +{ + if (max_hosts == 0 || shards_info.size() <= max_hosts) + return; + + pcg64_fast gen{randomSeed()}; + std::shuffle(shards_info.begin(), shards_info.end(), gen); + shards_info.resize(max_hosts); + + AddressesWithFailover addresses_with_failover_; + + UInt32 shard_num = 0; + for (auto & shard_info : shards_info) { - addresses_with_failover.emplace_back(Addresses({all_addresses[shards_info[i].shard_num - 1]})); - slot_to_shard.insert(std::end(slot_to_shard), shards_info[i].weight, i); + addresses_with_failover_.push_back(addresses_with_failover[shard_info.shard_num - 1]); + shard_info.shard_num = ++shard_num; } - initMisc(); + addresses_with_failover.swap(addresses_with_failover_); } diff --git a/src/Interpreters/Cluster.h b/src/Interpreters/Cluster.h index b581963b08dc..6d0f5c204e5e 100644 --- a/src/Interpreters/Cluster.h +++ b/src/Interpreters/Cluster.h @@ -304,6 +304,9 @@ class Cluster ShardInfoInsertPathForInternalReplication insert_paths = {}, bool internal_replication = false); + /// Reduce size of cluster to max_hosts + void constrainShardInfoAndAddressesToMaxHosts(size_t max_hosts); + /// Inter-server secret String secret;