Skip to content

Commit 3530672

Browse files
fix: we should isolate the client used in StreamWriter and the client used in ConnectionWorker (googleapis#1933)
* . * . * . * . * . * . * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * . * . * . * . * . * . * . * . * . * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 53820c6 commit 3530672

File tree

8 files changed

+113
-79
lines changed

8 files changed

+113
-79
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,13 @@ implementation 'com.google.cloud:google-cloud-bigquerystorage'
5656
If you are using Gradle without BOM, add this to your dependencies:
5757

5858
```Groovy
59-
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.1'
59+
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.2'
6060
```
6161

6262
If you are using SBT, add this to your dependencies:
6363

6464
```Scala
65-
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.1"
65+
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.2"
6666
```
6767

6868
## Authentication

google-cloud-bigquerystorage/clirr-ignored-differences.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,16 @@
8686
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool</className>
8787
<method>ConnectionWorkerPool(long, long, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteClient, boolean)</method>
8888
</difference>
89+
<difference>
90+
<differenceType>7005</differenceType>
91+
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorker</className>
92+
<method>ConnectionWorker(java.lang.String, com.google.cloud.bigquery.storage.v1.ProtoSchema, long, long, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteClient, boolean)</method>
93+
<to>ConnectionWorker(java.lang.String, com.google.cloud.bigquery.storage.v1.ProtoSchema, long, long, java.time.Duration, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings)</to>
94+
</difference>
95+
<difference>
96+
<differenceType>7005</differenceType>
97+
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool</className>
98+
<method>ConnectionWorkerPool(long, long, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteClient, boolean)</method>
99+
<to>ConnectionWorkerPool(long, long, java.time.Duration, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings)</to>
100+
</difference>
89101
</differences>

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -171,11 +171,6 @@ public class ConnectionWorker implements AutoCloseable {
171171
*/
172172
private BigQueryWriteClient client;
173173

174-
/*
175-
* If true, the client above is created by this writer and should be closed.
176-
*/
177-
private boolean ownsBigQueryWriteClient = false;
178-
179174
/*
180175
* Wraps the underlying bi-directional stream connection with server.
181176
*/
@@ -209,8 +204,7 @@ public ConnectionWorker(
209204
Duration maxRetryDuration,
210205
FlowController.LimitExceededBehavior limitExceededBehavior,
211206
String traceId,
212-
BigQueryWriteClient client,
213-
boolean ownsBigQueryWriteClient)
207+
BigQueryWriteSettings clientSettings)
214208
throws IOException {
215209
this.lock = new ReentrantLock();
216210
this.hasMessageInWaitingQueue = lock.newCondition();
@@ -229,8 +223,8 @@ public ConnectionWorker(
229223
this.traceId = traceId;
230224
this.waitingRequestQueue = new LinkedList<AppendRequestAndResponse>();
231225
this.inflightRequestQueue = new LinkedList<AppendRequestAndResponse>();
232-
this.client = client;
233-
this.ownsBigQueryWriteClient = ownsBigQueryWriteClient;
226+
// Always recreate a client for connection worker.
227+
this.client = BigQueryWriteClient.create(clientSettings);
234228

235229
this.appendThread =
236230
new Thread(
@@ -382,13 +376,11 @@ public void close() {
382376
log.warning(
383377
"Append handler join is interrupted. Stream: " + streamName + " Error: " + e.toString());
384378
}
385-
if (this.ownsBigQueryWriteClient) {
386-
this.client.close();
387-
try {
388-
// Backend request has a 2 minute timeout, so wait a little longer than that.
389-
this.client.awaitTermination(150, TimeUnit.SECONDS);
390-
} catch (InterruptedException ignored) {
391-
}
379+
this.client.close();
380+
try {
381+
// Backend request has a 2 minute timeout, so wait a little longer than that.
382+
this.client.awaitTermination(150, TimeUnit.SECONDS);
383+
} catch (InterruptedException ignored) {
392384
}
393385
}
394386

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -149,12 +149,7 @@ public class ConnectionWorkerPool {
149149
/*
150150
* A client used to interact with BigQuery.
151151
*/
152-
private BigQueryWriteClient client;
153-
154-
/*
155-
* If true, the client above is created by this writer and should be closed.
156-
*/
157-
private boolean ownsBigQueryWriteClient = false;
152+
private BigQueryWriteSettings clientSettings;
158153

159154
/**
160155
* The current maximum connection count. This value is gradually increased till the user defined
@@ -204,15 +199,13 @@ public ConnectionWorkerPool(
204199
java.time.Duration maxRetryDuration,
205200
FlowController.LimitExceededBehavior limitExceededBehavior,
206201
String traceId,
207-
BigQueryWriteClient client,
208-
boolean ownsBigQueryWriteClient) {
202+
BigQueryWriteSettings clientSettings) {
209203
this.maxInflightRequests = maxInflightRequests;
210204
this.maxInflightBytes = maxInflightBytes;
211205
this.maxRetryDuration = maxRetryDuration;
212206
this.limitExceededBehavior = limitExceededBehavior;
213207
this.traceId = traceId;
214-
this.client = client;
215-
this.ownsBigQueryWriteClient = ownsBigQueryWriteClient;
208+
this.clientSettings = clientSettings;
216209
this.currentMaxConnectionCount = settings.minConnectionsPerRegion();
217210
}
218211

@@ -308,6 +301,7 @@ private ConnectionWorker createOrReuseConnectionWorker(
308301
}
309302
return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getProtoSchema());
310303
} else {
304+
311305
// Stick to the original connection if all the connections are overwhelmed.
312306
if (existingConnectionWorker != null) {
313307
return existingConnectionWorker;
@@ -355,8 +349,6 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
355349
// Though atomic integer is super lightweight, add extra if check in case adding future logic.
356350
testValueCreateConnectionCount.getAndIncrement();
357351
}
358-
// currently we use different header for the client in each connection worker to be different
359-
// as the backend require the header to have the same write_stream field as request body.
360352
ConnectionWorker connectionWorker =
361353
new ConnectionWorker(
362354
streamName,
@@ -366,8 +358,7 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
366358
maxRetryDuration,
367359
limitExceededBehavior,
368360
traceId,
369-
client,
370-
ownsBigQueryWriteClient);
361+
clientSettings);
371362
connectionWorkerPool.add(connectionWorker);
372363
log.info(
373364
String.format(
@@ -402,8 +393,11 @@ public void close(StreamWriter streamWriter) {
402393
log.info(
403394
String.format(
404395
"During closing of writeStream for %s with writer id %s, we decided to close %s "
405-
+ "connections",
406-
streamWriter.getStreamName(), streamWriter.getWriterId(), connectionToRemove.size()));
396+
+ "connections, pool size after removal $s",
397+
streamWriter.getStreamName(),
398+
streamWriter.getWriterId(),
399+
connectionToRemove.size(),
400+
connectionToWriteStream.size() - 1));
407401
connectionToWriteStream.keySet().removeAll(connectionToRemove);
408402
} finally {
409403
lock.unlock();
@@ -447,16 +441,12 @@ String getTraceId() {
447441
return traceId;
448442
}
449443

450-
boolean ownsBigQueryWriteClient() {
451-
return ownsBigQueryWriteClient;
452-
}
453-
454444
FlowController.LimitExceededBehavior limitExceededBehavior() {
455445
return limitExceededBehavior;
456446
}
457447

458-
BigQueryWriteClient bigQueryWriteClient() {
459-
return client;
448+
BigQueryWriteSettings bigQueryWriteSettings() {
449+
return clientSettings;
460450
}
461451

462452
static String toTableName(String streamName) {

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java

Lines changed: 17 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ public static SingleConnectionOrConnectionPool ofConnectionPool(
184184
private StreamWriter(Builder builder) throws IOException {
185185
this.streamName = builder.streamName;
186186
this.writerSchema = builder.writerSchema;
187-
boolean ownsBigQueryWriteClient = builder.client == null;
187+
BigQueryWriteSettings clientSettings = getBigQueryWriteSettings(builder);
188188
if (!builder.enableConnectionPool) {
189189
this.location = builder.location;
190190
this.singleConnectionOrConnectionPool =
@@ -197,8 +197,7 @@ private StreamWriter(Builder builder) throws IOException {
197197
builder.maxRetryDuration,
198198
builder.limitExceededBehavior,
199199
builder.traceId,
200-
getBigQueryWriteClient(builder),
201-
ownsBigQueryWriteClient));
200+
clientSettings));
202201
} else {
203202
if (!isDefaultStream(streamName)) {
204203
log.warning(
@@ -208,7 +207,9 @@ private StreamWriter(Builder builder) throws IOException {
208207
"Trying to enable connection pool in non-default stream.");
209208
}
210209

211-
BigQueryWriteClient client = getBigQueryWriteClient(builder);
210+
// We need a client to perform some getWriteStream calls.
211+
BigQueryWriteClient client =
212+
builder.client != null ? builder.client : new BigQueryWriteClient(clientSettings);
212213
String location = builder.location;
213214
if (location == null || location.isEmpty()) {
214215
// Location is not passed in, try to fetch from RPC
@@ -256,14 +257,11 @@ private StreamWriter(Builder builder) throws IOException {
256257
builder.maxRetryDuration,
257258
builder.limitExceededBehavior,
258259
builder.traceId,
259-
client,
260-
ownsBigQueryWriteClient);
260+
client.getSettings());
261261
}));
262262
validateFetchedConnectonPool(builder);
263-
// Shut down the passed in client. Internally we will create another client inside connection
264-
// pool for every new connection worker.
265-
if (client != singleConnectionOrConnectionPool.connectionWorkerPool().bigQueryWriteClient()
266-
&& ownsBigQueryWriteClient) {
263+
// If the client is not from outside, then shutdown the client we created.
264+
if (builder.client == null) {
267265
client.shutdown();
268266
try {
269267
client.awaitTermination(150, TimeUnit.SECONDS);
@@ -293,19 +291,16 @@ static boolean isDefaultStream(String streamName) {
293291
return streamMatcher.find();
294292
}
295293

296-
private BigQueryWriteClient getBigQueryWriteClient(Builder builder) throws IOException {
297-
if (builder.client == null) {
298-
BigQueryWriteSettings stubSettings =
299-
BigQueryWriteSettings.newBuilder()
300-
.setCredentialsProvider(builder.credentialsProvider)
301-
.setTransportChannelProvider(builder.channelProvider)
302-
.setBackgroundExecutorProvider(builder.executorProvider)
303-
.setEndpoint(builder.endpoint)
304-
.build();
305-
testOnlyClientCreatedTimes++;
306-
return BigQueryWriteClient.create(stubSettings);
294+
private BigQueryWriteSettings getBigQueryWriteSettings(Builder builder) throws IOException {
295+
if (builder.client != null) {
296+
return builder.client.getSettings();
307297
} else {
308-
return builder.client;
298+
return BigQueryWriteSettings.newBuilder()
299+
.setCredentialsProvider(builder.credentialsProvider)
300+
.setTransportChannelProvider(builder.channelProvider)
301+
.setBackgroundExecutorProvider(builder.executorProvider)
302+
.setEndpoint(builder.endpoint)
303+
.build();
309304
}
310305
}
311306

@@ -316,10 +311,6 @@ private void validateFetchedConnectonPool(StreamWriter.Builder builder) {
316311
this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId(),
317312
builder.traceId)) {
318313
paramsValidatedFailed = "Trace id";
319-
} else if (!Objects.equals(
320-
this.singleConnectionOrConnectionPool.connectionWorkerPool().ownsBigQueryWriteClient(),
321-
builder.client == null)) {
322-
paramsValidatedFailed = "Whether using passed in clients";
323314
} else if (!Objects.equals(
324315
this.singleConnectionOrConnectionPool.connectionWorkerPool().limitExceededBehavior(),
325316
builder.limitExceededBehavior)) {

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java

Lines changed: 60 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public class ConnectionWorkerPoolTest {
4848
private FakeBigQueryWrite testBigQueryWrite;
4949
private FakeScheduledExecutorService fakeExecutor;
5050
private static MockServiceHelper serviceHelper;
51-
private BigQueryWriteClient client;
51+
private BigQueryWriteSettings clientSettings;
5252

5353
private static final String TEST_TRACE_ID = "home:job1";
5454
private static final String TEST_STREAM_1 = "projects/p1/datasets/d1/tables/t1/streams/_default";
@@ -63,12 +63,11 @@ public void setUp() throws Exception {
6363
serviceHelper.start();
6464
fakeExecutor = new FakeScheduledExecutorService();
6565
testBigQueryWrite.setExecutor(fakeExecutor);
66-
client =
67-
BigQueryWriteClient.create(
68-
BigQueryWriteSettings.newBuilder()
69-
.setCredentialsProvider(NoCredentialsProvider.create())
70-
.setTransportChannelProvider(serviceHelper.createChannelProvider())
71-
.build());
66+
clientSettings =
67+
BigQueryWriteSettings.newBuilder()
68+
.setCredentialsProvider(NoCredentialsProvider.create())
69+
.setTransportChannelProvider(serviceHelper.createChannelProvider())
70+
.build();
7271
ConnectionWorker.Load.setOverwhelmedCountsThreshold(0.5);
7372
ConnectionWorker.Load.setOverwhelmedBytesThreshold(0.6);
7473
}
@@ -325,6 +324,56 @@ public void testToTableName() {
325324
IllegalArgumentException.class, () -> ConnectionWorkerPool.toTableName("projects/p/"));
326325
}
327326

327+
@Test
328+
public void testCloseExternalClient()
329+
throws IOException, InterruptedException, ExecutionException {
330+
// Try append 100 requests.
331+
long appendCount = 100L;
332+
// testBigQueryWrite is used to
333+
for (long i = 0; i < appendCount * 2; i++) {
334+
testBigQueryWrite.addResponse(createAppendResponse(i));
335+
}
336+
testBigQueryWrite.addResponse(WriteStream.newBuilder().setLocation("us").build());
337+
List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
338+
BigQueryWriteClient externalClient =
339+
BigQueryWriteClient.create(
340+
BigQueryWriteSettings.newBuilder()
341+
.setCredentialsProvider(NoCredentialsProvider.create())
342+
.setTransportChannelProvider(serviceHelper.createChannelProvider())
343+
.build());
344+
// Create some stream writers.
345+
List<StreamWriter> streamWriterList = new ArrayList<>();
346+
for (int i = 0; i < 4; i++) {
347+
StreamWriter sw =
348+
StreamWriter.newBuilder(
349+
String.format("projects/p1/datasets/d1/tables/t%s/streams/_default", i),
350+
externalClient)
351+
.setWriterSchema(createProtoSchema())
352+
.setTraceId(TEST_TRACE_ID)
353+
.setEnableConnectionPool(true)
354+
.build();
355+
streamWriterList.add(sw);
356+
}
357+
358+
for (long i = 0; i < appendCount; i++) {
359+
StreamWriter sw = streamWriterList.get((int) (i % streamWriterList.size()));
360+
// Round robinly insert requests to different tables.
361+
futures.add(sw.append(createProtoRows(new String[] {String.valueOf(i)}), i));
362+
}
363+
externalClient.close();
364+
externalClient.awaitTermination(1, TimeUnit.MINUTES);
365+
// Send more requests, the connections should still work.
366+
for (long i = appendCount; i < appendCount * 2; i++) {
367+
StreamWriter sw = streamWriterList.get((int) (i % streamWriterList.size()));
368+
futures.add(sw.append(createProtoRows(new String[] {String.valueOf(i)}), i));
369+
}
370+
for (int i = 0; i < appendCount * 2; i++) {
371+
AppendRowsResponse response = futures.get(i).get();
372+
assertThat(response.getAppendResult().getOffset().getValue()).isEqualTo(i);
373+
}
374+
assertThat(testBigQueryWrite.getAppendRequests().size()).isEqualTo(appendCount * 2);
375+
}
376+
328377
private AppendRowsResponse createAppendResponse(long offset) {
329378
return AppendRowsResponse.newBuilder()
330379
.setAppendResult(
@@ -333,9 +382,11 @@ private AppendRowsResponse createAppendResponse(long offset) {
333382
}
334383

335384
private StreamWriter getTestStreamWriter(String streamName) throws IOException {
336-
return StreamWriter.newBuilder(streamName, client)
385+
return StreamWriter.newBuilder(streamName)
337386
.setWriterSchema(createProtoSchema())
338387
.setTraceId(TEST_TRACE_ID)
388+
.setCredentialsProvider(NoCredentialsProvider.create())
389+
.setChannelProvider(serviceHelper.createChannelProvider())
339390
.build();
340391
}
341392

@@ -380,7 +431,6 @@ ConnectionWorkerPool createConnectionWorkerPool(
380431
maxRetryDuration,
381432
FlowController.LimitExceededBehavior.Block,
382433
TEST_TRACE_ID,
383-
client,
384-
/*ownsBigQueryWriteClient=*/ false);
434+
clientSettings);
385435
}
386436
}

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -309,8 +309,7 @@ private ConnectionWorker createConnectionWorker(
309309
maxRetryDuration,
310310
FlowController.LimitExceededBehavior.Block,
311311
TEST_TRACE_ID,
312-
client,
313-
/*ownsBigQueryWriteClient=*/ false);
312+
client.getSettings());
314313
}
315314

316315
private ProtoSchema createProtoSchema(String protoName) {

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/BigQueryReadClientTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ public void readRowsNoRetryForResourceExhaustedWithRetryInfo()
287287
throws ExecutionException, InterruptedException {
288288
RetryInfo retryInfo =
289289
RetryInfo.newBuilder()
290-
.setRetryDelay(Duration.newBuilder().setSeconds(123).setNanos(456).build())
290+
.setRetryDelay(Duration.newBuilder().setSeconds(2).setNanos(456).build())
291291
.build();
292292

293293
Metadata metadata = new Metadata();

0 commit comments

Comments
 (0)