Skip to content

Commit 1684f6b

Browse files
daschlMichael Nitschinger
authored and
Michael Nitschinger
committed
JCBC-337: Implement a Combined Configuration Provider (Binary + HTTP)
This changeset provides a combined http and binary configuration provider. It is designed to handle bootstrap and monitoring completely autonomously, further decoupling it from the rest of the codebase. Change-Id: I13b5029b98c9327cac6927ea35efe529edbb2f84 Reviewed-on: http://review.couchbase.org/32589 Reviewed-by: Michael Nitschinger <[email protected]> Tested-by: Michael Nitschinger <[email protected]>
1 parent 25a4474 commit 1684f6b

19 files changed

+1252
-241
lines changed

src/main/java/com/couchbase/client/CouchbaseClient.java

+1-12
Original file line numberDiff line numberDiff line change
@@ -263,24 +263,13 @@ public CouchbaseClient(CouchbaseConnectionFactory cf)
263263
executorService = cbConnFactory.getListenerExecutorService();
264264

265265
getLogger().info(MODE_ERROR);
266-
cf.getConfigurationProvider().subscribe(cf.getBucketName(), this);
266+
cf.getConfigurationProvider().subscribe(this);
267267
}
268268

269269
@Override
270270
public void reconfigure(Bucket bucket) {
271271
reconfiguring = true;
272-
if (bucket.isNotUpdating()) {
273-
getLogger().info("Bucket configuration is disconnected from cluster "
274-
+ "configuration updates, attempting to reconnect.");
275-
CouchbaseConnectionFactory cbcf = (CouchbaseConnectionFactory)connFactory;
276-
cbcf.requestConfigReconnect(cbcf.getBucketName(), this);
277-
cbcf.checkConfigUpdate();
278-
}
279272
try {
280-
cbConnFactory.getConfigurationProvider().updateBucket(
281-
cbConnFactory.getBucketName(), bucket);
282-
cbConnFactory.updateStoredBaseList(bucket.getConfig());
283-
284273
if(vconn != null) {
285274
vconn.reconfigure(bucket);
286275
}

src/main/java/com/couchbase/client/CouchbaseConnection.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ public void reconfigure(Bucket bucket) {
163163
// schedule shutdown for the oddNodes
164164
for(MemcachedNode shutDownNode : oddNodes) {
165165
getLogger().info("Scheduling Node "
166-
+ shutDownNode.getSocketAddress() + "for shutdown.");
166+
+ shutDownNode.getSocketAddress() + " for shutdown.");
167167
}
168168
nodesToShutdown.addAll(oddNodes);
169169
} catch (IOException e) {
@@ -313,7 +313,7 @@ public void run() {
313313
protected void handleRetryInformation(byte[] retryMessage) {
314314
String message = new String(retryMessage).trim();
315315
if (message.startsWith("{")) {
316-
cf.getConfigurationProvider().updateBucket(
316+
cf.getConfigurationProvider().setConfig(
317317
replaceConfigWildcards(message)
318318
);
319319
}
@@ -328,7 +328,7 @@ protected void handleRetryInformation(byte[] retryMessage) {
328328
* @param original the raw new config string.
329329
* @return the potentially changed config string.
330330
*/
331-
private String replaceConfigWildcards(String original) {
331+
public String replaceConfigWildcards(String original) {
332332
if (original.contains("$HOST")) {
333333
ArrayList<MemcachedNode> nodes =
334334
new ArrayList<MemcachedNode>(getLocator().getAll());

src/main/java/com/couchbase/client/CouchbaseConnectionFactory.java

+6-133
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
package com.couchbase.client;
2424

2525
import com.couchbase.client.vbucket.ConfigurationException;
26-
import com.couchbase.client.vbucket.ConfigurationProvider;
26+
import com.couchbase.client.vbucket.provider.BucketConfigurationProvider;
27+
import com.couchbase.client.vbucket.provider.ConfigurationProvider;
2728
import com.couchbase.client.vbucket.ConfigurationProviderHTTP;
2829
import com.couchbase.client.vbucket.CouchbaseNodeOrder;
2930
import com.couchbase.client.vbucket.Reconfigurable;
@@ -204,8 +205,6 @@ public CouchbaseConnectionFactory(final List<URI> baseList,
204205
}
205206

206207
private void initialize(List<URI> baseList, String bucket, String password) {
207-
potentiallyRandomizeNodeList(baseList);
208-
209208
storedBaseList = new ArrayList<URI>();
210209
for (URI bu : baseList) {
211210
if (!bu.isAbsolute()) {
@@ -226,7 +225,7 @@ private void initialize(List<URI> baseList, String bucket, String password) {
226225
this.bucket = bucket;
227226
pass = password;
228227
configurationProvider =
229-
new ConfigurationProviderHTTP(baseList, bucket, password);
228+
new BucketConfigurationProvider(baseList, bucket, password, this);
230229
}
231230

232231
@Override
@@ -310,25 +309,15 @@ public CouchbaseNodeOrder getStreamingNodeOrder() {
310309
}
311310

312311
public Config getVBucketConfig() {
313-
Bucket config = configurationProvider.getBucketConfiguration(bucket);
314-
if(config == null) {
315-
throw new ConfigurationException("Could not fetch valid configuration "
316-
+ "from provided nodes. Stopping.");
317-
} else if (config.isNotUpdating()) {
318-
LOGGER.warning("Noticed bucket configuration to be disconnected, "
319-
+ "will attempt to reconnect");
320-
setConfigurationProvider(new ConfigurationProviderHTTP(storedBaseList,
321-
bucket, pass));
322-
}
323-
return configurationProvider.getBucketConfiguration(bucket).getConfig();
312+
return configurationProvider.getConfig().getConfig();
324313
}
325314

326315
public synchronized ConfigurationProvider getConfigurationProvider() {
327316
return configurationProvider;
328317
}
329318

330319
protected void requestConfigReconnect(String bucketName, Reconfigurable rec) {
331-
configurationProvider.markForResubscribe(bucketName, rec);
320+
configurationProvider.signalOutdated();
332321
needsReconnect = true;
333322
}
334323

@@ -367,7 +356,7 @@ void checkConfigUpdate() {
367356
}
368357

369358
if (doingResubscribe.compareAndSet(false, true)) {
370-
resubConfigUpdate();
359+
getConfigurationProvider().signalOutdated();
371360
} else {
372361
LOGGER.log(Level.CONFIG, "Duplicate resubscribe for config updates"
373362
+ " suppressed.");
@@ -379,15 +368,6 @@ void checkConfigUpdate() {
379368
}
380369
}
381370

382-
/**
383-
* Resubscribe for configuration updates.
384-
*/
385-
private synchronized void resubConfigUpdate() {
386-
LOGGER.log(Level.INFO, "Attempting to resubscribe for cluster config"
387-
+ " updates.");
388-
resubExec.execute(new Resubscriber());
389-
}
390-
391371
/**
392372
* Checks if there have been more requests than allowed through
393373
* maxConfigCheck in a 10 second period.
@@ -466,51 +446,6 @@ int getMaxConfigCheck() {
466446
return maxConfigCheck;
467447
}
468448

469-
private class Resubscriber implements Runnable {
470-
471-
public void run() {
472-
String threadNameBase = "Couchbase/Resubscriber (Status: ";
473-
Thread.currentThread().setName(threadNameBase + "running)");
474-
LOGGER.log(Level.CONFIG, "Resubscribing for {0} using base list {1}",
475-
new Object[]{bucket, storedBaseList});
476-
477-
long reconnectAttempt = 0;
478-
long backoffTime = 1000;
479-
long maxWaitTime = 10000;
480-
do {
481-
try {
482-
long waitTime = (reconnectAttempt++)*backoffTime;
483-
if(reconnectAttempt >= 10) {
484-
waitTime = maxWaitTime;
485-
}
486-
LOGGER.log(Level.INFO, "Reconnect attempt {0}, waiting {1}ms",
487-
new Object[]{reconnectAttempt, waitTime});
488-
Thread.sleep(waitTime);
489-
490-
ConfigurationProvider oldConfigProvider = getConfigurationProvider();
491-
Reconfigurable oldRec = oldConfigProvider.getReconfigurable();
492-
493-
ConfigurationProvider newConfigProvider =
494-
new ConfigurationProviderHTTP(storedBaseList, bucket, pass);
495-
newConfigProvider.subscribe(bucket, oldRec);
496-
497-
setConfigurationProvider(newConfigProvider);
498-
oldConfigProvider.shutdown();
499-
500-
if (!doingResubscribe.compareAndSet(true, false)) {
501-
LOGGER.log(Level.WARNING,
502-
"Could not reset from doing a resubscribe.");
503-
}
504-
} catch (Exception ex) {
505-
LOGGER.log(Level.WARNING,
506-
"Resubscribe attempt failed: ", ex);
507-
}
508-
} while(doingResubscribe.get());
509-
510-
Thread.currentThread().setName(threadNameBase + "complete)");
511-
}
512-
}
513-
514449
/**
515450
* Returns a ClusterManager and initializes one if it does not exist.
516451
* @return Returns an instance of a ClusterManager.
@@ -522,35 +457,6 @@ public ClusterManager getClusterManager() {
522457
return clusterManager;
523458
}
524459

525-
/**
526-
* Updates the stored base list with a new one based on the config.
527-
*
528-
* @param config
529-
*/
530-
public void updateStoredBaseList(Config config) {
531-
List<String> bucketServers = config.getRestEndpoints();
532-
if (bucketServers.size() > 0) {
533-
List<URI> newList = new ArrayList<URI>();
534-
for (String bucketServer : bucketServers) {
535-
try {
536-
newList.add(new URI(bucketServer));
537-
} catch(URISyntaxException ex) {
538-
getLogger().warn("Could not add node to updated bucket list because "
539-
+ "of a parsing exception.");
540-
getLogger().debug("Could not parse list because: " + ex);
541-
}
542-
}
543-
544-
if (nodeListsAreDifferent(storedBaseList, newList)) {
545-
getLogger().info("Replacing current streaming node list "
546-
+ storedBaseList + " with " + newList);
547-
potentiallyRandomizeNodeList(newList);
548-
storedBaseList = newList;
549-
getConfigurationProvider().updateBaseListFromConfig(newList);
550-
}
551-
}
552-
}
553-
554460
/**
555461
* Returns the current base list.
556462
*
@@ -560,39 +466,6 @@ List<URI> getStoredBaseList() {
560466
return storedBaseList;
561467
}
562468

563-
/**
564-
* Randomizes the entries of the node list if needed.
565-
*
566-
* @param list the list to potentially randomize.
567-
*/
568-
private void potentiallyRandomizeNodeList(List<URI> list) {
569-
if (getStreamingNodeOrder().equals(CouchbaseNodeOrder.ORDERED)) {
570-
return;
571-
}
572-
573-
Collections.shuffle(list);
574-
}
575-
576-
/**
577-
* Check if two given node lists are different.
578-
*
579-
* @param left one node list
580-
* @param right the other node list
581-
* @return true if they are different, false otherwise.
582-
*/
583-
private boolean nodeListsAreDifferent(List<URI> left, List<URI> right) {
584-
if (left.size() != right.size()) {
585-
return true;
586-
}
587-
588-
for (URI uri : left) {
589-
if (!right.contains(uri)) {
590-
return true;
591-
}
592-
}
593-
return false;
594-
}
595-
596469
@Override
597470
public String toString() {
598471
final StringBuilder sb = new StringBuilder("CouchbaseConnectionFactory{");

src/main/java/com/couchbase/client/TapConnectionProvider.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
package com.couchbase.client;
2424

25-
import com.couchbase.client.vbucket.ConfigurationProvider;
25+
import com.couchbase.client.vbucket.provider.ConfigurationProvider;
2626
import com.couchbase.client.vbucket.Reconfigurable;
2727
import com.couchbase.client.vbucket.config.Bucket;
2828

@@ -71,7 +71,7 @@ public TapConnectionProvider(CouchbaseConnectionFactory cf) throws IOException {
7171
super(cf, AddrUtil.getAddresses(cf.getVBucketConfig().getServers()));
7272
this.cf=cf;
7373
ConfigurationProvider cp = cf.getConfigurationProvider();
74-
cp.subscribe(cf.getBucketName(), this);
74+
cp.subscribe(this);
7575
}
7676

7777
/**

src/main/java/com/couchbase/client/vbucket/config/Bucket.java

+11
Original file line numberDiff line numberDiff line change
@@ -117,4 +117,15 @@ public final void setIsNotUpdating() {
117117
LOGGER.finest("Marking bucket as not updating,"
118118
+ " disconnected from config stream");
119119
}
120+
121+
@Override
122+
public String toString() {
123+
return "Bucket{" +
124+
"name='" + name + '\'' +
125+
", configuration=" + configuration +
126+
", streamingURI=" + streamingURI +
127+
", isNotUpdating=" + isNotUpdating +
128+
", nodes=" + nodes +
129+
'}';
130+
}
120131
}

src/main/java/com/couchbase/client/vbucket/config/CacheConfig.java

+11
Original file line numberDiff line numberDiff line change
@@ -156,4 +156,15 @@ public ConfigType getConfigType() {
156156
public List<URL> getCouchServers() {
157157
throw new UnsupportedOperationException("No couch port for cache buckets");
158158
}
159+
160+
@Override
161+
public String toString() {
162+
return "CacheConfig{" +
163+
"vbucketsCount=" + vbucketsCount +
164+
", serversCount=" + serversCount +
165+
", servers=" + servers +
166+
", vbuckets=" + vbuckets +
167+
", restEndpoints=" + restEndpoints +
168+
'}';
169+
}
159170
}

src/main/java/com/couchbase/client/vbucket/config/DefaultConfig.java

+16
Original file line numberDiff line numberDiff line change
@@ -238,4 +238,20 @@ public boolean nodeHasActiveVBuckets(InetSocketAddress node) {
238238
public ConfigType getConfigType() {
239239
return ConfigType.COUCHBASE;
240240
}
241+
242+
@Override
243+
public String toString() {
244+
return "DefaultConfig{" +
245+
"hashAlgorithm=" + hashAlgorithm +
246+
", vbucketsCount=" + vbucketsCount +
247+
", mask=" + mask +
248+
", serversCount=" + serversCount +
249+
", replicasCount=" + replicasCount +
250+
", servers=" + servers +
251+
", vbuckets=" + vbuckets +
252+
", couchServers=" + couchServers +
253+
", serversWithVBuckets=" + serversWithVBuckets +
254+
", restEndpoints=" + restEndpoints +
255+
'}';
256+
}
241257
}

src/main/java/com/couchbase/client/vbucket/config/Node.java

+9
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,13 @@ public int hashCode() {
8282
result = 31 * result + ports.hashCode();
8383
return result;
8484
}
85+
86+
@Override
87+
public String toString() {
88+
return "Node{" +
89+
"status=" + status +
90+
", hostname='" + hostname + '\'' +
91+
", ports=" + ports +
92+
'}';
93+
}
8594
}

0 commit comments

Comments
 (0)