Skip to content

Commit d93441a

Browse files
committed
Fix up withNthRequest for deterministic checks for BatchCreateSessions
1 parent 1bdf647 commit d93441a

File tree

5 files changed

+41
-23
lines changed

5 files changed

+41
-23
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import io.grpc.Status;
4242
import java.time.Duration;
4343
import java.time.temporal.ChronoUnit;
44+
import java.util.Arrays;
4445
import java.util.Map;
4546
import java.util.concurrent.TimeUnit;
4647
import java.util.logging.Level;
@@ -79,11 +80,11 @@ long executeStreamingPartitionedUpdate(
7980
boolean foundStats = false;
8081
long updateCount = 0L;
8182
Stopwatch stopwatch = Stopwatch.createStarted(ticker);
82-
Options options = Options.fromUpdateOptions(updateOptions);
83-
XGoogSpannerRequestId reqId = options.reqId();
84-
if (reqId == null) {
85-
reqId = session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
86-
}
83+
XGoogSpannerRequestId reqId =
84+
session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
85+
UpdateOption[] allOptions = Arrays.copyOf(updateOptions, updateOptions.length + 1);
86+
allOptions[updateOptions.length] = new Options.RequestIdOption(reqId);
87+
Options options = Options.fromUpdateOptions(allOptions);
8788

8889
try {
8990
ExecuteSqlRequest request = newTransactionRequestFrom(statement, options);
@@ -222,10 +223,8 @@ private ByteString initTransaction(final Options options) {
222223
.setExcludeTxnFromChangeStreams(
223224
options.withExcludeTxnFromChangeStreams() == Boolean.TRUE))
224225
.build();
225-
XGoogSpannerRequestId reqId = options.reqId();
226-
if (reqId == null) {
227-
reqId = session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
228-
}
226+
XGoogSpannerRequestId reqId =
227+
session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
229228
Transaction tx = rpc.beginTransaction(request, reqId.withOptions(session.getOptions()), true);
230229
if (tx.getId().isEmpty()) {
231230
throw SpannerExceptionFactory.newSpannerException(

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,6 @@ public void run() {
148148
.addAnnotation(String.format("Creating %d sessions", sessionCount));
149149
while (remainingSessionsToCreate > 0) {
150150
try {
151-
System.out.println("\033[35mchannelHint: " + channelHint + "\033[00m");
152151
sessions = internalBatchCreateSessions(remainingSessionsToCreate, channelHint);
153152
} catch (Throwable t) {
154153
spanner.getTracer().getCurrentSpan().setStatus(t);
@@ -224,10 +223,8 @@ DatabaseId getDatabaseId() {
224223

225224
@Override
226225
public XGoogSpannerRequestId nextRequestId(long channelId, int attempt) {
227-
long nthReq = this.nthRequest.incrementAndGet();
228-
// System.out.println("\033[36mnthRequest.addr: " + System.identityHashCode(this.nthRequest) + "
229-
// value: " + nthReq + "\033[00m");
230-
return XGoogSpannerRequestId.of(this.nthId, nthReq, channelId, attempt);
226+
return XGoogSpannerRequestId.of(
227+
this.nthId, channelId, this.nthRequest.incrementAndGet(), attempt);
231228
}
232229

233230
/** Create a single session. */

google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,4 +176,10 @@ public XGoogSpannerRequestId nextRequestId(long channelId, int attempt) {
176176
return XGoogSpannerRequestId.of(1, 1, 1, 0);
177177
}
178178
}
179+
180+
@VisibleForTesting
181+
XGoogSpannerRequestId withNthRequest(long replacementNthRequest) {
182+
return XGoogSpannerRequestId.of(
183+
this.nthClientId, this.nthChannelId, replacementNthRequest, this.attempt);
184+
}
179185
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2913,28 +2913,29 @@ public void testPartitionedDmlDoesNotTimeout() {
29132913
XGoogSpannerRequestIdTest.MethodAndRequestId[] wantStreamingValues = {
29142914
XGoogSpannerRequestIdTest.ofMethodAndRequestId(
29152915
"google.spanner.v1.Spanner/ExecuteStreamingSql",
2916-
new XGoogSpannerRequestId(dbId, channelId, 1, 1)),
2916+
new XGoogSpannerRequestId(dbId, channelId, 5, 1)),
29172917
};
29182918
xGoogReqIdInterceptor.checkExpectedStreamingXGoogRequestIds(wantStreamingValues);
29192919

29202920
XGoogSpannerRequestIdTest.MethodAndRequestId[] wantUnaryValues = {
29212921
XGoogSpannerRequestIdTest.ofMethodAndRequestId(
29222922
"google.spanner.v1.Spanner/BatchCreateSessions",
2923-
new XGoogSpannerRequestId(dbId, 0, 4, 1)),
2923+
new XGoogSpannerRequestId(dbId, 0, XGoogSpannerRequestIdTest.UNNECESSARY, 1)),
29242924
XGoogSpannerRequestIdTest.ofMethodAndRequestId(
29252925
"google.spanner.v1.Spanner/BatchCreateSessions",
2926-
new XGoogSpannerRequestId(dbId, 1, 2, 1)),
2926+
new XGoogSpannerRequestId(dbId, 1, XGoogSpannerRequestIdTest.UNNECESSARY, 1)),
29272927
XGoogSpannerRequestIdTest.ofMethodAndRequestId(
29282928
"google.spanner.v1.Spanner/BatchCreateSessions",
2929-
new XGoogSpannerRequestId(dbId, 2, 3, 1)),
2929+
new XGoogSpannerRequestId(dbId, 2, XGoogSpannerRequestIdTest.UNNECESSARY, 1)),
29302930
XGoogSpannerRequestIdTest.ofMethodAndRequestId(
29312931
"google.spanner.v1.Spanner/BatchCreateSessions",
2932-
new XGoogSpannerRequestId(dbId, 3, 1, 1)),
2932+
new XGoogSpannerRequestId(dbId, 3, XGoogSpannerRequestIdTest.UNNECESSARY, 1)),
29332933
XGoogSpannerRequestIdTest.ofMethodAndRequestId(
2934-
"google.spanner.v1.Spanner/BeginTransaction", new XGoogSpannerRequestId(dbId, 3, 1, 1)),
2934+
"google.spanner.v1.Spanner/BeginTransaction",
2935+
new XGoogSpannerRequestId(dbId, channelId, 6, 1)),
29352936
XGoogSpannerRequestIdTest.ofMethodAndRequestId(
29362937
"google.spanner.v1.Spanner/ExecuteSql",
2937-
new XGoogSpannerRequestId(dbId, channelId, 5, 1)),
2938+
new XGoogSpannerRequestId(dbId, channelId, 7, 1)),
29382939
};
29392940
xGoogReqIdInterceptor.checkExpectedUnaryXGoogRequestIds(wantUnaryValues);
29402941
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343

4444
@RunWith(JUnit4.class)
4545
public class XGoogSpannerRequestIdTest {
46+
public static long UNNECESSARY = -1;
4647

4748
@Test
4849
public void testEquals() {
@@ -184,20 +185,21 @@ public MethodAndRequestId[] accumulatedStreamingValues() {
184185
public void checkExpectedUnaryXGoogRequestIds(MethodAndRequestId... wantUnaryValues) {
185186
MethodAndRequestId[] gotUnaryValues = this.accumulatedUnaryValues();
186187
sortValues(gotUnaryValues);
187-
for (int i = 0; i < gotUnaryValues.length; i++) {
188+
for (int i = 0; i < gotUnaryValues.length && false; i++) {
188189
System.out.println("\033[33misUnary: #" + i + ":: " + gotUnaryValues[i] + "\033[00m");
189190
}
190191
assertEquals(wantUnaryValues, gotUnaryValues);
191192
}
192193

193194
private void sortValues(MethodAndRequestId[] values) {
195+
massageValues(values);
194196
Arrays.sort(values, new MethodAndRequestIdComparator());
195197
}
196198

197199
public void checkExpectedStreamingXGoogRequestIds(MethodAndRequestId... wantStreamingValues) {
198200
MethodAndRequestId[] gotStreamingValues = this.accumulatedStreamingValues();
199201
sortValues(gotStreamingValues);
200-
for (int i = 0; i < gotStreamingValues.length; i++) {
202+
for (int i = 0; i < gotStreamingValues.length && false; i++) {
201203
System.out.println(
202204
"\033[32misStreaming: #" + i + ":: " + gotStreamingValues[i] + "\033[00m");
203205
}
@@ -242,6 +244,7 @@ public int compare(MethodAndRequestId mr1, MethodAndRequestId mr2) {
242244
if (cmpMethod != 0) {
243245
return cmpMethod;
244246
}
247+
245248
if (Objects.equals(mr1.requestId, mr2.requestId)) {
246249
return 0;
247250
}
@@ -252,6 +255,18 @@ public int compare(MethodAndRequestId mr1, MethodAndRequestId mr2) {
252255
}
253256
}
254257

258+
static void massageValues(MethodAndRequestId[] mreqs) {
259+
for (int i = 0; i < mreqs.length; i++) {
260+
MethodAndRequestId mreq = mreqs[i];
261+
// BatchCreateSessions is so hard to control as the round-robin doling out
262+
// hence we might need to be able to scrub the nth_request that won't match
263+
// nth_req in consecutive order of nth_client.
264+
if (mreq.method.compareTo("google.spanner.v1.Spanner/BatchCreateSessions") == 0) {
265+
mreqs[i] = new MethodAndRequestId(mreq.method, mreq.requestId.withNthRequest(UNNECESSARY));
266+
}
267+
}
268+
}
269+
255270
public static MethodAndRequestId ofMethodAndRequestId(String method, String reqId) {
256271
return new MethodAndRequestId(method, XGoogSpannerRequestId.of(reqId));
257272
}

0 commit comments

Comments
 (0)