Skip to content

chore(x-goog-spanner-request-id): add BeginTransaction+ResumableStreamIterator #3898

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -457,9 +457,12 @@ void initTransaction() {
}

private void initTransactionInternal(BeginTransactionRequest request) {
XGoogSpannerRequestId reqId =
session.getRequestIdCreator().nextRequestId(1 /*TODO: retrieve channelId*/, 1);
try {
Transaction transaction =
rpc.beginTransaction(request, getTransactionChannelHint(), isRouteToLeader());
rpc.beginTransaction(
request, reqId.withOptions(getTransactionChannelHint()), isRouteToLeader());
if (!transaction.hasReadTimestamp()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field");
Expand Down Expand Up @@ -803,7 +806,8 @@ ResultSet executeQueryInternalWithOptions(
tracer.createStatementAttributes(statement, options),
session.getErrorHandler(),
rpc.getExecuteQueryRetrySettings(),
rpc.getExecuteQueryRetryableCodes()) {
rpc.getExecuteQueryRetryableCodes(),
session.getRequestIdCreator()) {
@Override
CloseableIterator<PartialResultSet> startStream(
@Nullable ByteString resumeToken,
Expand All @@ -826,11 +830,12 @@ CloseableIterator<PartialResultSet> startStream(
if (selector != null) {
request.setTransaction(selector);
}
this.incrementXGoogRequestIdAttempt();
SpannerRpc.StreamingCall call =
rpc.executeQuery(
request.build(),
stream.consumer(),
getTransactionChannelHint(),
this.xGoogRequestId.withOptions(getTransactionChannelHint()),
isRouteToLeader());
session.markUsed(clock.instant());
stream.setCall(call, request.getTransaction().hasBegin());
Expand Down Expand Up @@ -1008,7 +1013,8 @@ ResultSet readInternalWithOptions(
tracer.createTableAttributes(table, readOptions),
session.getErrorHandler(),
rpc.getReadRetrySettings(),
rpc.getReadRetryableCodes()) {
rpc.getReadRetryableCodes(),
session.getRequestIdCreator()) {
@Override
CloseableIterator<PartialResultSet> startStream(
@Nullable ByteString resumeToken,
Expand All @@ -1029,11 +1035,12 @@ CloseableIterator<PartialResultSet> startStream(
builder.setTransaction(selector);
}
builder.setRequestOptions(buildRequestOptions(readOptions));
this.incrementXGoogRequestIdAttempt();
SpannerRpc.StreamingCall call =
rpc.read(
builder.build(),
stream.consumer(),
getTransactionChannelHint(),
this.xGoogRequestId.withOptions(getTransactionChannelHint()),
isRouteToLeader());
session.markUsed(clock.instant());
stream.setCall(call, /* withBeginTransaction= */ builder.getTransaction().hasBegin());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,11 @@ private List<Partition> partitionReadUsingIndex(
}
builder.setPartitionOptions(pbuilder.build());

XGoogSpannerRequestId reqId =
session.getRequestIdCreator().nextRequestId(1 /*TODO: retrieve channelId*/, 1);
final PartitionReadRequest request = builder.build();
try {
PartitionResponse response = rpc.partitionRead(request, options);
PartitionResponse response = rpc.partitionRead(request, reqId.withOptions(options));
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
Partition partition =
Expand All @@ -272,6 +274,7 @@ private List<Partition> partitionReadUsingIndex(
return partitionReadUsingIndex(
partitionOptions, table, index, keys, columns, true, option);
}
e.setRequestId(reqId);
throw e;
}
}
Expand Down Expand Up @@ -313,9 +316,11 @@ private List<Partition> partitionQuery(
}
builder.setPartitionOptions(pbuilder.build());

XGoogSpannerRequestId reqId =
session.getRequestIdCreator().nextRequestId(1 /*TODO: retrieve channelId*/, 1);
final PartitionQueryRequest request = builder.build();
try {
PartitionResponse response = rpc.partitionQuery(request, options);
PartitionResponse response = rpc.partitionQuery(request, reqId.withOptions(options));
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
Partition partition =
Expand All @@ -328,6 +333,7 @@ private List<Partition> partitionQuery(
if (!isFallback && maybeMarkUnimplementedForPartitionedOps(e)) {
return partitionQuery(partitionOptions, statement, true, option);
}
e.setRequestId(reqId);
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,22 @@ long executeStreamingPartitionedUpdate(
long updateCount = 0L;
Stopwatch stopwatch = Stopwatch.createStarted(ticker);
Options options = Options.fromUpdateOptions(updateOptions);
XGoogSpannerRequestId reqId = options.reqId();
if (reqId == null) {
reqId = session.getRequestIdCreator().nextRequestId(1 /*TODO: infer channelId*/, 0);
}

try {
ExecuteSqlRequest request = newTransactionRequestFrom(statement, options);

while (true) {
reqId.incrementAttempt();
final Duration remainingTimeout = tryUpdateTimeout(timeout, stopwatch);

try {
ServerStream<PartialResultSet> stream =
rpc.executeStreamingPartitionedDml(request, session.getOptions(), remainingTimeout);
rpc.executeStreamingPartitionedDml(
request, reqId.withOptions(session.getOptions()), remainingTimeout);

for (PartialResultSet rs : stream) {
if (rs.getResumeToken() != null && !rs.getResumeToken().isEmpty()) {
Expand Down Expand Up @@ -119,12 +125,18 @@ long executeStreamingPartitionedUpdate(
foundStats = false;
updateCount = 0L;
request = newTransactionRequestFrom(statement, options);
// Create a new xGoogSpannerRequestId.
reqId = session.getRequestIdCreator().nextRequestId(1 /*TODO: infer channelId*/, 0);
} catch (SpannerException e) {
e.setRequestId(reqId);
throw e;
}
}
if (!foundStats) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT,
"Partitioned DML response missing stats possibly due to non-DML statement as input");
"Partitioned DML response missing stats possibly due to non-DML statement as input",
reqId);
}
LOGGER.log(Level.FINER, "Finished PartitionedUpdate statement");
return updateCount;
Expand Down Expand Up @@ -209,11 +221,16 @@ private ByteString initTransaction(final Options options) {
.setExcludeTxnFromChangeStreams(
options.withExcludeTxnFromChangeStreams() == Boolean.TRUE))
.build();
Transaction tx = rpc.beginTransaction(request, session.getOptions(), true);
XGoogSpannerRequestId reqId = options.reqId();
if (reqId == null) {
reqId = session.getRequestIdCreator().nextRequestId(1 /*TODO: infer channelId*/, 1);
}
Transaction tx = rpc.beginTransaction(request, reqId.withOptions(session.getOptions()), true);
if (tx.getId().isEmpty()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL,
"Failed to init transaction, missing transaction id\n" + session.getName());
"Failed to init transaction, missing transaction id\n" + session.getName(),
reqId);
}
return tx.getId();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ abstract class ResumableStreamIterator extends AbstractIterator<PartialResultSet
private CloseableIterator<PartialResultSet> stream;
private ByteString resumeToken;
private boolean finished;
public XGoogSpannerRequestId xGoogRequestId;
private XGoogSpannerRequestId.RequestIdCreator xGoogRequestIdCreator;

/**
* Indicates whether it is currently safe to retry RPCs. This will be {@code false} if we have
Expand All @@ -86,7 +88,8 @@ protected ResumableStreamIterator(
TraceWrapper tracer,
ErrorHandler errorHandler,
RetrySettings streamingRetrySettings,
Set<Code> retryableCodes) {
Set<Code> retryableCodes,
XGoogSpannerRequestId.RequestIdCreator xGoogRequestIdCreator) {
this(
maxBufferSize,
streamName,
Expand All @@ -95,7 +98,8 @@ protected ResumableStreamIterator(
Attributes.empty(),
errorHandler,
streamingRetrySettings,
retryableCodes);
retryableCodes,
xGoogRequestIdCreator);
}

protected ResumableStreamIterator(
Expand All @@ -106,14 +110,16 @@ protected ResumableStreamIterator(
Attributes attributes,
ErrorHandler errorHandler,
RetrySettings streamingRetrySettings,
Set<Code> retryableCodes) {
Set<Code> retryableCodes,
XGoogSpannerRequestId.RequestIdCreator xGoogRequestIdCreator) {
checkArgument(maxBufferSize >= 0);
this.maxBufferSize = maxBufferSize;
this.tracer = tracer;
this.span = tracer.spanBuilderWithExplicitParent(streamName, parent, attributes);
this.errorHandler = errorHandler;
this.streamingRetrySettings = Preconditions.checkNotNull(streamingRetrySettings);
this.retryableCodes = Preconditions.checkNotNull(retryableCodes);
this.xGoogRequestIdCreator = xGoogRequestIdCreator;
}

private ExponentialBackOff newBackOff() {
Expand Down Expand Up @@ -181,15 +187,23 @@ private void backoffSleep(Context context, long backoffMillis) throws SpannerExc
}
if (latch.await(backoffMillis, TimeUnit.MILLISECONDS)) {
// Woken by context cancellation.
throw newSpannerExceptionForCancellation(context, null, null /*TODO: requestId*/);
throw newSpannerExceptionForCancellation(context, null, this.xGoogRequestId);
}
} catch (InterruptedException interruptExcept) {
throw newSpannerExceptionForCancellation(context, interruptExcept, null /*TODO: requestId*/);
throw newSpannerExceptionForCancellation(context, interruptExcept, this.xGoogRequestId);
} finally {
context.removeListener(listener);
}
}

public void incrementXGoogRequestIdAttempt() {
if (this.xGoogRequestId == null) {
this.xGoogRequestId =
this.xGoogRequestIdCreator.nextRequestId(1 /*TODO: infer channelId*/, 0 /*attempt*/);
}
this.xGoogRequestId.incrementAttempt();
}

private enum DirectExecutor implements Executor {
INSTANCE;

Expand Down Expand Up @@ -281,6 +295,7 @@ protected PartialResultSet computeNext() {
}
assert buffer.isEmpty() || buffer.getLast().getResumeToken().equals(resumeToken);
stream = null;
incrementXGoogRequestIdAttempt();
try (IScope s = tracer.withSpan(span)) {
long delay = spannerException.getRetryDelayInMillis();
if (delay != -1) {
Expand All @@ -302,12 +317,14 @@ protected PartialResultSet computeNext() {
if (++numAttemptsOnOtherChannel < errorHandler.getMaxAttempts()
&& prepareIteratorForRetryOnDifferentGrpcChannel()) {
stream = null;
xGoogRequestId = null;
continue;
}
}
}
span.addAnnotation("Stream broken. Not safe to retry", spannerException);
span.setStatus(spannerException);
spannerException.setRequestId(this.xGoogRequestId);
throw spannerException;
} catch (RuntimeException e) {
span.addAnnotation("Stream broken. Not safe to retry", e);
Expand All @@ -328,6 +345,10 @@ private void startGrpcStreaming() {
// this Span.
stream = checkNotNull(startStream(resumeToken, streamMessageListener));
stream.requestPrefetchChunks();
if (this.xGoogRequestId == null) {
this.xGoogRequestId =
this.xGoogRequestIdCreator.nextRequestId(1 /*TODO: retrieve channelId*/, 0);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,11 +300,12 @@ public CommitResponse writeAtLeastOnceWithOptions(
}
CommitRequest request = requestBuilder.build();
ISpan span = tracer.spanBuilder(SpannerImpl.COMMIT);
final XGoogSpannerRequestId reqId = reqIdOrFresh(options);

try (IScope s = tracer.withSpan(span)) {
return SpannerRetryHelper.runTxWithRetriesOnAborted(
() -> {
// On Aborted, we have to start a fresh request id.
final XGoogSpannerRequestId reqId = reqIdOrFresh(options);
return new CommitResponse(
spanner.getRpc().commit(request, reqId.withOptions(getOptions())));
});
Expand Down Expand Up @@ -519,7 +520,7 @@ ApiFuture<Transaction> beginTransactionAsync(
Transaction txn = requestFuture.get();
if (txn.getId().isEmpty()) {
throw newSpannerException(
ErrorCode.INTERNAL, "Missing id in transaction\n" + getName());
ErrorCode.INTERNAL, "Missing id in transaction\n" + getName(), reqId);
}
span.end();
res.set(txn);
Expand All @@ -528,7 +529,7 @@ ApiFuture<Transaction> beginTransactionAsync(
span.end();
res.setException(
SpannerExceptionFactory.newSpannerException(
e.getCause() == null ? e : e.getCause()));
e.getCause() == null ? e : e.getCause(), reqId));
} catch (InterruptedException e) {
span.setStatus(e);
span.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public String getResourceName() {

private final ErrorCode code;
private final ApiException apiException;
private final XGoogSpannerRequestId requestId;
private XGoogSpannerRequestId requestId;

/** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */
SpannerException(
Expand Down Expand Up @@ -197,4 +197,9 @@ public ErrorDetails getErrorDetails() {
}
return null;
}

/** Sets the requestId. This method is meant to be used internally and not by customers. */
public void setRequestId(XGoogSpannerRequestId reqId) {
this.requestId = reqId;
}
}
Loading