diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index 6ea4510d3d..2860e86d1b 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -457,9 +457,12 @@ void initTransaction() { } private void initTransactionInternal(BeginTransactionRequest request) { + XGoogSpannerRequestId reqId = + session.getRequestIdCreator().nextRequestId(1 /*TODO: retrieve channelId*/, 1); try { Transaction transaction = - rpc.beginTransaction(request, getTransactionChannelHint(), isRouteToLeader()); + rpc.beginTransaction( + request, reqId.withOptions(getTransactionChannelHint()), isRouteToLeader()); if (!transaction.hasReadTimestamp()) { throw SpannerExceptionFactory.newSpannerException( ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field"); @@ -803,7 +806,8 @@ ResultSet executeQueryInternalWithOptions( tracer.createStatementAttributes(statement, options), session.getErrorHandler(), rpc.getExecuteQueryRetrySettings(), - rpc.getExecuteQueryRetryableCodes()) { + rpc.getExecuteQueryRetryableCodes(), + session.getRequestIdCreator()) { @Override CloseableIterator startStream( @Nullable ByteString resumeToken, @@ -826,11 +830,12 @@ CloseableIterator startStream( if (selector != null) { request.setTransaction(selector); } + this.incrementXGoogRequestIdAttempt(); SpannerRpc.StreamingCall call = rpc.executeQuery( request.build(), stream.consumer(), - getTransactionChannelHint(), + this.xGoogRequestId.withOptions(getTransactionChannelHint()), isRouteToLeader()); session.markUsed(clock.instant()); stream.setCall(call, request.getTransaction().hasBegin()); @@ -1008,7 +1013,8 @@ ResultSet readInternalWithOptions( tracer.createTableAttributes(table, readOptions), session.getErrorHandler(), rpc.getReadRetrySettings(), - rpc.getReadRetryableCodes()) { + rpc.getReadRetryableCodes(), + session.getRequestIdCreator()) { @Override CloseableIterator startStream( @Nullable ByteString resumeToken, @@ -1029,11 +1035,12 @@ CloseableIterator startStream( builder.setTransaction(selector); } builder.setRequestOptions(buildRequestOptions(readOptions)); + this.incrementXGoogRequestIdAttempt(); SpannerRpc.StreamingCall call = rpc.read( builder.build(), stream.consumer(), - getTransactionChannelHint(), + this.xGoogRequestId.withOptions(getTransactionChannelHint()), isRouteToLeader()); session.markUsed(clock.instant()); stream.setCall(call, /* withBeginTransaction= */ builder.getTransaction().hasBegin()); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java index 86aea9ef98..fb036ee762 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java @@ -250,9 +250,11 @@ private List partitionReadUsingIndex( } builder.setPartitionOptions(pbuilder.build()); + XGoogSpannerRequestId reqId = + session.getRequestIdCreator().nextRequestId(1 /*TODO: retrieve channelId*/, 1); final PartitionReadRequest request = builder.build(); try { - PartitionResponse response = rpc.partitionRead(request, options); + PartitionResponse response = rpc.partitionRead(request, reqId.withOptions(options)); ImmutableList.Builder partitions = ImmutableList.builder(); for (com.google.spanner.v1.Partition p : response.getPartitionsList()) { Partition partition = @@ -272,6 +274,7 @@ private List partitionReadUsingIndex( return partitionReadUsingIndex( partitionOptions, table, index, keys, columns, true, option); } + e.setRequestId(reqId); throw e; } } @@ -313,9 +316,11 @@ private List partitionQuery( } builder.setPartitionOptions(pbuilder.build()); + XGoogSpannerRequestId reqId = + session.getRequestIdCreator().nextRequestId(1 /*TODO: retrieve channelId*/, 1); final PartitionQueryRequest request = builder.build(); try { - PartitionResponse response = rpc.partitionQuery(request, options); + PartitionResponse response = rpc.partitionQuery(request, reqId.withOptions(options)); ImmutableList.Builder partitions = ImmutableList.builder(); for (com.google.spanner.v1.Partition p : response.getPartitionsList()) { Partition partition = @@ -328,6 +333,7 @@ private List partitionQuery( if (!isFallback && maybeMarkUnimplementedForPartitionedOps(e)) { return partitionQuery(partitionOptions, statement, true, option); } + e.setRequestId(reqId); throw e; } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java index 93cebb6333..dbaa4c4c5b 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java @@ -80,16 +80,22 @@ long executeStreamingPartitionedUpdate( long updateCount = 0L; Stopwatch stopwatch = Stopwatch.createStarted(ticker); Options options = Options.fromUpdateOptions(updateOptions); + XGoogSpannerRequestId reqId = options.reqId(); + if (reqId == null) { + reqId = session.getRequestIdCreator().nextRequestId(1 /*TODO: infer channelId*/, 0); + } try { ExecuteSqlRequest request = newTransactionRequestFrom(statement, options); while (true) { + reqId.incrementAttempt(); final Duration remainingTimeout = tryUpdateTimeout(timeout, stopwatch); try { ServerStream stream = - rpc.executeStreamingPartitionedDml(request, session.getOptions(), remainingTimeout); + rpc.executeStreamingPartitionedDml( + request, reqId.withOptions(session.getOptions()), remainingTimeout); for (PartialResultSet rs : stream) { if (rs.getResumeToken() != null && !rs.getResumeToken().isEmpty()) { @@ -119,12 +125,18 @@ long executeStreamingPartitionedUpdate( foundStats = false; updateCount = 0L; request = newTransactionRequestFrom(statement, options); + // Create a new xGoogSpannerRequestId. + reqId = session.getRequestIdCreator().nextRequestId(1 /*TODO: infer channelId*/, 0); + } catch (SpannerException e) { + e.setRequestId(reqId); + throw e; } } if (!foundStats) { throw SpannerExceptionFactory.newSpannerException( ErrorCode.INVALID_ARGUMENT, - "Partitioned DML response missing stats possibly due to non-DML statement as input"); + "Partitioned DML response missing stats possibly due to non-DML statement as input", + reqId); } LOGGER.log(Level.FINER, "Finished PartitionedUpdate statement"); return updateCount; @@ -209,11 +221,16 @@ private ByteString initTransaction(final Options options) { .setExcludeTxnFromChangeStreams( options.withExcludeTxnFromChangeStreams() == Boolean.TRUE)) .build(); - Transaction tx = rpc.beginTransaction(request, session.getOptions(), true); + XGoogSpannerRequestId reqId = options.reqId(); + if (reqId == null) { + reqId = session.getRequestIdCreator().nextRequestId(1 /*TODO: infer channelId*/, 1); + } + Transaction tx = rpc.beginTransaction(request, reqId.withOptions(session.getOptions()), true); if (tx.getId().isEmpty()) { throw SpannerExceptionFactory.newSpannerException( ErrorCode.INTERNAL, - "Failed to init transaction, missing transaction id\n" + session.getName()); + "Failed to init transaction, missing transaction id\n" + session.getName(), + reqId); } return tx.getId(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java index 44a0b637f6..6839e4e6bc 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java @@ -71,6 +71,8 @@ abstract class ResumableStreamIterator extends AbstractIterator stream; private ByteString resumeToken; private boolean finished; + public XGoogSpannerRequestId xGoogRequestId; + private XGoogSpannerRequestId.RequestIdCreator xGoogRequestIdCreator; /** * Indicates whether it is currently safe to retry RPCs. This will be {@code false} if we have @@ -86,7 +88,8 @@ protected ResumableStreamIterator( TraceWrapper tracer, ErrorHandler errorHandler, RetrySettings streamingRetrySettings, - Set retryableCodes) { + Set retryableCodes, + XGoogSpannerRequestId.RequestIdCreator xGoogRequestIdCreator) { this( maxBufferSize, streamName, @@ -95,7 +98,8 @@ protected ResumableStreamIterator( Attributes.empty(), errorHandler, streamingRetrySettings, - retryableCodes); + retryableCodes, + xGoogRequestIdCreator); } protected ResumableStreamIterator( @@ -106,7 +110,8 @@ protected ResumableStreamIterator( Attributes attributes, ErrorHandler errorHandler, RetrySettings streamingRetrySettings, - Set retryableCodes) { + Set retryableCodes, + XGoogSpannerRequestId.RequestIdCreator xGoogRequestIdCreator) { checkArgument(maxBufferSize >= 0); this.maxBufferSize = maxBufferSize; this.tracer = tracer; @@ -114,6 +119,7 @@ protected ResumableStreamIterator( this.errorHandler = errorHandler; this.streamingRetrySettings = Preconditions.checkNotNull(streamingRetrySettings); this.retryableCodes = Preconditions.checkNotNull(retryableCodes); + this.xGoogRequestIdCreator = xGoogRequestIdCreator; } private ExponentialBackOff newBackOff() { @@ -181,15 +187,23 @@ private void backoffSleep(Context context, long backoffMillis) throws SpannerExc } if (latch.await(backoffMillis, TimeUnit.MILLISECONDS)) { // Woken by context cancellation. - throw newSpannerExceptionForCancellation(context, null, null /*TODO: requestId*/); + throw newSpannerExceptionForCancellation(context, null, this.xGoogRequestId); } } catch (InterruptedException interruptExcept) { - throw newSpannerExceptionForCancellation(context, interruptExcept, null /*TODO: requestId*/); + throw newSpannerExceptionForCancellation(context, interruptExcept, this.xGoogRequestId); } finally { context.removeListener(listener); } } + public void incrementXGoogRequestIdAttempt() { + if (this.xGoogRequestId == null) { + this.xGoogRequestId = + this.xGoogRequestIdCreator.nextRequestId(1 /*TODO: infer channelId*/, 0 /*attempt*/); + } + this.xGoogRequestId.incrementAttempt(); + } + private enum DirectExecutor implements Executor { INSTANCE; @@ -281,6 +295,7 @@ protected PartialResultSet computeNext() { } assert buffer.isEmpty() || buffer.getLast().getResumeToken().equals(resumeToken); stream = null; + incrementXGoogRequestIdAttempt(); try (IScope s = tracer.withSpan(span)) { long delay = spannerException.getRetryDelayInMillis(); if (delay != -1) { @@ -302,12 +317,14 @@ protected PartialResultSet computeNext() { if (++numAttemptsOnOtherChannel < errorHandler.getMaxAttempts() && prepareIteratorForRetryOnDifferentGrpcChannel()) { stream = null; + xGoogRequestId = null; continue; } } } span.addAnnotation("Stream broken. Not safe to retry", spannerException); span.setStatus(spannerException); + spannerException.setRequestId(this.xGoogRequestId); throw spannerException; } catch (RuntimeException e) { span.addAnnotation("Stream broken. Not safe to retry", e); @@ -328,6 +345,10 @@ private void startGrpcStreaming() { // this Span. stream = checkNotNull(startStream(resumeToken, streamMessageListener)); stream.requestPrefetchChunks(); + if (this.xGoogRequestId == null) { + this.xGoogRequestId = + this.xGoogRequestIdCreator.nextRequestId(1 /*TODO: retrieve channelId*/, 0); + } } } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index c78cfaeea3..653574d4b5 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -300,11 +300,12 @@ public CommitResponse writeAtLeastOnceWithOptions( } CommitRequest request = requestBuilder.build(); ISpan span = tracer.spanBuilder(SpannerImpl.COMMIT); - final XGoogSpannerRequestId reqId = reqIdOrFresh(options); try (IScope s = tracer.withSpan(span)) { return SpannerRetryHelper.runTxWithRetriesOnAborted( () -> { + // On Aborted, we have to start a fresh request id. + final XGoogSpannerRequestId reqId = reqIdOrFresh(options); return new CommitResponse( spanner.getRpc().commit(request, reqId.withOptions(getOptions()))); }); @@ -519,7 +520,7 @@ ApiFuture beginTransactionAsync( Transaction txn = requestFuture.get(); if (txn.getId().isEmpty()) { throw newSpannerException( - ErrorCode.INTERNAL, "Missing id in transaction\n" + getName()); + ErrorCode.INTERNAL, "Missing id in transaction\n" + getName(), reqId); } span.end(); res.set(txn); @@ -528,7 +529,7 @@ ApiFuture beginTransactionAsync( span.end(); res.setException( SpannerExceptionFactory.newSpannerException( - e.getCause() == null ? e : e.getCause())); + e.getCause() == null ? e : e.getCause(), reqId)); } catch (InterruptedException e) { span.setStatus(e); span.end(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerException.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerException.java index 22a5270cef..18d9120b6c 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerException.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerException.java @@ -57,7 +57,7 @@ public String getResourceName() { private final ErrorCode code; private final ApiException apiException; - private final XGoogSpannerRequestId requestId; + private XGoogSpannerRequestId requestId; /** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */ SpannerException( @@ -197,4 +197,9 @@ public ErrorDetails getErrorDetails() { } return null; } + + /** Sets the requestId. This method is meant to be used internally and not by customers. */ + public void setRequestId(XGoogSpannerRequestId reqId) { + this.requestId = reqId; + } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index e35e56f015..11430ce7a1 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -449,6 +449,8 @@ private final class CommitRunnable implements Runnable { @Override public void run() { + XGoogSpannerRequestId reqId = + session.getRequestIdCreator().nextRequestId(1 /*TODO: channelId */, 1); try { prev.get(); if (transactionId == null && transactionIdFuture == null) { @@ -491,7 +493,8 @@ public void run() { final ApiFuture commitFuture; final ISpan opSpan = tracer.spanBuilderWithExplicitParent(SpannerImpl.COMMIT, span); try (IScope ignore = tracer.withSpan(opSpan)) { - commitFuture = rpc.commitAsync(commitRequest, getTransactionChannelHint()); + commitFuture = + rpc.commitAsync(commitRequest, reqId.withOptions(getTransactionChannelHint())); } session.markUsed(clock.instant()); commitFuture.addListener( @@ -502,7 +505,7 @@ public void run() { // future, but we add a result here as well as a safety precaution. res.setException( SpannerExceptionFactory.newSpannerException( - ErrorCode.INTERNAL, "commitFuture is not done")); + ErrorCode.INTERNAL, "commitFuture is not done", reqId)); return; } com.google.spanner.v1.CommitResponse proto = commitFuture.get(); @@ -532,7 +535,9 @@ public void run() { } if (!proto.hasCommitTimestamp()) { throw newSpannerException( - ErrorCode.INTERNAL, "Missing commitTimestamp:\n" + session.getName()); + ErrorCode.INTERNAL, + "Missing commitTimestamp:\n" + session.getName(), + reqId); } span.addAnnotation("Commit Done"); opSpan.end(); @@ -568,7 +573,8 @@ public void run() { res.setException(SpannerExceptionFactory.propagateTimeout(e)); } catch (Throwable e) { res.setException( - SpannerExceptionFactory.newSpannerException(e.getCause() == null ? e : e.getCause())); + SpannerExceptionFactory.newSpannerException( + e.getCause() == null ? e : e.getCause(), reqId)); } } } @@ -923,9 +929,12 @@ private ResultSet internalExecuteUpdate( final ExecuteSqlRequest.Builder builder = getExecuteSqlRequestBuilder( statement, queryMode, options, /* withTransactionSelector= */ true); + XGoogSpannerRequestId reqId = + session.getRequestIdCreator().nextRequestId(1 /*TODO: channelId */, 1); try { com.google.spanner.v1.ResultSet resultSet = - rpc.executeQuery(builder.build(), getTransactionChannelHint(), isRouteToLeader()); + rpc.executeQuery( + builder.build(), reqId.withOptions(getTransactionChannelHint()), isRouteToLeader()); session.markUsed(clock.instant()); if (resultSet.getMetadata().hasTransaction()) { onTransactionMetadata( @@ -1056,9 +1065,11 @@ public long[] batchUpdate(Iterable statements, UpdateOption... update } final ExecuteBatchDmlRequest.Builder builder = getExecuteBatchDmlRequestBuilder(statements, options); + XGoogSpannerRequestId reqId = + session.getRequestIdCreator().nextRequestId(1 /*TODO: channelId */, 1); try { com.google.spanner.v1.ExecuteBatchDmlResponse response = - rpc.executeBatchDml(builder.build(), getTransactionChannelHint()); + rpc.executeBatchDml(builder.build(), reqId.withOptions(getTransactionChannelHint())); session.markUsed(clock.instant()); long[] results = new long[response.getResultSetsCount()]; for (int i = 0; i < response.getResultSetsCount(); ++i) { @@ -1083,7 +1094,7 @@ public long[] batchUpdate(Iterable statements, UpdateOption... update ErrorCode.fromRpcStatus(response.getStatus()), response.getStatus().getMessage(), results, - null /*TODO: requestId*/); + reqId); } return results; } catch (Throwable e) { @@ -1116,11 +1127,15 @@ public ApiFuture batchUpdateAsync( final ExecuteBatchDmlRequest.Builder builder = getExecuteBatchDmlRequestBuilder(statements, options); ApiFuture response; + XGoogSpannerRequestId reqId = + session.getRequestIdCreator().nextRequestId(1 /*TODO: channelId */, 1); try { // Register the update as an async operation that must finish before the transaction may // commit. increaseAsyncOperations(); - response = rpc.executeBatchDmlAsync(builder.build(), getTransactionChannelHint()); + response = + rpc.executeBatchDmlAsync( + builder.build(), reqId.withOptions(getTransactionChannelHint())); session.markUsed(clock.instant()); } catch (Throwable t) { decreaseAsyncOperations(); @@ -1151,7 +1166,7 @@ public ApiFuture batchUpdateAsync( ErrorCode.fromRpcStatus(batchDmlResponse.getStatus()), batchDmlResponse.getStatus().getMessage(), results, - null /*TODO: requestId*/); + reqId); } return results; }, diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 1db7335ef9..862bbc8f29 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -228,20 +228,22 @@ public static void startStaticServer() throws IOException { Set checkMethods = new HashSet( Arrays.asList( - "google.spanner.v1.Spanner/BatchCreateSessions" - // As functionality is added, uncomment each method. - // "google.spanner.v1.Spanner/BatchWrite", - // "google.spanner.v1.Spanner/BeginTransaction", - // "google.spanner.v1.Spanner/CreateSession", - // "google.spanner.v1.Spanner/DeleteSession", - // "google.spanner.v1.Spanner/ExecuteBatchDml", - // "google.spanner.v1.Spanner/ExecuteSql", - // "google.spanner.v1.Spanner/ExecuteStreamingSql", - // "google.spanner.v1.Spanner/StreamingRead", - // "google.spanner.v1.Spanner/PartitionQuery", - // "google.spanner.v1.Spanner/PartitionRead", - // "google.spanner.v1.Spanner/Commit", - )); + "google.spanner.v1.Spanner/BatchCreateSessions", + "google.spanner.v1.Spanner/BatchWrite", + "google.spanner.v1.Spanner/BeginTransaction", + "google.spanner.v1.Spanner/Commit", + "google.spanner.v1.Spanner/CreateSession", + "google.spanner.v1.Spanner/DeleteSession", + "google.spanner.v1.Spanner/ExecuteBatchDml", + "google.spanner.v1.Spanner/ExecuteSql", + "google.spanner.v1.Spanner/ExecuteStreamingSql", + "google.spanner.v1.Spanner/GetSession", + "google.spanner.v1.Spanner/ListSessions", + "google.spanner.v1.Spanner/PartitionQuery", + "google.spanner.v1.Spanner/PartitionRead", + "google.spanner.v1.Spanner/Read", + "google.spanner.v1.Spanner/Rollback", + "google.spanner.v1.Spanner/StreamingRead")); xGoogReqIdInterceptor = new XGoogSpannerRequestIdTest.ServerHeaderEnforcer(checkMethods); executor = Executors.newSingleThreadExecutor(); String uniqueName = InProcessServerBuilder.generateName(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java index 0a2beea773..8129ddbd5d 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java @@ -96,6 +96,8 @@ public class PartitionedDmlTransactionTest { public void setup() { MockitoAnnotations.initMocks(this); when(session.getName()).thenReturn(sessionId); + when(session.getRequestIdCreator()) + .thenReturn(new XGoogSpannerRequestId.NoopRequestIdCreator()); when(session.getOptions()).thenReturn(Collections.EMPTY_MAP); when(rpc.beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true))) .thenReturn(Transaction.newBuilder().setId(txId).build()); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java index ebe8672467..5588b47866 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java @@ -162,7 +162,8 @@ private void initWithLimit(int maxBufferSize) { new TraceWrapper(Tracing.getTracer(), OpenTelemetry.noop().getTracer(""), false), DefaultErrorHandler.INSTANCE, SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetrySettings(), - SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetryableCodes()) { + SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetryableCodes(), + new XGoogSpannerRequestId.NoopRequestIdCreator()) { @Override AbstractResultSet.CloseableIterator startStream( @Nullable ByteString resumeToken, diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index df2c89fd76..8d00f0889b 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -1481,6 +1481,8 @@ public void testSessionNotFoundReadWriteTransaction() { when(closedSession.getName()) .thenReturn("projects/dummy/instances/dummy/database/dummy/sessions/session-closed"); when(closedSession.getErrorHandler()).thenReturn(DefaultErrorHandler.INSTANCE); + when(closedSession.getRequestIdCreator()) + .thenReturn(new XGoogSpannerRequestId.NoopRequestIdCreator()); Span oTspan = mock(Span.class); ISpan span = new OpenTelemetrySpan(oTspan); @@ -1521,6 +1523,8 @@ public void testSessionNotFoundReadWriteTransaction() { TransactionRunnerImpl openTransactionRunner = new TransactionRunnerImpl(openSession); openTransactionRunner.setSpan(span); when(openSession.readWriteTransaction()).thenReturn(openTransactionRunner); + when(openSession.getRequestIdCreator()) + .thenReturn(new XGoogSpannerRequestId.NoopRequestIdCreator()); ResultSet openResultSet = mock(ResultSet.class); when(openResultSet.next()).thenReturn(true, false); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionContextImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionContextImplTest.java index 561bfb8900..4c507b7bea 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionContextImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionContextImplTest.java @@ -67,6 +67,8 @@ public void setup() { .setCommitTimestamp(Timestamp.newBuilder().setSeconds(99L).setNanos(10).build()) .build())); when(session.getName()).thenReturn("test"); + when(session.getRequestIdCreator()) + .thenReturn(new XGoogSpannerRequestId.NoopRequestIdCreator()); doNothing().when(span).setStatus(any(Throwable.class)); doNothing().when(span).end(); doNothing().when(span).addAnnotation("Starting Commit"); @@ -210,6 +212,8 @@ public void testReturnCommitStats() { private void batchDml(int status) { SessionImpl session = mock(SessionImpl.class); when(session.getName()).thenReturn("test"); + when(session.getRequestIdCreator()) + .thenReturn(new XGoogSpannerRequestId.NoopRequestIdCreator()); SpannerRpc rpc = mock(SpannerRpc.class); ExecuteBatchDmlResponse response = ExecuteBatchDmlResponse.newBuilder() diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java index c2f1009b43..2325f2ac40 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java @@ -121,6 +121,8 @@ public void setUp() { when(session.getErrorHandler()).thenReturn(DefaultErrorHandler.INSTANCE); when(session.newTransaction(eq(Options.fromTransactionOptions()), any())).thenReturn(txn); when(session.getTracer()).thenReturn(tracer); + when(session.getRequestIdCreator()) + .thenReturn(new XGoogSpannerRequestId.NoopRequestIdCreator()); when(rpc.executeQuery(Mockito.any(ExecuteSqlRequest.class), Mockito.anyMap(), eq(true))) .thenAnswer( invocation -> { @@ -334,6 +336,7 @@ public void inlineBegin() { spanner, new SessionReference( "projects/p/instances/i/databases/d/sessions/s", Collections.EMPTY_MAP)) {}; + session.setRequestIdCreator(new XGoogSpannerRequestId.NoopRequestIdCreator()); session.setCurrentSpan(new OpenTelemetrySpan(mock(io.opentelemetry.api.trace.Span.class))); TransactionRunnerImpl runner = new TransactionRunnerImpl(session); runner.setSpan(span);