Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,11 @@ void initTransaction() {

private void initTransactionInternal(BeginTransactionRequest request) {
try {
XGoogSpannerRequestId reqId =
session.getRequestIdCreator().nextRequestId(1 /*TODO: retrieve channelId*/, 1);
Transaction transaction =
rpc.beginTransaction(request, getTransactionChannelHint(), isRouteToLeader());
rpc.beginTransaction(
request, reqId.withOptions(getTransactionChannelHint()), isRouteToLeader());
if (!transaction.hasReadTimestamp()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field");
Expand Down Expand Up @@ -803,7 +806,8 @@ ResultSet executeQueryInternalWithOptions(
tracer.createStatementAttributes(statement, options),
session.getErrorHandler(),
rpc.getExecuteQueryRetrySettings(),
rpc.getExecuteQueryRetryableCodes()) {
rpc.getExecuteQueryRetryableCodes(),
session.getRequestIdCreator()) {
@Override
CloseableIterator<PartialResultSet> startStream(
@Nullable ByteString resumeToken,
Expand All @@ -826,11 +830,13 @@ CloseableIterator<PartialResultSet> startStream(
if (selector != null) {
request.setTransaction(selector);
}

this.incrementXGoogRequestIdAttempt();
SpannerRpc.StreamingCall call =
rpc.executeQuery(
request.build(),
stream.consumer(),
getTransactionChannelHint(),
this.xGoogRequestId.withOptions(getTransactionChannelHint()),
isRouteToLeader());
session.markUsed(clock.instant());
stream.setCall(call, request.getTransaction().hasBegin());
Expand Down Expand Up @@ -1008,7 +1014,8 @@ ResultSet readInternalWithOptions(
tracer.createTableAttributes(table, readOptions),
session.getErrorHandler(),
rpc.getReadRetrySettings(),
rpc.getReadRetryableCodes()) {
rpc.getReadRetryableCodes(),
session.getRequestIdCreator()) {
@Override
CloseableIterator<PartialResultSet> startStream(
@Nullable ByteString resumeToken,
Expand All @@ -1029,11 +1036,12 @@ CloseableIterator<PartialResultSet> startStream(
builder.setTransaction(selector);
}
builder.setRequestOptions(buildRequestOptions(readOptions));
this.incrementXGoogRequestIdAttempt();
SpannerRpc.StreamingCall call =
rpc.read(
builder.build(),
stream.consumer(),
getTransactionChannelHint(),
this.xGoogRequestId.withOptions(getTransactionChannelHint()),
isRouteToLeader());
session.markUsed(clock.instant());
stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,9 @@ private List<Partition> partitionReadUsingIndex(

final PartitionReadRequest request = builder.build();
try {
PartitionResponse response = rpc.partitionRead(request, options);
XGoogSpannerRequestId reqId =
this.session.getRequestIdCreator().nextRequestId(1 /* channelId */, 0);
PartitionResponse response = rpc.partitionRead(request, reqId.withOptions(options));
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
Partition partition =
Expand Down Expand Up @@ -315,7 +317,9 @@ private List<Partition> partitionQuery(

final PartitionQueryRequest request = builder.build();
try {
PartitionResponse response = rpc.partitionQuery(request, options);
XGoogSpannerRequestId reqId =
this.session.getRequestIdCreator().nextRequestId(1 /* channelId */, 0);
PartitionResponse response = rpc.partitionQuery(request, reqId.withOptions(options));
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
Partition partition =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.spanner.v1.BatchWriteResponse;
import io.opentelemetry.api.common.Attributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import javax.annotation.Nullable;

class DatabaseClientImpl implements DatabaseClient {
Expand All @@ -40,6 +44,8 @@ class DatabaseClientImpl implements DatabaseClient {
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;
@VisibleForTesting final boolean useMultiplexedSessionPartitionedOps;
@VisibleForTesting final boolean useMultiplexedSessionForRW;
private final int dbId;
private final AtomicInteger nthRequest;

final boolean useMultiplexedSessionBlindWrite;

Expand Down Expand Up @@ -86,6 +92,18 @@ class DatabaseClientImpl implements DatabaseClient {
this.tracer = tracer;
this.useMultiplexedSessionForRW = useMultiplexedSessionForRW;
this.commonAttributes = commonAttributes;

this.dbId = this.dbIdFromClientId(this.clientId);
this.nthRequest = new AtomicInteger(0);
}

private int dbIdFromClientId(String clientId) {
int i = clientId.indexOf("-");
String strWithValue = clientId.substring(i + 1);
if (Objects.equals(strWithValue, "")) {
strWithValue = "0";
}
return Integer.parseInt(strWithValue);
}

@VisibleForTesting
Expand Down Expand Up @@ -159,7 +177,11 @@ public CommitResponse writeWithOptions(
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient().writeWithOptions(mutations, options);
}
return runWithSessionRetry(session -> session.writeWithOptions(mutations, options));

return runWithSessionRetry(
(session, reqId) -> {
return session.writeWithOptions(mutations, withReqId(reqId, options));
});
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
Expand All @@ -177,14 +199,27 @@ public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws Spa
public CommitResponse writeAtLeastOnceWithOptions(
final Iterable<Mutation> mutations, final TransactionOption... options)
throws SpannerException {
CommitResponse res = doWriteAtLeastOnceWithOptions(mutations, options);
System.out.println("\033[33mCommitResponse: " + res + "\033[00m");
return res;
}

private CommitResponse doWriteAtLeastOnceWithOptions(
final Iterable<Mutation> mutations, final TransactionOption... options)
throws SpannerException {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
try (IScope s = tracer.withSpan(span)) {
if (useMultiplexedSessionBlindWrite && getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient()
.writeAtLeastOnceWithOptions(mutations, options);
}

return runWithSessionRetry(
session -> session.writeAtLeastOnceWithOptions(mutations, options));
(session, reqId) -> {
CommitResponse in = session.writeAtLeastOnceWithOptions(mutations, withReqId(reqId, options));
System.out.println("\033[35minternalDo: " + in + "\033[00m");
return in;
});
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
Expand All @@ -193,6 +228,10 @@ public CommitResponse writeAtLeastOnceWithOptions(
}
}

private int nextNthRequest() {
return this.nthRequest.incrementAndGet();
}

@Override
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
final Iterable<MutationGroup> mutationGroups, final TransactionOption... options)
Expand All @@ -202,7 +241,9 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient().batchWriteAtLeastOnce(mutationGroups, options);
}
return runWithSessionRetry(session -> session.batchWriteAtLeastOnce(mutationGroups, options));
return runWithSessionRetry(
(session, reqId) ->
session.batchWriteAtLeastOnce(mutationGroups, withReqId(reqId, options)));
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
Expand Down Expand Up @@ -346,27 +387,57 @@ public long executePartitionedUpdate(final Statement stmt, final UpdateOption...
return executePartitionedUpdateWithPooledSession(stmt, options);
}

private UpdateOption[] withReqId(
final XGoogSpannerRequestId reqId, final UpdateOption... options) {
if (reqId == null) {
return options;
}
ArrayList<UpdateOption> allOptions = new ArrayList(Arrays.asList(options));
allOptions.add(new Options.RequestIdOption(reqId));
return allOptions.toArray(new UpdateOption[0]);
}

private TransactionOption[] withReqId(
final XGoogSpannerRequestId reqId, final TransactionOption... options) {
if (reqId == null) {
return options;
}
ArrayList<TransactionOption> allOptions = new ArrayList(Arrays.asList(options));
allOptions.add(new Options.RequestIdOption(reqId));
return allOptions.toArray(new TransactionOption[0]);
}

private long executePartitionedUpdateWithPooledSession(
final Statement stmt, final UpdateOption... options) {
ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION, commonAttributes);
try (IScope s = tracer.withSpan(span)) {
return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options));
return runWithSessionRetry(
(session, reqId) -> {
return session.executePartitionedUpdate(stmt, withReqId(reqId, options));
});
} catch (RuntimeException e) {
span.setStatus(e);
span.end();
throw e;
}
}

private <T> T runWithSessionRetry(Function<Session, T> callable) {
private <T> T runWithSessionRetry(BiFunction<Session, XGoogSpannerRequestId, T> callable) {
PooledSessionFuture session = getSession();
XGoogSpannerRequestId reqId =
XGoogSpannerRequestId.of(
this.dbId, Long.valueOf(session.getChannel()), this.nextNthRequest(), 0);
while (true) {
try {
return callable.apply(session);
reqId.incrementAttempt();
return callable.apply(session, reqId);
} catch (SessionNotFoundException e) {
session =
(PooledSessionFuture)
pool.getPooledSessionReplacementHandler().replaceSession(e, session);
reqId =
XGoogSpannerRequestId.of(
this.dbId, Long.valueOf(session.getChannel()), this.nextNthRequest(), 0);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ void appendToOptions(Options options) {
private RpcLockHint lockHint;
private Boolean lastStatement;
private IsolationLevel isolationLevel;
private XGoogSpannerRequestId reqId;

// Construction is via factory methods below.
private Options() {}
Expand Down Expand Up @@ -591,6 +592,14 @@ String pageToken() {
return pageToken;
}

boolean hasReqId() {
return reqId != null;
}

XGoogSpannerRequestId reqId() {
return reqId;
}

boolean hasFilter() {
return filter != null;
}
Expand Down Expand Up @@ -756,6 +765,9 @@ public String toString() {
if (isolationLevel != null) {
b.append("isolationLevel: ").append(isolationLevel).append(' ');
}
if (reqId != null) {
b.append("requestId: ").append(reqId.toString());
}
return b.toString();
}

Expand Down Expand Up @@ -798,7 +810,8 @@ public boolean equals(Object o) {
&& Objects.equals(orderBy(), that.orderBy())
&& Objects.equals(isLastStatement(), that.isLastStatement())
&& Objects.equals(lockHint(), that.lockHint())
&& Objects.equals(isolationLevel(), that.isolationLevel());
&& Objects.equals(isolationLevel(), that.isolationLevel())
&& Objects.equals(reqId(), that.reqId());
}

@Override
Expand Down Expand Up @@ -867,6 +880,9 @@ public int hashCode() {
if (isolationLevel != null) {
result = 31 * result + isolationLevel.hashCode();
}
if (reqId != null) {
result = 31 * result + reqId.hashCode();
}
return result;
}

Expand Down Expand Up @@ -1052,4 +1068,30 @@ public boolean equals(Object o) {
return o instanceof LastStatementUpdateOption;
}
}

static final class RequestIdOption extends InternalOption
implements TransactionOption, UpdateOption {
private final XGoogSpannerRequestId reqId;

RequestIdOption(XGoogSpannerRequestId reqId) {
this.reqId = reqId;
}

@Override
void appendToOptions(Options options) {
options.reqId = this.reqId;
}

@Override
public int hashCode() {
return RequestIdOption.class.hashCode();
}

@Override
public boolean equals(Object o) {
// TODO: Examine why the precedent for LastStatementUpdateOption
// does not check against the actual value.
return o instanceof RequestIdOption;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,22 @@ long executeStreamingPartitionedUpdate(
long updateCount = 0L;
Stopwatch stopwatch = Stopwatch.createStarted(ticker);
Options options = Options.fromUpdateOptions(updateOptions);
XGoogSpannerRequestId reqId = options.reqId();
if (reqId == null) {
reqId = session.getRequestIdCreator().nextRequestId(1 /*TODO: infer channelId*/, 0);
}

try {
ExecuteSqlRequest request = newTransactionRequestFrom(statement, options);

while (true) {
reqId.incrementAttempt();
final Duration remainingTimeout = tryUpdateTimeout(timeout, stopwatch);

try {
ServerStream<PartialResultSet> stream =
rpc.executeStreamingPartitionedDml(request, session.getOptions(), remainingTimeout);
rpc.executeStreamingPartitionedDml(
request, reqId.withOptions(session.getOptions()), remainingTimeout);

for (PartialResultSet rs : stream) {
if (rs.getResumeToken() != null && !rs.getResumeToken().isEmpty()) {
Expand All @@ -113,12 +119,17 @@ long executeStreamingPartitionedUpdate(
LOGGER.log(
Level.FINER, "Retrying PartitionedDml transaction after InternalException - EOS", e);
request = resumeOrRestartRequest(resumeToken, statement, request, options);
if (resumeToken.isEmpty()) {
// Create a new xGoogSpannerRequestId.
reqId = session.getRequestIdCreator().nextRequestId(1 /*TODO: infer channelId*/, 0);
}
} catch (AbortedException e) {
LOGGER.log(Level.FINER, "Retrying PartitionedDml transaction after AbortedException", e);
resumeToken = ByteString.EMPTY;
foundStats = false;
updateCount = 0L;
request = newTransactionRequestFrom(statement, options);
reqId = session.getRequestIdCreator().nextRequestId(1 /*TODO: infer channelId*/, 0);
}
}
if (!foundStats) {
Expand Down Expand Up @@ -209,7 +220,11 @@ private ByteString initTransaction(final Options options) {
.setExcludeTxnFromChangeStreams(
options.withExcludeTxnFromChangeStreams() == Boolean.TRUE))
.build();
Transaction tx = rpc.beginTransaction(request, session.getOptions(), true);
XGoogSpannerRequestId reqId = options.reqId();
if (reqId == null) {
reqId = session.getRequestIdCreator().nextRequestId(1 /*TODO: infer channelId*/, 1);
}
Transaction tx = rpc.beginTransaction(request, reqId.withOptions(session.getOptions()), true);
if (tx.getId().isEmpty()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL,
Expand Down
Loading