Skip to content

Commit 85d074d

Browse files
authored
chore(x-goog-spanner-request-id): propagate reqId into exceptions plus prior code review suggestions (#3922)
* chore(x-goog-spanner-request-id): propagate reqId into exceptions plus prior code review suggestions This change propagates the associated request-id into encountered exceptions and also addresses some code review questions from PR #3900. While here added some updates for AbstractReadContext.java and ResumableStreamIterator.java to set grounds for much smaller PRs in which we shall wholesomely test the changes. Curved out of PR #3898 and PR #3915 * Update tests with session.getRequestIdCreator * More plumbing * Update tests * Deal with the multiplex-session .getOptions null returns in getChannel * Correct array copy
1 parent cf6941a commit 85d074d

14 files changed

+190
-45
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -457,22 +457,30 @@ void initTransaction() {
457457
}
458458

459459
private void initTransactionInternal(BeginTransactionRequest request) {
460+
XGoogSpannerRequestId reqId =
461+
session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
460462
try {
461463
Transaction transaction =
462-
rpc.beginTransaction(request, getTransactionChannelHint(), isRouteToLeader());
464+
rpc.beginTransaction(
465+
request, reqId.withOptions(getTransactionChannelHint()), isRouteToLeader());
463466
if (!transaction.hasReadTimestamp()) {
464467
throw SpannerExceptionFactory.newSpannerException(
465-
ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field");
468+
ErrorCode.INTERNAL,
469+
"Missing expected transaction.read_timestamp metadata field",
470+
reqId);
466471
}
467472
if (transaction.getId().isEmpty()) {
468473
throw SpannerExceptionFactory.newSpannerException(
469-
ErrorCode.INTERNAL, "Missing expected transaction.id metadata field");
474+
ErrorCode.INTERNAL, "Missing expected transaction.id metadata field", reqId);
470475
}
471476
try {
472477
timestamp = Timestamp.fromProto(transaction.getReadTimestamp());
473478
} catch (IllegalArgumentException e) {
474479
throw SpannerExceptionFactory.newSpannerException(
475-
ErrorCode.INTERNAL, "Bad value in transaction.read_timestamp metadata field", e);
480+
ErrorCode.INTERNAL,
481+
"Bad value in transaction.read_timestamp metadata field",
482+
e,
483+
reqId);
476484
}
477485
transactionId = transaction.getId();
478486
span.addAnnotation(
@@ -803,7 +811,8 @@ ResultSet executeQueryInternalWithOptions(
803811
tracer.createStatementAttributes(statement, options),
804812
session.getErrorHandler(),
805813
rpc.getExecuteQueryRetrySettings(),
806-
rpc.getExecuteQueryRetryableCodes()) {
814+
rpc.getExecuteQueryRetryableCodes(),
815+
session.getRequestIdCreator()) {
807816
@Override
808817
CloseableIterator<PartialResultSet> startStream(
809818
@Nullable ByteString resumeToken,
@@ -826,11 +835,12 @@ CloseableIterator<PartialResultSet> startStream(
826835
if (selector != null) {
827836
request.setTransaction(selector);
828837
}
838+
this.ensureNonNullXGoogRequestId();
829839
SpannerRpc.StreamingCall call =
830840
rpc.executeQuery(
831841
request.build(),
832842
stream.consumer(),
833-
getTransactionChannelHint(),
843+
this.xGoogRequestId.withOptions(getTransactionChannelHint()),
834844
isRouteToLeader());
835845
session.markUsed(clock.instant());
836846
stream.setCall(call, request.getTransaction().hasBegin());
@@ -1008,7 +1018,8 @@ ResultSet readInternalWithOptions(
10081018
tracer.createTableAttributes(table, readOptions),
10091019
session.getErrorHandler(),
10101020
rpc.getReadRetrySettings(),
1011-
rpc.getReadRetryableCodes()) {
1021+
rpc.getReadRetryableCodes(),
1022+
session.getRequestIdCreator()) {
10121023
@Override
10131024
CloseableIterator<PartialResultSet> startStream(
10141025
@Nullable ByteString resumeToken,
@@ -1029,11 +1040,13 @@ CloseableIterator<PartialResultSet> startStream(
10291040
builder.setTransaction(selector);
10301041
}
10311042
builder.setRequestOptions(buildRequestOptions(readOptions));
1043+
this.incrementXGoogRequestIdAttempt();
1044+
this.xGoogRequestId.setChannelId(session.getChannel());
10321045
SpannerRpc.StreamingCall call =
10331046
rpc.read(
10341047
builder.build(),
10351048
stream.consumer(),
1036-
getTransactionChannelHint(),
1049+
this.xGoogRequestId.withOptions(getTransactionChannelHint()),
10371050
isRouteToLeader());
10381051
session.markUsed(clock.instant());
10391052
stream.setCall(call, /* withBeginTransaction= */ builder.getTransaction().hasBegin());

google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,9 +250,11 @@ private List<Partition> partitionReadUsingIndex(
250250
}
251251
builder.setPartitionOptions(pbuilder.build());
252252

253+
XGoogSpannerRequestId reqId =
254+
session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
253255
final PartitionReadRequest request = builder.build();
254256
try {
255-
PartitionResponse response = rpc.partitionRead(request, options);
257+
PartitionResponse response = rpc.partitionRead(request, reqId.withOptions(options));
256258
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
257259
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
258260
Partition partition =
@@ -272,6 +274,7 @@ private List<Partition> partitionReadUsingIndex(
272274
return partitionReadUsingIndex(
273275
partitionOptions, table, index, keys, columns, true, option);
274276
}
277+
e.setRequestId(reqId);
275278
throw e;
276279
}
277280
}
@@ -313,9 +316,11 @@ private List<Partition> partitionQuery(
313316
}
314317
builder.setPartitionOptions(pbuilder.build());
315318

319+
XGoogSpannerRequestId reqId =
320+
session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
316321
final PartitionQueryRequest request = builder.build();
317322
try {
318-
PartitionResponse response = rpc.partitionQuery(request, options);
323+
PartitionResponse response = rpc.partitionQuery(request, reqId.withOptions(options));
319324
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
320325
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
321326
Partition partition =
@@ -328,6 +333,7 @@ private List<Partition> partitionQuery(
328333
if (!isFallback && maybeMarkUnimplementedForPartitionedOps(e)) {
329334
return partitionQuery(partitionOptions, statement, true, option);
330335
}
336+
e.setRequestId(reqId);
331337
throw e;
332338
}
333339
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,12 @@ long executeStreamingPartitionedUpdate(
7979
boolean foundStats = false;
8080
long updateCount = 0L;
8181
Stopwatch stopwatch = Stopwatch.createStarted(ticker);
82-
Options options = Options.fromUpdateOptions(updateOptions);
82+
XGoogSpannerRequestId reqId =
83+
session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
84+
UpdateOption[] allOptions = new UpdateOption[updateOptions.length + 1];
85+
System.arraycopy(updateOptions, 0, allOptions, 0, updateOptions.length);
86+
allOptions[allOptions.length - 1] = new Options.RequestIdOption(reqId);
87+
Options options = Options.fromUpdateOptions(allOptions);
8388

8489
try {
8590
ExecuteSqlRequest request = newTransactionRequestFrom(statement, options);
@@ -89,7 +94,8 @@ long executeStreamingPartitionedUpdate(
8994

9095
try {
9196
ServerStream<PartialResultSet> stream =
92-
rpc.executeStreamingPartitionedDml(request, session.getOptions(), remainingTimeout);
97+
rpc.executeStreamingPartitionedDml(
98+
request, reqId.withOptions(session.getOptions()), remainingTimeout);
9399

94100
for (PartialResultSet rs : stream) {
95101
if (rs.getResumeToken() != null && !rs.getResumeToken().isEmpty()) {
@@ -104,6 +110,7 @@ long executeStreamingPartitionedUpdate(
104110
} catch (UnavailableException e) {
105111
LOGGER.log(
106112
Level.FINER, "Retrying PartitionedDml transaction after UnavailableException", e);
113+
reqId.incrementAttempt();
107114
request = resumeOrRestartRequest(resumeToken, statement, request, options);
108115
} catch (InternalException e) {
109116
if (!isRetryableInternalErrorPredicate.apply(e)) {
@@ -112,24 +119,31 @@ long executeStreamingPartitionedUpdate(
112119

113120
LOGGER.log(
114121
Level.FINER, "Retrying PartitionedDml transaction after InternalException - EOS", e);
122+
reqId.incrementAttempt();
115123
request = resumeOrRestartRequest(resumeToken, statement, request, options);
116124
} catch (AbortedException e) {
117125
LOGGER.log(Level.FINER, "Retrying PartitionedDml transaction after AbortedException", e);
118126
resumeToken = ByteString.EMPTY;
119127
foundStats = false;
120128
updateCount = 0L;
121129
request = newTransactionRequestFrom(statement, options);
130+
// Create a new xGoogSpannerRequestId.
131+
reqId = session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
132+
} catch (SpannerException e) {
133+
e.setRequestId(reqId);
134+
throw e;
122135
}
123136
}
124137
if (!foundStats) {
125138
throw SpannerExceptionFactory.newSpannerException(
126139
ErrorCode.INVALID_ARGUMENT,
127-
"Partitioned DML response missing stats possibly due to non-DML statement as input");
140+
"Partitioned DML response missing stats possibly due to non-DML statement as input",
141+
reqId);
128142
}
129143
LOGGER.log(Level.FINER, "Finished PartitionedUpdate statement");
130144
return updateCount;
131145
} catch (Exception e) {
132-
throw SpannerExceptionFactory.newSpannerException(e);
146+
throw SpannerExceptionFactory.newSpannerException(e, reqId);
133147
}
134148
}
135149

@@ -209,11 +223,14 @@ private ByteString initTransaction(final Options options) {
209223
.setExcludeTxnFromChangeStreams(
210224
options.withExcludeTxnFromChangeStreams() == Boolean.TRUE))
211225
.build();
212-
Transaction tx = rpc.beginTransaction(request, session.getOptions(), true);
226+
XGoogSpannerRequestId reqId =
227+
session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
228+
Transaction tx = rpc.beginTransaction(request, reqId.withOptions(session.getOptions()), true);
213229
if (tx.getId().isEmpty()) {
214230
throw SpannerExceptionFactory.newSpannerException(
215231
ErrorCode.INTERNAL,
216-
"Failed to init transaction, missing transaction id\n" + session.getName());
232+
"Failed to init transaction, missing transaction id\n" + session.getName(),
233+
reqId);
217234
}
218235
return tx.getId();
219236
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ abstract class ResumableStreamIterator extends AbstractIterator<PartialResultSet
7171
private CloseableIterator<PartialResultSet> stream;
7272
private ByteString resumeToken;
7373
private boolean finished;
74+
public XGoogSpannerRequestId xGoogRequestId;
75+
private XGoogSpannerRequestId.RequestIdCreator xGoogRequestIdCreator;
7476

7577
/**
7678
* Indicates whether it is currently safe to retry RPCs. This will be {@code false} if we have
@@ -86,7 +88,8 @@ protected ResumableStreamIterator(
8688
TraceWrapper tracer,
8789
ErrorHandler errorHandler,
8890
RetrySettings streamingRetrySettings,
89-
Set<Code> retryableCodes) {
91+
Set<Code> retryableCodes,
92+
XGoogSpannerRequestId.RequestIdCreator xGoogRequestIdCreator) {
9093
this(
9194
maxBufferSize,
9295
streamName,
@@ -95,7 +98,8 @@ protected ResumableStreamIterator(
9598
Attributes.empty(),
9699
errorHandler,
97100
streamingRetrySettings,
98-
retryableCodes);
101+
retryableCodes,
102+
xGoogRequestIdCreator);
99103
}
100104

101105
protected ResumableStreamIterator(
@@ -106,14 +110,16 @@ protected ResumableStreamIterator(
106110
Attributes attributes,
107111
ErrorHandler errorHandler,
108112
RetrySettings streamingRetrySettings,
109-
Set<Code> retryableCodes) {
113+
Set<Code> retryableCodes,
114+
XGoogSpannerRequestId.RequestIdCreator xGoogRequestIdCreator) {
110115
checkArgument(maxBufferSize >= 0);
111116
this.maxBufferSize = maxBufferSize;
112117
this.tracer = tracer;
113118
this.span = tracer.spanBuilderWithExplicitParent(streamName, parent, attributes);
114119
this.errorHandler = errorHandler;
115120
this.streamingRetrySettings = Preconditions.checkNotNull(streamingRetrySettings);
116121
this.retryableCodes = Preconditions.checkNotNull(retryableCodes);
122+
this.xGoogRequestIdCreator = xGoogRequestIdCreator;
117123
}
118124

119125
private ExponentialBackOff newBackOff() {
@@ -181,15 +187,27 @@ private void backoffSleep(Context context, long backoffMillis) throws SpannerExc
181187
}
182188
if (latch.await(backoffMillis, TimeUnit.MILLISECONDS)) {
183189
// Woken by context cancellation.
184-
throw newSpannerExceptionForCancellation(context, null, null /*TODO: requestId*/);
190+
throw newSpannerExceptionForCancellation(context, null, this.xGoogRequestId);
185191
}
186192
} catch (InterruptedException interruptExcept) {
187-
throw newSpannerExceptionForCancellation(context, interruptExcept, null /*TODO: requestId*/);
193+
throw newSpannerExceptionForCancellation(context, interruptExcept, this.xGoogRequestId);
188194
} finally {
189195
context.removeListener(listener);
190196
}
191197
}
192198

199+
public void ensureNonNullXGoogRequestId() {
200+
if (this.xGoogRequestId == null) {
201+
this.xGoogRequestId =
202+
this.xGoogRequestIdCreator.nextRequestId(1 /*TODO: infer channelId*/, 1 /*attempt*/);
203+
}
204+
}
205+
206+
public void incrementXGoogRequestIdAttempt() {
207+
this.ensureNonNullXGoogRequestId();
208+
this.xGoogRequestId.incrementAttempt();
209+
}
210+
193211
private enum DirectExecutor implements Executor {
194212
INSTANCE;
195213

@@ -281,6 +299,7 @@ protected PartialResultSet computeNext() {
281299
}
282300
assert buffer.isEmpty() || buffer.getLast().getResumeToken().equals(resumeToken);
283301
stream = null;
302+
incrementXGoogRequestIdAttempt();
284303
try (IScope s = tracer.withSpan(span)) {
285304
long delay = spannerException.getRetryDelayInMillis();
286305
if (delay != -1) {
@@ -302,12 +321,14 @@ protected PartialResultSet computeNext() {
302321
if (++numAttemptsOnOtherChannel < errorHandler.getMaxAttempts()
303322
&& prepareIteratorForRetryOnDifferentGrpcChannel()) {
304323
stream = null;
324+
xGoogRequestId = null;
305325
continue;
306326
}
307327
}
308328
}
309329
span.addAnnotation("Stream broken. Not safe to retry", spannerException);
310330
span.setStatus(spannerException);
331+
spannerException.setRequestId(this.xGoogRequestId);
311332
throw spannerException;
312333
} catch (RuntimeException e) {
313334
span.addAnnotation("Stream broken. Not safe to retry", e);
@@ -328,6 +349,11 @@ private void startGrpcStreaming() {
328349
// this Span.
329350
stream = checkNotNull(startStream(resumeToken, streamMessageListener));
330351
stream.requestPrefetchChunks();
352+
if (this.xGoogRequestId == null) {
353+
this.xGoogRequestId =
354+
this.xGoogRequestIdCreator.nextRequestId(
355+
1 /* channelId shall be replaced by the instantiated class. */, 0);
356+
}
331357
}
332358
}
333359
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -300,11 +300,12 @@ public CommitResponse writeAtLeastOnceWithOptions(
300300
}
301301
CommitRequest request = requestBuilder.build();
302302
ISpan span = tracer.spanBuilder(SpannerImpl.COMMIT);
303-
final XGoogSpannerRequestId reqId = reqIdOrFresh(options);
304303

305304
try (IScope s = tracer.withSpan(span)) {
306305
return SpannerRetryHelper.runTxWithRetriesOnAborted(
307306
() -> {
307+
// On Aborted, we have to start a fresh request id.
308+
final XGoogSpannerRequestId reqId = reqIdOrFresh(options);
308309
return new CommitResponse(
309310
spanner.getRpc().commit(request, reqId.withOptions(getOptions())));
310311
});
@@ -464,15 +465,15 @@ public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption...
464465

465466
@Override
466467
public ApiFuture<Empty> asyncClose() {
467-
XGoogSpannerRequestId reqId = this.getRequestIdCreator().nextRequestId(this.getChannel(), 0);
468+
XGoogSpannerRequestId reqId = this.getRequestIdCreator().nextRequestId(this.getChannel(), 1);
468469
return spanner.getRpc().asyncDeleteSession(getName(), reqId.withOptions(getOptions()));
469470
}
470471

471472
@Override
472473
public void close() {
473474
ISpan span = tracer.spanBuilder(SpannerImpl.DELETE_SESSION);
474475
try (IScope s = tracer.withSpan(span)) {
475-
XGoogSpannerRequestId reqId = this.getRequestIdCreator().nextRequestId(this.getChannel(), 0);
476+
XGoogSpannerRequestId reqId = this.getRequestIdCreator().nextRequestId(this.getChannel(), 1);
476477
spanner.getRpc().deleteSession(getName(), reqId.withOptions(getOptions()));
477478
} catch (RuntimeException e) {
478479
span.setStatus(e);
@@ -516,7 +517,7 @@ ApiFuture<Transaction> beginTransactionAsync(
516517
Transaction txn = requestFuture.get();
517518
if (txn.getId().isEmpty()) {
518519
throw newSpannerException(
519-
ErrorCode.INTERNAL, "Missing id in transaction\n" + getName());
520+
ErrorCode.INTERNAL, "Missing id in transaction\n" + getName(), reqId);
520521
}
521522
span.end();
522523
res.set(txn);
@@ -525,7 +526,7 @@ ApiFuture<Transaction> beginTransactionAsync(
525526
span.end();
526527
res.setException(
527528
SpannerExceptionFactory.newSpannerException(
528-
e.getCause() == null ? e : e.getCause()));
529+
e.getCause() == null ? e : e.getCause(), reqId));
529530
} catch (InterruptedException e) {
530531
span.setStatus(e);
531532
span.end();
@@ -599,7 +600,14 @@ int getChannel() {
599600
if (getIsMultiplexed()) {
600601
return 0;
601602
}
602-
Long channelHint = (Long) this.getOptions().get(SpannerRpc.Option.CHANNEL_HINT);
603+
Map<SpannerRpc.Option, ?> options = this.getOptions();
604+
if (options == null) {
605+
return 0;
606+
}
607+
Long channelHint = (Long) options.get(SpannerRpc.Option.CHANNEL_HINT);
608+
if (channelHint == null) {
609+
return 0;
610+
}
603611
return (int) (channelHint % this.spanner.getOptions().getNumChannels());
604612
}
605613
}

0 commit comments

Comments
 (0)