Skip to content

Commit 5285c2a

Browse files
mpenickMichael Penick
authored andcommitted
CPP-713 - Fix load distribution of RR and DC-aware policies (#218)
CPP-701 introduced a bug where round-robin and DC-aware policies would not equally distribute requests to the remaining nodes during a node outage or removal.
1 parent a102c78 commit 5285c2a

File tree

10 files changed

+225
-29
lines changed

10 files changed

+225
-29
lines changed

cpp-driver/gtests/src/unit/tests/test_load_balancing.cpp

Lines changed: 178 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,38 @@ void verify_sequence(cass::QueryPlan* qp, const cass::Vector<size_t>& sequence)
8383
EXPECT_FALSE(qp->compute_next(&received));
8484
}
8585

86+
typedef cass::Map<cass::Address, int> QueryCounts;
87+
88+
QueryCounts run_policy(cass::LoadBalancingPolicy& policy, int count) {
89+
QueryCounts counts;
90+
for (int i = 0; i < 12; ++i) {
91+
cass::ScopedPtr<cass::QueryPlan> qp(policy.new_query_plan("ks", NULL, NULL));
92+
cass::Host::Ptr host(qp->compute_next());
93+
if (host) {
94+
counts[host->address()] += 1;
95+
}
96+
}
97+
return counts;
98+
}
99+
100+
void verify_dcs(const QueryCounts& counts,
101+
const cass::HostMap& hosts,
102+
const cass::String& expected_dc) {
103+
for (QueryCounts::const_iterator it = counts.begin(),
104+
end = counts.end(); it != end; ++it) {
105+
cass::HostMap::const_iterator host_it = hosts.find(it->first);
106+
ASSERT_NE(host_it, hosts.end());
107+
EXPECT_EQ(expected_dc, host_it->second->dc());
108+
}
109+
}
110+
111+
void verify_query_counts(const QueryCounts& counts, int expected_count) {
112+
for (QueryCounts::const_iterator it = counts.begin(),
113+
end = counts.end(); it != end; ++it) {
114+
EXPECT_EQ(expected_count, it->second);
115+
}
116+
}
117+
86118
struct RunPeriodicTask : public cass::EventLoop {
87119
RunPeriodicTask(cass::LatencyAwarePolicy* policy)
88120
: policy(policy) {
@@ -194,7 +226,7 @@ TEST(RoundRobinLoadBalancingUnitTest, OnAdd) {
194226
cass::Address addr_new = addr_for_sequence(seq_new);
195227
cass::SharedRefPtr<cass::Host> host = host_for_addr(addr_new);
196228
policy.on_host_added(host);
197-
policy.on_host_up(host->address());
229+
policy.on_host_up(host);
198230

199231
cass::ScopedPtr<cass::QueryPlan> qp2(policy.new_query_plan("ks", NULL, NULL));
200232
const size_t seq2[] = {2, seq_new, 1};
@@ -248,26 +280,64 @@ TEST(RoundRobinLoadBalancingUnitTest, OnUpAndDown) {
248280
}
249281

250282
// host is added to the list, but not 'up'
251-
policy.on_host_up(host->address());
283+
policy.on_host_up(host);
252284

253285
cass::ScopedPtr<cass::QueryPlan> qp_after1(policy.new_query_plan("ks", NULL, NULL));
254286
cass::ScopedPtr<cass::QueryPlan> qp_after2(policy.new_query_plan("ks", NULL, NULL));
255287

256288
policy.on_host_down(host->address());
257289
// 1 is dynamically excluded from plan
258290
{
259-
const size_t seq[] = {3, 2};
291+
const size_t seq[] = {2, 3};
260292
verify_sequence(qp_after1.get(), VECTOR_FROM(size_t, seq));
261293
}
262294

263-
policy.on_host_up(host->address());
295+
policy.on_host_up(host);
264296
// now included
265297
{
266-
const size_t seq[] = {1, 2, 3};
298+
const size_t seq[] = {2, 3, 1};
267299
verify_sequence(qp_after2.get(), VECTOR_FROM(size_t, seq));
268300
}
269301
}
270302

303+
TEST(RoundRobinLoadBalancingUnitTest, VerifyEqualDistribution) {
304+
cass::HostMap hosts;
305+
populate_hosts(3, "rack", "dc", &hosts);
306+
307+
cass::RoundRobinPolicy policy;
308+
policy.init(cass::SharedRefPtr<cass::Host>(), hosts, NULL);
309+
310+
{ // All nodes
311+
QueryCounts counts(run_policy(policy, 12));
312+
ASSERT_EQ(counts.size(), 3u);
313+
verify_query_counts(counts, 4);
314+
}
315+
316+
policy.on_host_down(hosts.begin()->first);
317+
318+
{ // One node down
319+
QueryCounts counts(run_policy(policy, 12));
320+
ASSERT_EQ(counts.size(), 2u);
321+
verify_query_counts(counts, 6);
322+
}
323+
324+
policy.on_host_up(hosts.begin()->second);
325+
326+
{ // All nodes again
327+
QueryCounts counts(run_policy(policy, 12));
328+
ASSERT_EQ(counts.size(), 3u);
329+
verify_query_counts(counts, 4);
330+
}
331+
332+
policy.on_host_removed(hosts.begin()->second);
333+
334+
{ // One node removed
335+
QueryCounts counts(run_policy(policy, 12));
336+
ASSERT_EQ(counts.size(), 2u);
337+
verify_query_counts(counts, 6);
338+
}
339+
}
340+
271341
TEST(DatacenterAwareLoadBalancingUnitTest, SomeDatacenterLocalUnspecified) {
272342
const size_t total_hosts = 3;
273343
cass::HostMap hosts;
@@ -302,7 +372,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, SingleLocalDown) {
302372
verify_sequence(qp_before.get(), VECTOR_FROM(size_t, seq));
303373
}
304374

305-
policy.on_host_up(target_host->address());
375+
policy.on_host_up(target_host);
306376
{
307377
const size_t seq[] = {2, 3, 1, 4}; // local dc wrapped before remote offered
308378
verify_sequence(qp_after.get(), VECTOR_FROM(size_t, seq));
@@ -328,7 +398,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, AllLocalRemovedReturned) {
328398
verify_sequence(qp_after.get(), VECTOR_FROM(size_t, seq));
329399
}
330400

331-
policy.on_host_up(target_host->address());
401+
policy.on_host_up(target_host);
332402

333403
// make sure we get the local node first after on_up
334404
cass::ScopedPtr<cass::QueryPlan> qp(policy.new_query_plan("ks", NULL, NULL));
@@ -358,7 +428,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, RemoteRemovedReturned) {
358428
verify_sequence(qp_after.get(), VECTOR_FROM(size_t, seq));
359429
}
360430

361-
policy.on_host_up(target_host->address());
431+
policy.on_host_up(target_host);
362432

363433
// make sure we get both nodes, correct order after
364434
cass::ScopedPtr<cass::QueryPlan> qp(policy.new_query_plan("ks", NULL, NULL));
@@ -474,6 +544,102 @@ cass::Vector<cass::String> single_token(int64_t token) {
474544
return cass::Vector<cass::String>(1, ss.str());
475545
}
476546

547+
TEST(DatacenterAwareLoadBalancingUnitTest, VerifyEqualDistributionLocalDc) {
548+
cass::HostMap hosts;
549+
populate_hosts(3, "rack", LOCAL_DC, &hosts);
550+
populate_hosts(3, "rack", REMOTE_DC, &hosts);
551+
552+
cass::DCAwarePolicy policy("", 0, false);
553+
policy.init(hosts.begin()->second, hosts, NULL);
554+
555+
{ // All local nodes
556+
QueryCounts counts(run_policy(policy, 12));
557+
verify_dcs(counts, hosts, LOCAL_DC);
558+
ASSERT_EQ(counts.size(), 3u);
559+
verify_query_counts(counts, 4);
560+
}
561+
562+
policy.on_host_down(hosts.begin()->first);
563+
564+
{ // One local node down
565+
QueryCounts counts(run_policy(policy, 12));
566+
verify_dcs(counts, hosts, LOCAL_DC);
567+
ASSERT_EQ(counts.size(), 2u);
568+
verify_query_counts(counts, 6);
569+
}
570+
571+
policy.on_host_up(hosts.begin()->second);
572+
573+
{ // All local nodes again
574+
QueryCounts counts(run_policy(policy, 12));
575+
verify_dcs(counts, hosts, LOCAL_DC);
576+
ASSERT_EQ(counts.size(), 3u);
577+
verify_query_counts(counts, 4);
578+
}
579+
580+
policy.on_host_removed(hosts.begin()->second);
581+
582+
{ // One local node removed
583+
QueryCounts counts(run_policy(policy, 12));
584+
verify_dcs(counts, hosts, LOCAL_DC);
585+
ASSERT_EQ(counts.size(), 2u);
586+
verify_query_counts(counts, 6);
587+
}
588+
}
589+
590+
TEST(DatacenterAwareLoadBalancingUnitTest, VerifyEqualDistributionRemoteDc) {
591+
cass::HostMap hosts;
592+
populate_hosts(3, "rack", LOCAL_DC, &hosts);
593+
populate_hosts(3, "rack", REMOTE_DC, &hosts);
594+
595+
cass::DCAwarePolicy policy("", 3, false); // Allow all remote DC nodes
596+
policy.init(hosts.begin()->second, hosts, NULL);
597+
598+
cass::Host::Ptr remote_dc_node1;
599+
{ // Mark down all local nodes
600+
cass::HostMap::iterator it = hosts.begin();
601+
for (int i = 0; i < 3; ++i) {
602+
policy.on_host_down(it->first);
603+
it++;
604+
}
605+
remote_dc_node1 = it->second;
606+
}
607+
608+
{ // All remote nodes
609+
QueryCounts counts(run_policy(policy, 12));
610+
verify_dcs(counts, hosts, REMOTE_DC);
611+
ASSERT_EQ(counts.size(), 3u);
612+
verify_query_counts(counts, 4);
613+
}
614+
615+
policy.on_host_down(remote_dc_node1->address());
616+
617+
{ // One remote node down
618+
QueryCounts counts(run_policy(policy, 12));
619+
verify_dcs(counts, hosts, REMOTE_DC);
620+
ASSERT_EQ(counts.size(), 2u);
621+
verify_query_counts(counts, 6);
622+
}
623+
624+
policy.on_host_up(remote_dc_node1);
625+
626+
{ // All remote nodes again
627+
QueryCounts counts(run_policy(policy, 12));
628+
verify_dcs(counts, hosts, REMOTE_DC);
629+
ASSERT_EQ(counts.size(), 3u);
630+
verify_query_counts(counts, 4);
631+
}
632+
633+
policy.on_host_removed(remote_dc_node1);
634+
635+
{ // One remote node removed
636+
QueryCounts counts(run_policy(policy, 12));
637+
verify_dcs(counts, hosts, REMOTE_DC);
638+
ASSERT_EQ(counts.size(), 2u);
639+
verify_query_counts(counts, 6);
640+
}
641+
}
642+
477643
TEST(TokenAwareLoadBalancingUnitTest, Simple) {
478644
const int64_t num_hosts = 4;
479645
cass::HostMap hosts;
@@ -530,7 +696,7 @@ TEST(TokenAwareLoadBalancingUnitTest, Simple) {
530696
}
531697

532698
// Restore the first host and bring down the first token aware replica
533-
policy.on_host_up(curr_host_it->second->address());
699+
policy.on_host_up(curr_host_it->second);
534700
++curr_host_it; // 2.0.0.0
535701
++curr_host_it; // 3.0.0.0
536702
++curr_host_it; // 4.0.0.0
@@ -601,19 +767,19 @@ TEST(TokenAwareLoadBalancingUnitTest, NetworkTopology) {
601767

602768
{
603769
cass::ScopedPtr<cass::QueryPlan> qp(policy.new_query_plan("test", request_handler.get(), token_map.get()));
604-
const size_t seq[] = { 3, 5, 7, 6, 2, 4 };
770+
const size_t seq[] = { 3, 5, 7, 4, 6, 2 };
605771
verify_sequence(qp.get(), VECTOR_FROM(size_t, seq));
606772
}
607773

608774
// Restore the first host and bring down the first token aware replica
609-
policy.on_host_up(curr_host_it->second->address());
775+
policy.on_host_up(curr_host_it->second);
610776
++curr_host_it; // 2.0.0.0
611777
++curr_host_it; // 3.0.0.0
612778
policy.on_host_down(curr_host_it->second->address());
613779

614780
{
615781
cass::ScopedPtr<cass::QueryPlan> qp(policy.new_query_plan("test", request_handler.get(), token_map.get()));
616-
const size_t seq[] = { 5, 7, 1, 2, 4, 6 };
782+
const size_t seq[] = { 5, 7, 1, 6, 2, 4 };
617783
verify_sequence(qp.get(), VECTOR_FROM(size_t, seq));
618784
}
619785
}

cpp-driver/src/cluster.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,7 @@ void Cluster::internal_notify_host_up(const Address& address) {
495495

496496
for (LoadBalancingPolicy::Vec::const_iterator it = load_balancing_policies_.begin(),
497497
end = load_balancing_policies_.end(); it != end; ++it) {
498-
(*it)->on_host_up(address);
498+
(*it)->on_host_up(host);
499499
}
500500

501501
if (is_host_ignored(host)) {

cpp-driver/src/dc_aware_policy.cpp

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,12 +119,20 @@ void DCAwarePolicy::on_host_removed(const Host::Ptr& host) {
119119
available_.erase(host->address());
120120
}
121121

122-
void DCAwarePolicy::on_host_up(const Address& address) {
122+
void DCAwarePolicy::on_host_up(const Host::Ptr& host) {
123+
on_host_added(host);
124+
123125
ScopedWriteLock wl(&available_rwlock_);
124-
available_.insert(address);
126+
available_.insert(host->address());
125127
}
126128

127129
void DCAwarePolicy::on_host_down(const Address& address) {
130+
if (!remove_host(local_dc_live_hosts_, address) &&
131+
!per_remote_dc_live_hosts_.remove_host(address)) {
132+
LOG_DEBUG("Attempted to mark host %s as DOWN, but it doesn't exist",
133+
address.to_string().c_str());
134+
}
135+
128136
ScopedWriteLock wl(&available_rwlock_);
129137
available_.erase(address);
130138
}
@@ -145,8 +153,19 @@ void DCAwarePolicy::PerDCHostMap::remove_host_from_dc(const String& dc, const Ho
145153
ScopedWriteLock wl(&rwlock_);
146154
Map::iterator i = map_.find(dc);
147155
if (i != map_.end()) {
148-
remove_host(i->second, host);
156+
cass::remove_host(i->second, host);
157+
}
158+
}
159+
160+
bool DCAwarePolicy::PerDCHostMap::remove_host(const Address& address) {
161+
ScopedWriteLock wl(&rwlock_);
162+
for (Map::iterator i = map_.begin(),
163+
end = map_.end(); i != end; ++i) {
164+
if (cass::remove_host(i->second, address)) {
165+
return true;
166+
}
149167
}
168+
return false;
150169
}
151170

152171
const CopyOnWriteHostVec& DCAwarePolicy::PerDCHostMap::get_hosts(const String& dc) const {

cpp-driver/src/dc_aware_policy.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class DCAwarePolicy : public LoadBalancingPolicy {
4949

5050
virtual void on_host_added(const Host::Ptr& host);
5151
virtual void on_host_removed(const Host::Ptr& host);
52-
virtual void on_host_up(const Address& address);
52+
virtual void on_host_up(const Host::Ptr& host);
5353
virtual void on_host_down(const Address& address);
5454

5555
virtual LoadBalancingPolicy* new_instance() {
@@ -71,6 +71,7 @@ class DCAwarePolicy : public LoadBalancingPolicy {
7171

7272
void add_host_to_dc(const String& dc, const Host::Ptr& host);
7373
void remove_host_from_dc(const String& dc, const Host::Ptr& host);
74+
bool remove_host(const Address& address);
7475
const CopyOnWriteHostVec& get_hosts(const String& dc) const;
7576
void copy_dcs(KeySet* dcs) const;
7677

cpp-driver/src/host.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,18 @@ void add_host(CopyOnWriteHostVec& hosts, const Host::Ptr& host) {
3636
}
3737

3838
void remove_host(CopyOnWriteHostVec& hosts, const Host::Ptr& host) {
39+
remove_host(hosts, host->address());
40+
}
41+
42+
bool remove_host(CopyOnWriteHostVec& hosts, const Address& address) {
3943
HostVec::iterator i;
4044
for (i = hosts->begin(); i != hosts->end(); ++i) {
41-
if ((*i)->address() == host->address()) {
45+
if ((*i)->address() == address) {
4246
hosts->erase(i);
43-
break;
47+
return true;
4448
}
4549
}
50+
return false;
4651
}
4752

4853
void Host::LatencyTracker::update(uint64_t latency_ns) {

cpp-driver/src/host.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,7 @@ typedef CopyOnWritePtr<HostVec> CopyOnWriteHostVec;
289289

290290
void add_host(CopyOnWriteHostVec& hosts, const Host::Ptr& host);
291291
void remove_host(CopyOnWriteHostVec& hosts, const Host::Ptr& host);
292+
bool remove_host(CopyOnWriteHostVec& hosts, const Address& address);
292293

293294
} // namespace cass
294295

cpp-driver/src/load_balancing.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ class LoadBalancingPolicy
9494
virtual bool is_host_up(const Address& address) const = 0;
9595
virtual void on_host_added(const Host::Ptr& host) = 0;
9696
virtual void on_host_removed(const Host::Ptr& host) = 0;
97-
virtual void on_host_up(const Address& address) = 0;
97+
virtual void on_host_up(const Host::Ptr& host) = 0;
9898
virtual void on_host_down(const Address& address) = 0;
9999

100100
virtual QueryPlan* new_query_plan(const String& keyspace,
@@ -134,7 +134,7 @@ class ChainedLoadBalancingPolicy : public LoadBalancingPolicy {
134134

135135
virtual void on_host_added(const Host::Ptr& host) { child_policy_->on_host_added(host); }
136136
virtual void on_host_removed(const Host::Ptr& host) { child_policy_->on_host_removed(host); }
137-
virtual void on_host_up(const Address& address) { child_policy_->on_host_up(address); }
137+
virtual void on_host_up(const Host::Ptr& host) { child_policy_->on_host_up(host); }
138138
virtual void on_host_down(const Address& address) { child_policy_->on_host_down(address); }
139139

140140
protected:

0 commit comments

Comments
 (0)