diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java
index 11f2702c3cf..f1844c6aba1 100644
--- a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java
+++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java
@@ -988,7 +988,13 @@ public enum DefaultDriverOption implements DriverOption {
*
Value type: {@link java.util.List List}<{@link String}>
*/
LOAD_BALANCING_DC_FAILOVER_PREFERRED_REMOTE_DCS(
- "advanced.load-balancing-policy.dc-failover.preferred-remote-dcs");
+ "advanced.load-balancing-policy.dc-failover.preferred-remote-dcs"),
+ /**
+ * Report trace for every page fetch request
+ *
+ *
Value-type: {@link Boolean}
+ */
+ REQUEST_TRACE_REPORT_EVERY_PAGE_FETCH("advanced.request.trace.report-every-page-fetch");
private final String path;
diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java
index 98faf3e590c..aba86753d69 100644
--- a/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java
+++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java
@@ -289,6 +289,7 @@ protected static void fillWithDriverDefaults(OptionsMap map) {
map.put(TypedDriverOption.REQUEST_WARN_IF_SET_KEYSPACE, true);
map.put(TypedDriverOption.REQUEST_TRACE_ATTEMPTS, 5);
map.put(TypedDriverOption.REQUEST_TRACE_INTERVAL, Duration.ofMillis(3));
+ map.put(TypedDriverOption.REQUEST_TRACE_REPORT_EVERY_PAGE_FETCH, true);
map.put(TypedDriverOption.REQUEST_TRACE_CONSISTENCY, "ONE");
map.put(TypedDriverOption.REQUEST_LOG_WARNINGS, true);
map.put(TypedDriverOption.GRAPH_PAGING_ENABLED, "AUTO");
diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java
index ca60b67f0ba..e57f452e225 100644
--- a/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java
+++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java
@@ -396,6 +396,10 @@ public String toString() {
/** The consistency level to use for trace queries. */
public static final TypedDriverOption REQUEST_TRACE_CONSISTENCY =
new TypedDriverOption<>(DefaultDriverOption.REQUEST_TRACE_CONSISTENCY, GenericType.STRING);
+ /** Report trace for every page fetch request */
+ public static final TypedDriverOption REQUEST_TRACE_REPORT_EVERY_PAGE_FETCH =
+ new TypedDriverOption<>(
+ DefaultDriverOption.REQUEST_TRACE_REPORT_EVERY_PAGE_FETCH, GenericType.BOOLEAN);
/** Whether or not to publish aggregable histogram for metrics */
public static final TypedDriverOption METRICS_GENERATE_AGGREGABLE_HISTOGRAMS =
new TypedDriverOption<>(
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/Conversions.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/Conversions.java
index ff9384b3e24..078f8bb3c73 100644
--- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/Conversions.java
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/Conversions.java
@@ -320,13 +320,14 @@ public static AsyncResultSet toResultSet(
Result result,
ExecutionInfo executionInfo,
CqlSession session,
- InternalDriverContext context) {
+ InternalDriverContext context,
+ DriverExecutionProfile executionProfile) {
if (result instanceof Rows) {
Rows rows = (Rows) result;
Statement> statement = (Statement>) executionInfo.getRequest();
ColumnDefinitions columnDefinitions = getResultDefinitions(rows, statement, context);
return new DefaultAsyncResultSet(
- columnDefinitions, executionInfo, rows.getData(), session, context);
+ columnDefinitions, executionInfo, rows.getData(), session, context, executionProfile);
} else if (result instanceof Prepared) {
// This should never happen
throw new IllegalArgumentException("Unexpected PREPARED response to a CQL query");
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java
index 0808bdce63f..a1e9cfac4dd 100644
--- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java
@@ -321,7 +321,7 @@ private void setFinalResult(
ExecutionInfo executionInfo =
buildExecutionInfo(callback, resultMessage, responseFrame, schemaInAgreement);
AsyncResultSet resultSet =
- Conversions.toResultSet(resultMessage, executionInfo, session, context);
+ Conversions.toResultSet(resultMessage, executionInfo, session, context, executionProfile);
if (result.complete(resultSet)) {
cancelScheduledTasks();
throttler.signalSuccess(this);
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultAsyncResultSet.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultAsyncResultSet.java
index 243e9aeb775..25208be2dec 100644
--- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultAsyncResultSet.java
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultAsyncResultSet.java
@@ -18,6 +18,8 @@
package com.datastax.oss.driver.internal.core.cql;
import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
@@ -43,6 +45,7 @@ public class DefaultAsyncResultSet implements AsyncResultSet {
private final ColumnDefinitions definitions;
private final ExecutionInfo executionInfo;
+ private final DriverExecutionProfile executionProfile;
private final CqlSession session;
private final CountingIterator iterator;
private final Iterable currentPage;
@@ -52,9 +55,11 @@ public DefaultAsyncResultSet(
ExecutionInfo executionInfo,
Queue> data,
CqlSession session,
- InternalDriverContext context) {
+ InternalDriverContext context,
+ DriverExecutionProfile executionProfile) {
this.definitions = definitions;
this.executionInfo = executionInfo;
+ this.executionProfile = executionProfile;
this.session = session;
this.iterator =
new CountingIterator(data.size()) {
@@ -106,6 +111,11 @@ public CompletionStage fetchNextPage() throws IllegalStateExcept
Statement> statement = (Statement>) executionInfo.getRequest();
LOG.trace("Fetching next page for {}", statement);
Statement> nextStatement = statement.copy(nextState);
+ if (!executionProfile.getBoolean(DefaultDriverOption.REQUEST_TRACE_REPORT_EVERY_PAGE_FETCH)
+ && nextStatement.isTracing()) {
+ // report traces only for first page
+ nextStatement = nextStatement.setTracing(false);
+ }
return session.executeAsync(nextStatement);
}
diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf
index 7b1c43f8bea..19ff1f706ce 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -1153,6 +1153,14 @@ datastax-java-driver {
# Modifiable at runtime: yes, the new value will be used for traces fetched after the change.
# Overridable in a profile: yes
consistency = ONE
+
+ # Report trace for every page fetch request. If disabled, only one trace entry will be present
+ # on server side even if client application pages through a large partition.
+ #
+ # Required: yes
+ # Modifiable at runtime: yes, the new value will be used for traces fetched after the change.
+ # Overridable in a profile: yes
+ report-every-page-fetch = true
}
# Whether logging of server warnings generated during query execution should be disabled by the
diff --git a/core/src/test/java/com/datastax/dse/driver/internal/core/cql/reactive/CqlRequestReactiveProcessorTest.java b/core/src/test/java/com/datastax/dse/driver/internal/core/cql/reactive/CqlRequestReactiveProcessorTest.java
index a7a6bced9e8..118a737be16 100644
--- a/core/src/test/java/com/datastax/dse/driver/internal/core/cql/reactive/CqlRequestReactiveProcessorTest.java
+++ b/core/src/test/java/com/datastax/dse/driver/internal/core/cql/reactive/CqlRequestReactiveProcessorTest.java
@@ -29,6 +29,7 @@
import com.datastax.dse.driver.api.core.cql.reactive.ReactiveResultSet;
import com.datastax.dse.driver.api.core.cql.reactive.ReactiveRow;
import com.datastax.oss.driver.api.core.ProtocolVersion;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
@@ -135,6 +136,7 @@ public void should_complete_multi_page_result(ProtocolVersion version) {
CompletableFuture page2Future = new CompletableFuture<>();
when(session.executeAsync(any(Statement.class))).thenAnswer(invocation -> page2Future);
ExecutionInfo mockInfo = mock(ExecutionInfo.class);
+ DriverExecutionProfile mockExecutionProfile = mock(DriverExecutionProfile.class);
ReactiveResultSet publisher =
new CqlRequestReactiveProcessor(new CqlRequestAsyncProcessor())
@@ -152,7 +154,8 @@ public void should_complete_multi_page_result(ProtocolVersion version) {
DseTestFixtures.tenDseRows(2, true),
mockInfo,
harness.getSession(),
- harness.getContext()));
+ harness.getContext(),
+ mockExecutionProfile));
List rows = rowsPublisher.toList().blockingGet();
assertThat(rows).hasSize(20);
diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/AsyncPagingIterableWrapperTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/AsyncPagingIterableWrapperTest.java
index dff9877b62d..3556490e470 100644
--- a/core/src/test/java/com/datastax/oss/driver/internal/core/AsyncPagingIterableWrapperTest.java
+++ b/core/src/test/java/com/datastax/oss/driver/internal/core/AsyncPagingIterableWrapperTest.java
@@ -24,6 +24,7 @@
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.DefaultProtocolVersion;
import com.datastax.oss.driver.api.core.MappedAsyncPagingIterable;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.ColumnDefinition;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
@@ -50,6 +51,7 @@ public class AsyncPagingIterableWrapperTest {
@Mock private Statement> statement;
@Mock private CqlSession session;
@Mock private InternalDriverContext context;
+ @Mock private DriverExecutionProfile executionProfile;
@Before
public void setup() {
@@ -74,10 +76,15 @@ public void should_wrap_result_set() throws Exception {
ExecutionInfo executionInfo1 = mockExecutionInfo();
DefaultAsyncResultSet resultSet1 =
new DefaultAsyncResultSet(
- columnDefinitions, executionInfo1, mockData(0, 5), session, context);
+ columnDefinitions, executionInfo1, mockData(0, 5), session, context, executionProfile);
DefaultAsyncResultSet resultSet2 =
new DefaultAsyncResultSet(
- columnDefinitions, mockExecutionInfo(), mockData(5, 10), session, context);
+ columnDefinitions,
+ mockExecutionInfo(),
+ mockData(5, 10),
+ session,
+ context,
+ executionProfile);
// chain them together:
ByteBuffer mockPagingState = ByteBuffer.allocate(0);
when(executionInfo1.getPagingState()).thenReturn(mockPagingState);
@@ -111,7 +118,12 @@ public void should_share_iteration_progress_with_wrapped_result_set() {
// Given
DefaultAsyncResultSet resultSet =
new DefaultAsyncResultSet(
- columnDefinitions, mockExecutionInfo(), mockData(0, 10), session, context);
+ columnDefinitions,
+ mockExecutionInfo(),
+ mockData(0, 10),
+ session,
+ context,
+ executionProfile);
// When
MappedAsyncPagingIterable iterable = resultSet.map(row -> row.getInt("i"));
diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/DefaultAsyncResultSetTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/DefaultAsyncResultSetTest.java
index 8ed509caeb7..b3cff9093ae 100644
--- a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/DefaultAsyncResultSetTest.java
+++ b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/DefaultAsyncResultSetTest.java
@@ -26,6 +26,7 @@
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.DefaultProtocolVersion;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.ColumnDefinition;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
@@ -55,6 +56,7 @@ public class DefaultAsyncResultSetTest {
@Mock private Statement> statement;
@Mock private CqlSession session;
@Mock private InternalDriverContext context;
+ @Mock private DriverExecutionProfile executionProfile;
@Before
public void setup() {
@@ -73,7 +75,12 @@ public void should_fail_to_fetch_next_page_if_last() {
// When
DefaultAsyncResultSet resultSet =
new DefaultAsyncResultSet(
- columnDefinitions, executionInfo, new ArrayDeque<>(), session, context);
+ columnDefinitions,
+ executionInfo,
+ new ArrayDeque<>(),
+ session,
+ context,
+ executionProfile);
// Then
assertThat(resultSet.hasMorePages()).isFalse();
@@ -95,7 +102,12 @@ public void should_invoke_session_to_fetch_next_page() {
// When
DefaultAsyncResultSet resultSet =
new DefaultAsyncResultSet(
- columnDefinitions, executionInfo, new ArrayDeque<>(), session, context);
+ columnDefinitions,
+ executionInfo,
+ new ArrayDeque<>(),
+ session,
+ context,
+ executionProfile);
assertThat(resultSet.hasMorePages()).isTrue();
CompletionStage nextPageFuture = resultSet.fetchNextPage();
@@ -113,7 +125,12 @@ public void should_report_applied_if_column_not_present_and_empty() {
// When
DefaultAsyncResultSet resultSet =
new DefaultAsyncResultSet(
- columnDefinitions, executionInfo, new ArrayDeque<>(), session, context);
+ columnDefinitions,
+ executionInfo,
+ new ArrayDeque<>(),
+ session,
+ context,
+ executionProfile);
// Then
assertThat(resultSet.wasApplied()).isTrue();
@@ -128,7 +145,8 @@ public void should_report_applied_if_column_not_present_and_not_empty() {
// When
DefaultAsyncResultSet resultSet =
- new DefaultAsyncResultSet(columnDefinitions, executionInfo, data, session, context);
+ new DefaultAsyncResultSet(
+ columnDefinitions, executionInfo, data, session, context, executionProfile);
// Then
assertThat(resultSet.wasApplied()).isTrue();
@@ -149,7 +167,8 @@ public void should_report_not_applied_if_column_present_and_false() {
// When
DefaultAsyncResultSet resultSet =
- new DefaultAsyncResultSet(columnDefinitions, executionInfo, data, session, context);
+ new DefaultAsyncResultSet(
+ columnDefinitions, executionInfo, data, session, context, executionProfile);
// Then
assertThat(resultSet.wasApplied()).isFalse();
@@ -170,7 +189,8 @@ public void should_report_not_applied_if_column_present_and_true() {
// When
DefaultAsyncResultSet resultSet =
- new DefaultAsyncResultSet(columnDefinitions, executionInfo, data, session, context);
+ new DefaultAsyncResultSet(
+ columnDefinitions, executionInfo, data, session, context, executionProfile);
// Then
assertThat(resultSet.wasApplied()).isTrue();
@@ -187,7 +207,12 @@ public void should_fail_to_report_if_applied_if_column_present_but_empty() {
// When
DefaultAsyncResultSet resultSet =
new DefaultAsyncResultSet(
- columnDefinitions, executionInfo, new ArrayDeque<>(), session, context);
+ columnDefinitions,
+ executionInfo,
+ new ArrayDeque<>(),
+ session,
+ context,
+ executionProfile);
// Then
resultSet.wasApplied();
diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/QueryTraceIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/QueryTraceIT.java
index 37a600efbc4..d12c321a496 100644
--- a/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/QueryTraceIT.java
+++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/cql/QueryTraceIT.java
@@ -19,20 +19,31 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.awaitility.Awaitility.await;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.Version;
+import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
+import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.QueryTrace;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.metadata.EndPoint;
import com.datastax.oss.driver.api.testinfra.ccm.CcmRule;
import com.datastax.oss.driver.api.testinfra.requirement.BackendType;
import com.datastax.oss.driver.api.testinfra.session.SessionRule;
+import com.datastax.oss.driver.api.testinfra.session.SessionUtils;
import com.datastax.oss.driver.categories.ParallelizableTests;
+import com.datastax.oss.driver.internal.core.util.Strings;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -46,8 +57,17 @@ public class QueryTraceIT {
private static final SessionRule SESSION_RULE = SessionRule.builder(CCM_RULE).build();
+ private static final SessionRule SESSION_RULE_SINGLE_TRACE =
+ SessionRule.builder(CCM_RULE)
+ .withConfigLoader(
+ SessionUtils.configLoaderBuilder()
+ .withBoolean(DefaultDriverOption.REQUEST_TRACE_REPORT_EVERY_PAGE_FETCH, false)
+ .build())
+ .build();
+
@ClassRule
- public static final TestRule CHAIN = RuleChain.outerRule(CCM_RULE).around(SESSION_RULE);
+ public static final TestRule CHAIN =
+ RuleChain.outerRule(CCM_RULE).around(SESSION_RULE).around(SESSION_RULE_SINGLE_TRACE);
@Test
public void should_not_have_tracing_id_when_tracing_disabled() {
@@ -131,4 +151,78 @@ public void should_fetch_trace_when_tracing_enabled() {
assertThat(sourceAddress0.getPort()).isEqualTo(0);
}
}
+
+ @Test
+ public void should_report_trace_once_during_pagination() {
+ testTraceDuringPagination(SESSION_RULE_SINGLE_TRACE, 1);
+ }
+
+ @Test
+ public void should_report_trace_multiple_during_pagination() {
+ testTraceDuringPagination(SESSION_RULE, 6);
+ }
+
+ private void testTraceDuringPagination(
+ SessionRule sessionRule, int traceEventsCount) {
+ String key = setupPaginationTable(sessionRule);
+
+ String cql = "SELECT v0, v1 FROM trace_pagination WHERE k = ?";
+ SimpleStatement query = SimpleStatement.builder(cql).setTracing().setPageSize(2).build();
+ PreparedStatement preparedStatement = sessionRule.session().prepare(query);
+ ResultSet resultSet = sessionRule.session().execute(preparedStatement.bind(key));
+
+ ExecutionInfo executionInfo = resultSet.getExecutionInfo();
+ assertThat(executionInfo.getTracingId()).isNotNull();
+ QueryTrace queryTrace = executionInfo.getQueryTrace();
+ assertThat(queryTrace.getTracingId()).isEqualTo(executionInfo.getTracingId());
+ assertThat(queryTrace.getRequestType()).isEqualTo("Execute CQL3 prepared query");
+
+ Iterator iterator = resultSet.iterator();
+ while (iterator.hasNext()) {
+ iterator.next(); // iterate over several pages
+ }
+
+ // assert that only one event for tracing has been recorded
+ await()
+ .untilAsserted(
+ () -> {
+ List rows =
+ sessionRule.session().execute("SELECT * FROM system_traces.sessions").all()
+ .stream()
+ .filter(row -> isTraceForQuery(row, cql, key))
+ .collect(Collectors.toList());
+ assertThat(rows).hasSize(traceEventsCount);
+ });
+ }
+
+ private String setupPaginationTable(SessionRule sessionRule) {
+ String key = UUID.randomUUID().toString();
+ sessionRule
+ .session()
+ .execute(
+ SimpleStatement.builder(
+ "CREATE TABLE IF NOT EXISTS trace_pagination (k text, v0 int, v1 int, PRIMARY KEY(k, v0))")
+ .setExecutionProfile(sessionRule.slowProfile())
+ .build());
+ for (int i = 0; i < 10; i++) {
+ sessionRule
+ .session()
+ .execute(
+ SimpleStatement.builder("INSERT INTO trace_pagination (k, v0, v1) VALUES (?, ?, ?)")
+ .addPositionalValues(key, i, i)
+ .build());
+ }
+ return key;
+ }
+
+ private static boolean isTraceForQuery(Row row, String cql, String key) {
+ if (!row.getColumnDefinitions().contains("parameters")) {
+ return false;
+ }
+ Map queryParams = row.getMap("parameters", String.class, String.class);
+ if (queryParams == null || !queryParams.containsKey("query")) {
+ return false;
+ }
+ return queryParams.get("query").contains(cql) && queryParams.containsValue(Strings.quote(key));
+ }
}