Skip to content

Commit a447fbf

Browse files
Option to report trace only once during pagination
1 parent 342e2dc commit a447fbf

File tree

11 files changed

+181
-17
lines changed

11 files changed

+181
-17
lines changed

core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -988,7 +988,13 @@ public enum DefaultDriverOption implements DriverOption {
988988
* <p>Value type: {@link java.util.List List}&#60;{@link String}&#62;
989989
*/
990990
LOAD_BALANCING_DC_FAILOVER_PREFERRED_REMOTE_DCS(
991-
"advanced.load-balancing-policy.dc-failover.preferred-remote-dcs");
991+
"advanced.load-balancing-policy.dc-failover.preferred-remote-dcs"),
992+
/**
993+
* Report trace for every page fetch request
994+
*
995+
* <p>Value-type: {@link Boolean}
996+
*/
997+
REQUEST_TRACE_REPORT_EVERY_PAGE_FETCH("advanced.request.trace.report-every-page-fetch");
992998

993999
private final String path;
9941000

core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java

+1
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,7 @@ protected static void fillWithDriverDefaults(OptionsMap map) {
289289
map.put(TypedDriverOption.REQUEST_WARN_IF_SET_KEYSPACE, true);
290290
map.put(TypedDriverOption.REQUEST_TRACE_ATTEMPTS, 5);
291291
map.put(TypedDriverOption.REQUEST_TRACE_INTERVAL, Duration.ofMillis(3));
292+
map.put(TypedDriverOption.REQUEST_TRACE_REPORT_EVERY_PAGE_FETCH, true);
292293
map.put(TypedDriverOption.REQUEST_TRACE_CONSISTENCY, "ONE");
293294
map.put(TypedDriverOption.REQUEST_LOG_WARNINGS, true);
294295
map.put(TypedDriverOption.GRAPH_PAGING_ENABLED, "AUTO");

core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java

+4
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,10 @@ public String toString() {
396396
/** The consistency level to use for trace queries. */
397397
public static final TypedDriverOption<String> REQUEST_TRACE_CONSISTENCY =
398398
new TypedDriverOption<>(DefaultDriverOption.REQUEST_TRACE_CONSISTENCY, GenericType.STRING);
399+
/** Report trace for every page fetch request */
400+
public static final TypedDriverOption<Boolean> REQUEST_TRACE_REPORT_EVERY_PAGE_FETCH =
401+
new TypedDriverOption<>(
402+
DefaultDriverOption.REQUEST_TRACE_REPORT_EVERY_PAGE_FETCH, GenericType.BOOLEAN);
399403
/** Whether or not to publish aggregable histogram for metrics */
400404
public static final TypedDriverOption<Boolean> METRICS_GENERATE_AGGREGABLE_HISTOGRAMS =
401405
new TypedDriverOption<>(

core/src/main/java/com/datastax/oss/driver/internal/core/cql/Conversions.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -320,13 +320,14 @@ public static AsyncResultSet toResultSet(
320320
Result result,
321321
ExecutionInfo executionInfo,
322322
CqlSession session,
323-
InternalDriverContext context) {
323+
InternalDriverContext context,
324+
DriverExecutionProfile executionProfile) {
324325
if (result instanceof Rows) {
325326
Rows rows = (Rows) result;
326327
Statement<?> statement = (Statement<?>) executionInfo.getRequest();
327328
ColumnDefinitions columnDefinitions = getResultDefinitions(rows, statement, context);
328329
return new DefaultAsyncResultSet(
329-
columnDefinitions, executionInfo, rows.getData(), session, context);
330+
columnDefinitions, executionInfo, rows.getData(), session, context, executionProfile);
330331
} else if (result instanceof Prepared) {
331332
// This should never happen
332333
throw new IllegalArgumentException("Unexpected PREPARED response to a CQL query");

core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ private void setFinalResult(
321321
ExecutionInfo executionInfo =
322322
buildExecutionInfo(callback, resultMessage, responseFrame, schemaInAgreement);
323323
AsyncResultSet resultSet =
324-
Conversions.toResultSet(resultMessage, executionInfo, session, context);
324+
Conversions.toResultSet(resultMessage, executionInfo, session, context, executionProfile);
325325
if (result.complete(resultSet)) {
326326
cancelScheduledTasks();
327327
throttler.signalSuccess(this);

core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultAsyncResultSet.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package com.datastax.oss.driver.internal.core.cql;
1919

2020
import com.datastax.oss.driver.api.core.CqlSession;
21+
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
22+
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
2123
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
2224
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
2325
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
@@ -43,6 +45,7 @@ public class DefaultAsyncResultSet implements AsyncResultSet {
4345

4446
private final ColumnDefinitions definitions;
4547
private final ExecutionInfo executionInfo;
48+
private final DriverExecutionProfile executionProfile;
4649
private final CqlSession session;
4750
private final CountingIterator<Row> iterator;
4851
private final Iterable<Row> currentPage;
@@ -52,9 +55,11 @@ public DefaultAsyncResultSet(
5255
ExecutionInfo executionInfo,
5356
Queue<List<ByteBuffer>> data,
5457
CqlSession session,
55-
InternalDriverContext context) {
58+
InternalDriverContext context,
59+
DriverExecutionProfile executionProfile) {
5660
this.definitions = definitions;
5761
this.executionInfo = executionInfo;
62+
this.executionProfile = executionProfile;
5863
this.session = session;
5964
this.iterator =
6065
new CountingIterator<Row>(data.size()) {
@@ -106,6 +111,11 @@ public CompletionStage<AsyncResultSet> fetchNextPage() throws IllegalStateExcept
106111
Statement<?> statement = (Statement<?>) executionInfo.getRequest();
107112
LOG.trace("Fetching next page for {}", statement);
108113
Statement<?> nextStatement = statement.copy(nextState);
114+
if (!executionProfile.getBoolean(DefaultDriverOption.REQUEST_TRACE_REPORT_EVERY_PAGE_FETCH)
115+
&& nextStatement.isTracing()) {
116+
// report traces only for first page
117+
nextStatement = nextStatement.setTracing(false);
118+
}
109119
return session.executeAsync(nextStatement);
110120
}
111121

core/src/main/resources/reference.conf

+8
Original file line numberDiff line numberDiff line change
@@ -1153,6 +1153,14 @@ datastax-java-driver {
11531153
# Modifiable at runtime: yes, the new value will be used for traces fetched after the change.
11541154
# Overridable in a profile: yes
11551155
consistency = ONE
1156+
1157+
# Report trace for every page fetch request. If disabled, only one trace entry will be present
1158+
# on server side even if client application pages through a large partition.
1159+
#
1160+
# Required: yes
1161+
# Modifiable at runtime: yes, the new value will be used for traces fetched after the change.
1162+
# Overridable in a profile: yes
1163+
report-every-page-fetch = true
11561164
}
11571165

11581166
# Whether logging of server warnings generated during query execution should be disabled by the

core/src/test/java/com/datastax/dse/driver/internal/core/cql/reactive/CqlRequestReactiveProcessorTest.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.datastax.dse.driver.api.core.cql.reactive.ReactiveResultSet;
3030
import com.datastax.dse.driver.api.core.cql.reactive.ReactiveRow;
3131
import com.datastax.oss.driver.api.core.ProtocolVersion;
32+
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
3233
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
3334
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
3435
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
@@ -135,6 +136,7 @@ public void should_complete_multi_page_result(ProtocolVersion version) {
135136
CompletableFuture<AsyncResultSet> page2Future = new CompletableFuture<>();
136137
when(session.executeAsync(any(Statement.class))).thenAnswer(invocation -> page2Future);
137138
ExecutionInfo mockInfo = mock(ExecutionInfo.class);
139+
DriverExecutionProfile mockExecutionProfile = mock(DriverExecutionProfile.class);
138140

139141
ReactiveResultSet publisher =
140142
new CqlRequestReactiveProcessor(new CqlRequestAsyncProcessor())
@@ -152,7 +154,8 @@ public void should_complete_multi_page_result(ProtocolVersion version) {
152154
DseTestFixtures.tenDseRows(2, true),
153155
mockInfo,
154156
harness.getSession(),
155-
harness.getContext()));
157+
harness.getContext(),
158+
mockExecutionProfile));
156159

157160
List<ReactiveRow> rows = rowsPublisher.toList().blockingGet();
158161
assertThat(rows).hasSize(20);

core/src/test/java/com/datastax/oss/driver/internal/core/AsyncPagingIterableWrapperTest.java

+15-3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.datastax.oss.driver.api.core.CqlSession;
2525
import com.datastax.oss.driver.api.core.DefaultProtocolVersion;
2626
import com.datastax.oss.driver.api.core.MappedAsyncPagingIterable;
27+
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
2728
import com.datastax.oss.driver.api.core.cql.ColumnDefinition;
2829
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
2930
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
@@ -50,6 +51,7 @@ public class AsyncPagingIterableWrapperTest {
5051
@Mock private Statement<?> statement;
5152
@Mock private CqlSession session;
5253
@Mock private InternalDriverContext context;
54+
@Mock private DriverExecutionProfile executionProfile;
5355

5456
@Before
5557
public void setup() {
@@ -74,10 +76,15 @@ public void should_wrap_result_set() throws Exception {
7476
ExecutionInfo executionInfo1 = mockExecutionInfo();
7577
DefaultAsyncResultSet resultSet1 =
7678
new DefaultAsyncResultSet(
77-
columnDefinitions, executionInfo1, mockData(0, 5), session, context);
79+
columnDefinitions, executionInfo1, mockData(0, 5), session, context, executionProfile);
7880
DefaultAsyncResultSet resultSet2 =
7981
new DefaultAsyncResultSet(
80-
columnDefinitions, mockExecutionInfo(), mockData(5, 10), session, context);
82+
columnDefinitions,
83+
mockExecutionInfo(),
84+
mockData(5, 10),
85+
session,
86+
context,
87+
executionProfile);
8188
// chain them together:
8289
ByteBuffer mockPagingState = ByteBuffer.allocate(0);
8390
when(executionInfo1.getPagingState()).thenReturn(mockPagingState);
@@ -111,7 +118,12 @@ public void should_share_iteration_progress_with_wrapped_result_set() {
111118
// Given
112119
DefaultAsyncResultSet resultSet =
113120
new DefaultAsyncResultSet(
114-
columnDefinitions, mockExecutionInfo(), mockData(0, 10), session, context);
121+
columnDefinitions,
122+
mockExecutionInfo(),
123+
mockData(0, 10),
124+
session,
125+
context,
126+
executionProfile);
115127

116128
// When
117129
MappedAsyncPagingIterable<Integer> iterable = resultSet.map(row -> row.getInt("i"));

core/src/test/java/com/datastax/oss/driver/internal/core/cql/DefaultAsyncResultSetTest.java

+32-7
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import com.datastax.oss.driver.api.core.CqlSession;
2828
import com.datastax.oss.driver.api.core.DefaultProtocolVersion;
29+
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
2930
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
3031
import com.datastax.oss.driver.api.core.cql.ColumnDefinition;
3132
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
@@ -55,6 +56,7 @@ public class DefaultAsyncResultSetTest {
5556
@Mock private Statement<?> statement;
5657
@Mock private CqlSession session;
5758
@Mock private InternalDriverContext context;
59+
@Mock private DriverExecutionProfile executionProfile;
5860

5961
@Before
6062
public void setup() {
@@ -73,7 +75,12 @@ public void should_fail_to_fetch_next_page_if_last() {
7375
// When
7476
DefaultAsyncResultSet resultSet =
7577
new DefaultAsyncResultSet(
76-
columnDefinitions, executionInfo, new ArrayDeque<>(), session, context);
78+
columnDefinitions,
79+
executionInfo,
80+
new ArrayDeque<>(),
81+
session,
82+
context,
83+
executionProfile);
7784

7885
// Then
7986
assertThat(resultSet.hasMorePages()).isFalse();
@@ -95,7 +102,12 @@ public void should_invoke_session_to_fetch_next_page() {
95102
// When
96103
DefaultAsyncResultSet resultSet =
97104
new DefaultAsyncResultSet(
98-
columnDefinitions, executionInfo, new ArrayDeque<>(), session, context);
105+
columnDefinitions,
106+
executionInfo,
107+
new ArrayDeque<>(),
108+
session,
109+
context,
110+
executionProfile);
99111
assertThat(resultSet.hasMorePages()).isTrue();
100112
CompletionStage<AsyncResultSet> nextPageFuture = resultSet.fetchNextPage();
101113

@@ -113,7 +125,12 @@ public void should_report_applied_if_column_not_present_and_empty() {
113125
// When
114126
DefaultAsyncResultSet resultSet =
115127
new DefaultAsyncResultSet(
116-
columnDefinitions, executionInfo, new ArrayDeque<>(), session, context);
128+
columnDefinitions,
129+
executionInfo,
130+
new ArrayDeque<>(),
131+
session,
132+
context,
133+
executionProfile);
117134

118135
// Then
119136
assertThat(resultSet.wasApplied()).isTrue();
@@ -128,7 +145,8 @@ public void should_report_applied_if_column_not_present_and_not_empty() {
128145

129146
// When
130147
DefaultAsyncResultSet resultSet =
131-
new DefaultAsyncResultSet(columnDefinitions, executionInfo, data, session, context);
148+
new DefaultAsyncResultSet(
149+
columnDefinitions, executionInfo, data, session, context, executionProfile);
132150

133151
// Then
134152
assertThat(resultSet.wasApplied()).isTrue();
@@ -149,7 +167,8 @@ public void should_report_not_applied_if_column_present_and_false() {
149167

150168
// When
151169
DefaultAsyncResultSet resultSet =
152-
new DefaultAsyncResultSet(columnDefinitions, executionInfo, data, session, context);
170+
new DefaultAsyncResultSet(
171+
columnDefinitions, executionInfo, data, session, context, executionProfile);
153172

154173
// Then
155174
assertThat(resultSet.wasApplied()).isFalse();
@@ -170,7 +189,8 @@ public void should_report_not_applied_if_column_present_and_true() {
170189

171190
// When
172191
DefaultAsyncResultSet resultSet =
173-
new DefaultAsyncResultSet(columnDefinitions, executionInfo, data, session, context);
192+
new DefaultAsyncResultSet(
193+
columnDefinitions, executionInfo, data, session, context, executionProfile);
174194

175195
// Then
176196
assertThat(resultSet.wasApplied()).isTrue();
@@ -187,7 +207,12 @@ public void should_fail_to_report_if_applied_if_column_present_but_empty() {
187207
// When
188208
DefaultAsyncResultSet resultSet =
189209
new DefaultAsyncResultSet(
190-
columnDefinitions, executionInfo, new ArrayDeque<>(), session, context);
210+
columnDefinitions,
211+
executionInfo,
212+
new ArrayDeque<>(),
213+
session,
214+
context,
215+
executionProfile);
191216

192217
// Then
193218
resultSet.wasApplied();

0 commit comments

Comments
 (0)