Skip to content

Commit 9b612ce

Browse files
committed
Update internal metrics handling
Make sure `afterClosed` is called whenever a pooled connection is closed. In addition, handle reauth pipelining error.
1 parent b70dc41 commit 9b612ce

File tree

1 file changed

+53
-32
lines changed

1 file changed

+53
-32
lines changed

bolt-api-pooled/src/main/java/org/neo4j/driver/internal/bolt/pooledimpl/PooledBoltConnectionProvider.java

Lines changed: 53 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.neo4j.driver.internal.bolt.api.BoltProtocolVersion;
4545
import org.neo4j.driver.internal.bolt.api.BoltServerAddress;
4646
import org.neo4j.driver.internal.bolt.api.DatabaseName;
47+
import org.neo4j.driver.internal.bolt.api.ListenerEvent;
4748
import org.neo4j.driver.internal.bolt.api.LoggingProvider;
4849
import org.neo4j.driver.internal.bolt.api.MetricsListener;
4950
import org.neo4j.driver.internal.bolt.api.NotificationConfig;
@@ -151,16 +152,16 @@ public CompletionStage<BoltConnection> connect(
151152
return;
152153
}
153154

154-
var beforeAcquiringOrCreatingEvent = metricsListener.createListenerEvent();
155-
metricsListener.beforeAcquiringOrCreating(poolId, beforeAcquiringOrCreatingEvent);
155+
var acquireEvent = metricsListener.createListenerEvent();
156+
metricsListener.beforeAcquiringOrCreating(poolId, acquireEvent);
156157
acquisitionFuture.whenComplete((connection, throwable) -> {
157158
throwable = FutureUtil.completionExceptionCause(throwable);
158159
if (throwable != null) {
159160
if (throwable instanceof TimeoutException) {
160161
metricsListener.afterTimedOutToAcquireOrCreate(poolId);
161162
}
162163
} else {
163-
metricsListener.afterAcquiredOrCreated(poolId, beforeAcquiringOrCreatingEvent);
164+
metricsListener.afterAcquiredOrCreated(poolId, acquireEvent);
164165
}
165166
metricsListener.afterAcquiringOrCreating(poolId);
166167
});
@@ -305,24 +306,28 @@ private void connect(
305306
purge(entry);
306307
metricsListener.afterConnectionReleased(poolId, inUseEvent);
307308
});
308-
reauthStage(entryWithMetadata, authMap).whenComplete((ignored2, throwable2) -> {
309-
if (!acquisitionFuture.complete(pooledConnection)) {
310-
// acquisition timed out
311-
CompletableFuture<PooledBoltConnection> pendingAcquisition;
312-
synchronized (this) {
313-
pendingAcquisition = pendingAcquisitions.poll();
314-
if (pendingAcquisition == null) {
315-
// nothing pending, just make the entry available
316-
entry.available = true;
317-
}
318-
}
319-
if (pendingAcquisition != null) {
320-
if (pendingAcquisition.complete(pooledConnection)) {
321-
metricsListener.afterConnectionCreated(poolId, inUseEvent);
322-
}
323-
}
309+
reauthStage(entryWithMetadata, authMap).whenComplete((ignored2, reauthThrowable) -> {
310+
if (reauthThrowable != null) {
311+
// reauth pipelining failed, purge the connection and try again
312+
purge(entry);
313+
connect(
314+
acquisitionFuture,
315+
securityPlan,
316+
databaseName,
317+
authMap,
318+
authMapStageSupplier,
319+
mode,
320+
bookmarks,
321+
impersonatedUser,
322+
minVersion,
323+
notificationConfig);
324324
} else {
325-
metricsListener.afterConnectionCreated(poolId, inUseEvent);
325+
if (!acquisitionFuture.complete(pooledConnection)) {
326+
// acquisition timed out
327+
findAndCompletePendingAcquisition(entry, pooledConnection, inUseEvent);
328+
} else {
329+
metricsListener.afterConnectionCreated(poolId, inUseEvent);
330+
}
326331
}
327332
});
328333
}
@@ -394,6 +399,25 @@ private void connect(
394399
}
395400
}
396401

402+
private void findAndCompletePendingAcquisition(
403+
ConnectionEntry entry, PooledBoltConnection pooledConnection, ListenerEvent<?> inUseEvent) {
404+
CompletableFuture<PooledBoltConnection> pendingAcquisition;
405+
synchronized (this) {
406+
pendingAcquisition = pendingAcquisitions.poll();
407+
if (pendingAcquisition == null) {
408+
// nothing pending, just make the entry available
409+
entry.available = true;
410+
}
411+
}
412+
if (pendingAcquisition != null) {
413+
if (pendingAcquisition.complete(pooledConnection)) {
414+
metricsListener.afterConnectionCreated(poolId, inUseEvent);
415+
} else {
416+
findAndCompletePendingAcquisition(entry, pooledConnection, inUseEvent);
417+
}
418+
}
419+
}
420+
397421
private synchronized ConnectionEntryWithMetadata acquireExistingEntry(
398422
Map<String, Value> authMap, BoltProtocolVersion minVersion) {
399423
ConnectionEntryWithMetadata connectionEntryWithMetadata = null;
@@ -411,6 +435,7 @@ private synchronized ConnectionEntryWithMetadata acquireExistingEntry(
411435
if (connection.state() != BoltConnectionState.OPEN) {
412436
connection.close();
413437
iterator.remove();
438+
metricsListener.afterClosed(poolId);
414439
continue;
415440
}
416441

@@ -449,6 +474,7 @@ private synchronized ConnectionEntryWithMetadata acquireExistingEntry(
449474
throwable.getClass().getCanonicalName());
450475
}
451476
});
477+
metricsListener.afterClosed(poolId);
452478
continue;
453479
}
454480
}
@@ -469,15 +495,7 @@ private CompletionStage<Void> reauthStage(
469495
.connection
470496
.logoff()
471497
.thenCompose(conn -> conn.logon(authMap))
472-
.handle((ignored, throwable) -> {
473-
if (throwable != null) {
474-
connectionEntryWithMetadata.connectionEntry.connection.close();
475-
synchronized (this) {
476-
pooledConnectionEntries.remove(connectionEntryWithMetadata.connectionEntry);
477-
}
478-
}
479-
return null;
480-
});
498+
.thenApply(ignored -> null);
481499
} else {
482500
stage = CompletableFuture.completedStage(null);
483501
}
@@ -562,9 +580,12 @@ public CompletionStage<Void> close() {
562580
var iterator = pooledConnectionEntries.iterator();
563581
while (iterator.hasNext()) {
564582
var entry = iterator.next();
565-
if (entry.connection != null && entry.connection.state() == BoltConnectionState.OPEN) {
583+
if (entry.connection != null) {
566584
this.closeStage = this.closeStage.thenCompose(
567-
ignored -> entry.connection.close().exceptionally(throwable -> null));
585+
ignored -> entry.connection.close().handle((ignored1, ignored2) -> {
586+
metricsListener.afterClosed(poolId);
587+
return null;
588+
}));
568589
}
569590
iterator.remove();
570591
}
@@ -627,8 +648,8 @@ private void purge(ConnectionEntry entry) {
627648
synchronized (this) {
628649
pooledConnectionEntries.remove(entry);
629650
}
630-
metricsListener.afterClosed(poolId);
631651
entry.connection.close();
652+
metricsListener.afterClosed(poolId);
632653
log.log(System.Logger.Level.DEBUG, "Connection purged from the pool.");
633654
}
634655

0 commit comments

Comments
 (0)