Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1901399: Let query timeout be server side or client side, not both #2084

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,7 @@ public void addProperty(String propertyName, Object propertyValue) throws SFExce
statementParametersMap.put(propertyName, propertyValue);

if ("query_timeout".equalsIgnoreCase(propertyName)) {
// Client side implementation
queryTimeout = (Integer) propertyValue;
if (this.getSFBaseSession().getImplicitServerSideQueryTimeout()) {
// Set server parameter for supporting query timeout on async queries
statementParametersMap.put("STATEMENT_TIMEOUT_IN_SECONDS", (Integer) propertyValue);
}
}

// check if the number of session properties exceed limit
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/net/snowflake/client/core/SFStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,14 @@ public Object executeHelper(
// if timeout is set, start a thread to cancel the request after timeout
// reached.
if (this.queryTimeout > 0) {
executor = Executors.newScheduledThreadPool(1);
setTimeBomb(executor);
if (session.getImplicitServerSideQueryTimeout()) {
// Server side only query timeout
statementParametersMap.put("STATEMENT_TIMEOUT_IN_SECONDS", this.queryTimeout);
} else {
// client side only query timeout
executor = Executors.newScheduledThreadPool(1);
setTimeBomb(executor);
}
}

StmtUtil.StmtOutput stmtOutput = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -893,10 +893,12 @@ public void setAsyncQueryTimeout(int seconds) throws SQLException {
try {
if (this.sfBaseStatement != null) {
this.sfBaseStatement.addProperty("STATEMENT_TIMEOUT_IN_SECONDS", seconds);
// disable statement level query timeout to avoid override by connection parameter
this.setQueryTimeout(0);
}
} catch (SFException ex) {
throw new SnowflakeSQLException(
ex.getCause(), ex.getSqlState(), ex.getVendorCode(), ex.getParams());
null, ex.getCause(), ex.getSqlState(), ex.getVendorCode(), ex.getParams());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/
package net.snowflake.client.annotations;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@DisabledIfEnvironmentVariable(named = "SNOWFLAKE_TEST_HOST", matches = ".*.reg.local")
public @interface DontRunOnJenkins {}
126 changes: 111 additions & 15 deletions src/test/java/net/snowflake/client/jdbc/StatementLatestIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import static net.snowflake.client.jdbc.ErrorCode.ROW_DOES_NOT_EXIST;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -21,12 +22,19 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import net.snowflake.client.TestUtil;
import net.snowflake.client.annotations.DontRunOnGithubActions;
import net.snowflake.client.annotations.DontRunOnJenkins;
import net.snowflake.client.category.TestTags;
import net.snowflake.client.core.ParameterBindingDTO;
import net.snowflake.client.core.QueryStatus;
Expand Down Expand Up @@ -335,25 +343,82 @@ public void testSetQueryTimeoutForAsyncQueryUsingConnectionProperty() throws SQL

/**
* Test for setting query timeout on regular queries with the IMPLICIT_SERVER_SIDE_QUERY_TIMEOUT
* property set to true. Applicable to versions after 3.21.0.
* property set to true should rely on server only. Applicable to versions after 3.21.0. In
* version above 3.22.0 the error should be handled only on the server side.
*
* @throws SQLException if there is an error when executing
*/
@Test
public void testSetQueryTimeoutWhenAsyncConnectionPropertySet() throws SQLException {
@DontRunOnJenkins // uses too many resources on Jenkins making the test flaky
public void testSetQueryTimeoutOnStatementWhenImplicitQueryTimeoutIsSet()
throws SQLException, InterruptedException, ExecutionException {
int threads = 20;
ExecutorService executor = Executors.newFixedThreadPool(threads);
List<Future<?>> futures = new ArrayList<>();
Properties p = new Properties();
p.put("IMPLICIT_SERVER_SIDE_QUERY_TIMEOUT", true);
try (Connection con = getConnection(p);
Statement statement = con.createStatement()) {
statement.setQueryTimeout(3);
try (Connection con = getConnection(p)) {

String sql = "select seq4() from table(generator(rowcount => 1000000000))";

try {
statement.executeQuery(sql);
fail("This query should fail.");
} catch (SQLException e) {
assertEquals(SqlState.QUERY_CANCELED, e.getSQLState());
for (int i = 0; i < threads; ++i) {
futures.add(
executor.submit(
() -> {
try (Statement statement = con.createStatement()) {
statement.setQueryTimeout(3);
statement.executeQuery(sql);
fail("This query should fail.");
} catch (SQLException e) {
assertEquals(SqlState.QUERY_CANCELED, e.getSQLState());
}
}));
}
executor.shutdown();
assertTrue(executor.awaitTermination(60, TimeUnit.SECONDS));
for (Future<?> future : futures) {
assertNull(future.get());
}
}
}

/**
* Test for setting connection level query timeout on regular queries with the
* IMPLICIT_SERVER_SIDE_QUERY_TIMEOUT property set to true should rely on server only. Applicable
* to versions after 3.22.0.
*
* @throws SQLException if there is an error when executing
*/
@Test
@DontRunOnJenkins // uses too many resources on Jenkins making the test flaky
public void testSetQueryTimeoutOnConnectionWhenImplicitQueryTimeoutIsSet()
throws SQLException, InterruptedException, ExecutionException {
int threads = 20;
ExecutorService executor = Executors.newFixedThreadPool(threads);
List<Future<?>> futures = new ArrayList<>();
Properties p = new Properties();
p.put("IMPLICIT_SERVER_SIDE_QUERY_TIMEOUT", true);
p.put("queryTimeout", 3);
try (Connection con = getConnection(p)) {

String sql = "select seq4() from table(generator(rowcount => 1000000000))";

for (int i = 0; i < threads; ++i) {
futures.add(
executor.submit(
() -> {
try (Statement statement = con.createStatement()) {
statement.executeQuery(sql);
fail("This query should fail.");
} catch (SQLException e) {
assertEquals(SqlState.QUERY_CANCELED, e.getSQLState());
}
}));
}
executor.shutdown();
assertTrue(executor.awaitTermination(60, TimeUnit.SECONDS));
for (Future<?> future : futures) {
assertNull(future.get());
}
}
}
Expand All @@ -378,11 +443,42 @@ public void testSetQueryTimeoutForAsyncQuery() throws SQLException {
.atMost(Duration.ofSeconds(10))
.until(() -> sfrs.getStatusV2().getStatus() == QueryStatus.FAILED_WITH_ERROR);

assertTrue(
sfrs.getStatusV2()
.getErrorMessage()
.contains(
"Statement reached its statement or warehouse timeout of 3 second(s) and was canceled"));
assertThat(
sfrs.getStatusV2().getErrorMessage(),
containsString(
"Statement reached its statement or warehouse timeout of 3 second(s) and was canceled"));
}
}
}

/**
* Applicable to versions after 3.22.0.
*
* @throws SQLException if there is an error when executing
*/
@Test
public void testSetAsyncQueryTimeoutOverridesConnectionQueryTimeoutForAsyncQuery()
throws SQLException {
Properties p = new Properties();
p.put("IMPLICIT_SERVER_SIDE_QUERY_TIMEOUT", true);
p.put("queryTimeout", 1);
try (Connection con = getConnection(p);
Statement statement = con.createStatement()) {
SnowflakeStatement sfStmt = statement.unwrap(SnowflakeStatement.class);
sfStmt.setAsyncQueryTimeout(3);

String sql = "select seq4() from table(generator(rowcount => 1000000000))";

try (ResultSet resultSet = sfStmt.executeAsyncQuery(sql)) {
SnowflakeResultSet sfrs = resultSet.unwrap(SnowflakeResultSet.class);
await()
.atMost(Duration.ofSeconds(10))
.until(() -> sfrs.getStatusV2().getStatus() == QueryStatus.FAILED_WITH_ERROR);

assertThat(
sfrs.getStatusV2().getErrorMessage(),
containsString(
"Statement reached its statement or warehouse timeout of 3 second(s) and was canceled"));
}
}
}
Expand Down
Loading