Skip to content

Commit 239075c

Browse files
committed
CPP-913 Fix: Ensure no duplicates in token map replica sets
1 parent dfce03c commit 239075c

File tree

3 files changed

+131
-19
lines changed

3 files changed

+131
-19
lines changed

src/token_map.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ class TokenMap : public RefCounted<TokenMap> {
5252

5353
virtual const CopyOnWriteHostVec& get_replicas(const String& keyspace_name,
5454
const String& routing_key) const = 0;
55+
56+
virtual String dump(const String& keyspace_name) const = 0;
5557
};
5658

5759
}}} // namespace datastax::internal::core

src/token_map_impl.hpp

Lines changed: 83 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434

3535
#include <algorithm>
3636
#include <assert.h>
37+
#include <iomanip>
38+
#include <ios>
3739
#include <uv.h>
3840

3941
#define CASS_NETWORK_TOPOLOGY_STRATEGY "NetworkTopologyStrategy"
@@ -143,12 +145,32 @@ class ByteOrderedPartitioner {
143145
static StringRef name() { return "ByteOrderedPartitioner"; }
144146
};
145147

148+
// Streams a RandomPartitioner token as hexadecimal text: `token.hi` followed by
// `token.lo`, each zero-padded to 16 digits.
//
// std::hex and std::setfill are sticky, so the caller's format flags and fill
// character are captured first and put back before returning. Without that,
// every integer written to `os` afterwards would silently come out in hex.
inline std::ostream& operator<<(std::ostream& os, const RandomPartitioner::Token& token) {
  const std::ios_base::fmtflags saved_flags = os.flags();
  const char saved_fill = os.fill();

  // std::setw only applies to the next insertion, so it is repeated per field.
  os << std::hex << std::setfill('0') << std::setw(16) << token.hi << std::setw(16) << token.lo;

  os.fill(saved_fill);
  os.flags(saved_flags);
  return os;
}
153+
154+
// Streams a ByteOrderedPartitioner token as hexadecimal text, two zero-padded
// digits per element, in iteration order.
//
// NOTE(review): this assumes the token's element type is an unsigned 8-bit
// byte -- confirm against the Token typedef. For character-typed elements
// std::hex has no effect and the raw character is written instead, so each
// element is widened to an integer before insertion.
//
// The caller's format flags and fill character are restored before returning
// because std::hex and std::setfill are sticky.
inline std::ostream& operator<<(std::ostream& os, const ByteOrderedPartitioner::Token& token) {
  const std::ios_base::fmtflags saved_flags = os.flags();
  const char saved_fill = os.fill();

  os << std::hex << std::setfill('0');
  for (ByteOrderedPartitioner::Token::const_iterator it = token.begin(), end = token.end();
       it != end; ++it) {
    // setw resets after every insertion, so it has to be reapplied per element.
    os << std::setw(2) << static_cast<unsigned int>(*it);
  }

  os.fill(saved_fill);
  os.flags(saved_flags);
  return os;
}
161+
146162
// A DenseHashSet of hosts that registers its own empty/deleted sentinel keys
// on construction, so callers never have to set them up by hand.
class HostSet : public DenseHashSet<Host::Ptr> {
public:
  // Creates an empty set.
  HostSet() {
    set_empty_key(make_empty_sentinel());
    set_deleted_key(make_deleted_sentinel());
  }

  // Creates a set populated from the range [first, last).
  template <class InputIterator>
  HostSet(InputIterator first, InputIterator last)
      : DenseHashSet<Host::Ptr>(first, last, make_empty_sentinel()) {
    set_deleted_key(make_deleted_sentinel());
  }

private:
  // Host built from Address::EMPTY_KEY; used as the set's empty key.
  static Host::Ptr make_empty_sentinel() { return Host::Ptr(new Host(Address::EMPTY_KEY)); }

  // Host built from Address::DELETED_KEY; used as the set's deleted key.
  static Host::Ptr make_deleted_sentinel() { return Host::Ptr(new Host(Address::DELETED_KEY)); }
};
153175

154176
class RackSet : public DenseHashSet<uint32_t> {
@@ -355,6 +377,17 @@ void ReplicationStrategy<Partitioner>::build_replicas(const TokenHostVec& tokens
355377
}
356378
}
357379

380+
// Adds unique replica. It returns true if the replica was added.
381+
inline bool add_replica(CopyOnWriteHostVec& hosts, const Host::Ptr& host) {
382+
for (HostVec::const_reverse_iterator it = hosts->rbegin(); it != hosts->rend(); ++it) {
383+
if ((*it)->address() == host->address()) {
384+
return false; // Already in the replica set
385+
}
386+
}
387+
hosts->push_back(host);
388+
return true;
389+
}
390+
358391
template <class Partitioner>
359392
void ReplicationStrategy<Partitioner>::build_replicas_network_topology(
360393
const TokenHostVec& tokens, const DatacenterMap& datacenters, TokenReplicasVec& result) const {
@@ -443,24 +476,27 @@ void ReplicationStrategy<Partitioner>::build_replicas_network_topology(
443476
// datacenter only then consider hosts in the same rack
444477

445478
if (rack == 0 || racks_observed_this_dc.size() == rack_count_this_dc) {
446-
++replica_count_this_dc;
447-
replicas->push_back(Host::Ptr(host));
479+
if (add_replica(replicas, Host::Ptr(host))) {
480+
++replica_count_this_dc;
481+
}
448482
} else {
449483
TokenHostQueue& skipped_endpoints_this_dc = dc_rack_info.skipped_endpoints;
450484
if (racks_observed_this_dc.count(rack) > 0) {
451485
skipped_endpoints_this_dc.push_back(curr_token_it);
452486
} else {
453-
++replica_count_this_dc;
454-
replicas->push_back(Host::Ptr(host));
455-
racks_observed_this_dc.insert(rack);
487+
if (add_replica(replicas, Host::Ptr(host))) {
488+
++replica_count_this_dc;
489+
racks_observed_this_dc.insert(rack);
490+
}
456491

457492
// Once we visited every rack in the current datacenter then starting considering
458493
// hosts we've already skipped.
459494
if (racks_observed_this_dc.size() == rack_count_this_dc) {
460495
while (!skipped_endpoints_this_dc.empty() &&
461496
replica_count_this_dc < replication_factor) {
462-
++replica_count_this_dc;
463-
replicas->push_back(Host::Ptr(skipped_endpoints_this_dc.front()->second));
497+
if (add_replica(replicas, Host::Ptr(skipped_endpoints_this_dc.front()->second))) {
498+
++replica_count_this_dc;
499+
}
464500
skipped_endpoints_this_dc.pop_front();
465501
}
466502
}
@@ -484,9 +520,10 @@ void ReplicationStrategy<Partitioner>::build_replicas_simple(const TokenHostVec&
484520
for (typename TokenHostVec::const_iterator i = tokens.begin(), end = tokens.end(); i != end;
485521
++i) {
486522
CopyOnWriteHostVec replicas(new HostVec());
523+
replicas->reserve(num_replicas);
487524
typename TokenHostVec::const_iterator token_it = i;
488525
do {
489-
replicas->push_back(Host::Ptr(token_it->second));
526+
add_replica(replicas, Host::Ptr(Host::Ptr(token_it->second)));
490527
++token_it;
491528
if (token_it == tokens.end()) {
492529
token_it = tokens.begin();
@@ -578,7 +615,11 @@ class TokenMapImpl : public TokenMap {
578615
virtual const CopyOnWriteHostVec& get_replicas(const String& keyspace_name,
579616
const String& routing_key) const;
580617

581-
// Test only
618+
virtual String dump(const String& keyspace_name) const;
619+
620+
public:
621+
// Testing only
622+
582623
bool contains(const Token& token) const {
583624
for (typename TokenHostVec::const_iterator i = tokens_.begin(), end = tokens_.end(); i != end;
584625
++i) {
@@ -587,6 +628,8 @@ class TokenMapImpl : public TokenMap {
587628
return false;
588629
}
589630

631+
const TokenReplicasVec& token_replicas(const String& keyspace_name) const;
632+
590633
private:
591634
void update_keyspace(const VersionNumber& cassandra_version, const ResultResponse* result,
592635
bool should_build_replicas);
@@ -713,6 +756,35 @@ const CopyOnWriteHostVec& TokenMapImpl<Partitioner>::get_replicas(const String&
713756
return no_replicas_dummy_;
714757
}
715758

759+
template <class Partitioner>
760+
String TokenMapImpl<Partitioner>::dump(const String& keyspace_name) const {
761+
String result;
762+
typename KeyspaceReplicaMap::const_iterator ks_it = replicas_.find(keyspace_name);
763+
const TokenReplicasVec& replicas = ks_it->second;
764+
765+
for (typename TokenReplicasVec::const_iterator it = replicas.begin(), end = replicas.end();
766+
it != end; ++it) {
767+
OStringStream ss;
768+
ss << std::setw(20) << it->first << " [ ";
769+
const CopyOnWriteHostVec& hosts = it->second;
770+
for (HostVec::const_iterator host_it = hosts->begin(), end = hosts->end(); host_it != end;
771+
++host_it) {
772+
ss << (*host_it)->address_string() << " ";
773+
}
774+
ss << "]\n";
775+
result.append(ss.str());
776+
}
777+
return result;
778+
}
779+
780+
template <class Partitioner>
781+
const typename TokenMapImpl<Partitioner>::TokenReplicasVec&
782+
TokenMapImpl<Partitioner>::token_replicas(const String& keyspace_name) const {
783+
typename KeyspaceReplicaMap::const_iterator ks_it = replicas_.find(keyspace_name);
784+
static TokenReplicasVec not_found;
785+
return ks_it != replicas_.end() ? ks_it->second : not_found;
786+
}
787+
716788
template <class Partitioner>
717789
void TokenMapImpl<Partitioner>::update_keyspace(const VersionNumber& cassandra_version,
718790
const ResultResponse* result,
@@ -773,6 +845,8 @@ void TokenMapImpl<Partitioner>::build_replicas() {
773845
const String& keyspace_name = i->first;
774846
const ReplicationStrategy<Partitioner>& strategy = i->second;
775847
strategy.build_replicas(tokens_, datacenters_, replicas_[keyspace_name]);
848+
LOG_TRACE("Replicas for keyspace '%s':\n%s", keyspace_name.c_str(),
849+
dump(keyspace_name).c_str());
776850
}
777851
}
778852

tests/src/unit/tests/test_token_map.cpp

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ template <class Partitioner>
3030
struct TestTokenMap {
3131
typedef typename ReplicationStrategy<Partitioner>::Token Token;
3232
typedef Map<Token, Host::Ptr> TokenHostMap;
33+
typedef typename TokenMapImpl<Partitioner>::TokenReplicasVec TokenReplicasVec;
3334

3435
TokenHostMap tokens;
3536
TokenMap::Ptr token_map;
@@ -43,14 +44,11 @@ struct TestTokenMap {
4344
const String v(*i);
4445
tokens[Partitioner::from_string(*i)] = host;
4546
}
47+
token_map->add_host(host);
4648
}
4749

4850
void build(const String& keyspace_name = "ks", size_t replication_factor = 3) {
4951
add_keyspace_simple(keyspace_name, replication_factor, token_map.get());
50-
for (typename TokenHostMap::const_iterator i = tokens.begin(), end = tokens.end(); i != end;
51-
++i) {
52-
token_map->add_host(i->second);
53-
}
5452
token_map->build();
5553
}
5654

@@ -63,7 +61,7 @@ struct TestTokenMap {
6361
}
6462
}
6563

66-
void verify(const String& keyspace_name = "ks") {
64+
void verify(const String& keyspace_name = "ks", size_t replication_factor = 3) {
6765
const String keys[] = { "test", "abc", "def", "a", "b", "c", "d" };
6866

6967
for (size_t i = 0; i < sizeof(keys) / sizeof(keys[0]); ++i) {
@@ -78,6 +76,25 @@ struct TestTokenMap {
7876

7977
EXPECT_EQ(hosts->front()->address(), host->address());
8078
}
79+
80+
verify_unique_replica_count(keyspace_name, replication_factor);
81+
}
82+
83+
void verify_unique_replica_count(const String& keyspace_name = "ks",
84+
size_t replication_factor = 3) {
85+
// Verify a unique set of replicas per token
86+
const TokenReplicasVec& token_replicas =
87+
static_cast<TokenMapImpl<Partitioner>*>(token_map.get())->token_replicas(keyspace_name);
88+
89+
ASSERT_EQ(tokens.size(), token_replicas.size());
90+
91+
for (typename TokenReplicasVec::const_iterator it = token_replicas.begin(),
92+
end = token_replicas.end();
93+
it != end; ++it) {
94+
HostSet replicas(it->second->begin(), it->second->end());
95+
// Using assert here because they're can be many, many tokens
96+
ASSERT_EQ(replication_factor, replicas.size());
97+
}
8198
}
8299
};
83100

@@ -117,6 +134,7 @@ TEST(TokenMapUnitTest, Murmur3LargeNumberOfVnodes) {
117134
size_t num_hosts = 4;
118135
size_t num_vnodes = 256;
119136
size_t replication_factor = 3;
137+
size_t total_replicas = std::min(num_hosts, replication_factor) * num_dcs;
120138

121139
ReplicationMap replication;
122140
MT19937_64 rng;
@@ -144,7 +162,6 @@ TEST(TokenMapUnitTest, Murmur3LargeNumberOfVnodes) {
144162
Murmur3Partitioner::name().to_string(), rack, dc));
145163

146164
test_murmur3.add_host(host);
147-
token_map->add_host(host);
148165
}
149166
}
150167
}
@@ -159,7 +176,7 @@ TEST(TokenMapUnitTest, Murmur3LargeNumberOfVnodes) {
159176
const String& key = keys[i];
160177

161178
const CopyOnWriteHostVec& hosts = token_map->get_replicas("ks1", key);
162-
ASSERT_TRUE(hosts && hosts->size() == replication_factor * num_dcs);
179+
ASSERT_TRUE(hosts && hosts->size() == total_replicas);
163180

164181
typedef Map<String, Set<String> > DcRackMap;
165182

@@ -181,6 +198,8 @@ TEST(TokenMapUnitTest, Murmur3LargeNumberOfVnodes) {
181198

182199
EXPECT_EQ((*hosts)[0]->address(), host->address());
183200
}
201+
202+
test_murmur3.verify_unique_replica_count("ks1", total_replicas);
184203
}
185204

186205
TEST(TokenMapUnitTest, Random) {
@@ -223,7 +242,7 @@ TEST(TokenMapUnitTest, RemoveHost) {
223242
test_remove_host.add_host(create_host("1.0.0.3", single_token(CASS_INT64_MAX / 2)));
224243

225244
test_remove_host.build("ks", 2);
226-
test_remove_host.verify();
245+
test_remove_host.verify("ks", 2);
227246

228247
TokenMap* token_map = test_remove_host.token_map.get();
229248

@@ -275,7 +294,7 @@ TEST(TokenMapUnitTest, UpdateHost) {
275294
test_update_host.add_host(create_host("1.0.0.2", single_token(CASS_INT64_MIN / 4)));
276295

277296
test_update_host.build("ks", 4);
278-
test_update_host.verify();
297+
test_update_host.verify("ks", 2); // Only two hosts, so rf = 2
279298

280299
TokenMap* token_map = test_update_host.token_map.get();
281300

@@ -317,6 +336,8 @@ TEST(TokenMapUnitTest, UpdateHost) {
317336
EXPECT_EQ((*replicas)[2]->address(), Address("1.0.0.3", 9042));
318337
EXPECT_EQ((*replicas)[3]->address(), Address("1.0.0.4", 9042));
319338
}
339+
340+
test_update_host.verify("ks", 4);
320341
}
321342

322343
/**
@@ -436,7 +457,7 @@ TEST(TokenMapUnitTest, DropKeyspace) {
436457
test_drop_keyspace.add_host(create_host("1.0.0.3", single_token(CASS_INT64_MAX / 2)));
437458

438459
test_drop_keyspace.build("ks", 2);
439-
test_drop_keyspace.verify();
460+
test_drop_keyspace.verify("ks", 2);
440461

441462
TokenMap* token_map = test_drop_keyspace.token_map.get();
442463

@@ -456,3 +477,18 @@ TEST(TokenMapUnitTest, DropKeyspace) {
456477
EXPECT_FALSE(replicas);
457478
}
458479
}
480+
481+
// Four hosts with 256 random Murmur3 tokens each, built and verified with the
// fixture defaults (keyspace "ks", replication factor 3); verify() also checks
// that every token's replica set contains no duplicate hosts.
TEST(TokenMapUnitTest, UniqueReplicas) {
  TestTokenMap<Murmur3Partitioner> test_murmur3;

  const size_t tokens_per_host = 256;
  const char* const addresses[] = { "1.0.0.1", "1.0.0.2", "1.0.0.3", "1.0.0.4" };
  MT19937_64 rng;

  // Hosts are added in table order so the random token stream is consumed in
  // the same sequence on every run.
  for (size_t i = 0; i < sizeof(addresses) / sizeof(addresses[0]); ++i) {
    test_murmur3.add_host(create_host(addresses[i], random_murmur3_tokens(rng, tokens_per_host)));
  }

  test_murmur3.build();
  test_murmur3.verify();
}

0 commit comments

Comments
 (0)