From 8ccbc844f6fb757417639d2ada33dd0a5b96a7ca Mon Sep 17 00:00:00 2001 From: Dominik Przybysz Date: Wed, 19 Feb 2025 08:56:07 +0100 Subject: [PATCH] SNOW-1901399: Let query timeout be server side or client side, not both --- .../client/core/SFBaseStatement.java | 5 - .../snowflake/client/core/SFStatement.java | 10 +- .../client/jdbc/SnowflakeStatementV1.java | 4 +- .../client/jdbc/StatementLatestIT.java | 126 +++++++++++++++--- 4 files changed, 122 insertions(+), 23 deletions(-) diff --git a/src/main/java/net/snowflake/client/core/SFBaseStatement.java b/src/main/java/net/snowflake/client/core/SFBaseStatement.java index 33c30aba5..52980c032 100644 --- a/src/main/java/net/snowflake/client/core/SFBaseStatement.java +++ b/src/main/java/net/snowflake/client/core/SFBaseStatement.java @@ -40,12 +40,7 @@ public void addProperty(String propertyName, Object propertyValue) throws SFExce statementParametersMap.put(propertyName, propertyValue); if ("query_timeout".equalsIgnoreCase(propertyName)) { - // Client side implementation queryTimeout = (Integer) propertyValue; - if (this.getSFBaseSession().getImplicitServerSideQueryTimeout()) { - // Set server parameter for supporting query timeout on async queries - statementParametersMap.put("STATEMENT_TIMEOUT_IN_SECONDS", (Integer) propertyValue); - } } // check if the number of session properties exceed limit diff --git a/src/main/java/net/snowflake/client/core/SFStatement.java b/src/main/java/net/snowflake/client/core/SFStatement.java index 0d0668e27..5d85a1084 100644 --- a/src/main/java/net/snowflake/client/core/SFStatement.java +++ b/src/main/java/net/snowflake/client/core/SFStatement.java @@ -487,8 +487,14 @@ public Object executeHelper( // if timeout is set, start a thread to cancel the request after timeout // reached. if (this.queryTimeout > 0) { - executor = Executors.newScheduledThreadPool(1); - setTimeBomb(executor); + if (session.getImplicitServerSideQueryTimeout()) { + // Server side only query timeout + statementParametersMap.put("STATEMENT_TIMEOUT_IN_SECONDS", this.queryTimeout); + } else { + // client side only query timeout + executor = Executors.newScheduledThreadPool(1); + setTimeBomb(executor); + } } StmtUtil.StmtOutput stmtOutput = null; diff --git a/src/main/java/net/snowflake/client/jdbc/SnowflakeStatementV1.java b/src/main/java/net/snowflake/client/jdbc/SnowflakeStatementV1.java index 092c19dd4..649f456be 100644 --- a/src/main/java/net/snowflake/client/jdbc/SnowflakeStatementV1.java +++ b/src/main/java/net/snowflake/client/jdbc/SnowflakeStatementV1.java @@ -893,10 +893,12 @@ public void setAsyncQueryTimeout(int seconds) throws SQLException { try { if (this.sfBaseStatement != null) { this.sfBaseStatement.addProperty("STATEMENT_TIMEOUT_IN_SECONDS", seconds); + // disable statement level query timeout to avoid override by connection parameter + this.setQueryTimeout(0); } } catch (SFException ex) { throw new SnowflakeSQLException( - ex.getCause(), ex.getSqlState(), ex.getVendorCode(), ex.getParams()); + null, ex.getCause(), ex.getSqlState(), ex.getVendorCode(), ex.getParams()); } } diff --git a/src/test/java/net/snowflake/client/jdbc/StatementLatestIT.java b/src/test/java/net/snowflake/client/jdbc/StatementLatestIT.java index 86ed04510..5d4ee67a2 100644 --- a/src/test/java/net/snowflake/client/jdbc/StatementLatestIT.java +++ b/src/test/java/net/snowflake/client/jdbc/StatementLatestIT.java @@ -5,6 +5,7 @@ import static net.snowflake.client.jdbc.ErrorCode.ROW_DOES_NOT_EXIST; import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -21,12 +22,19 @@ import java.sql.SQLException; import java.sql.Statement; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import net.snowflake.client.TestUtil; import net.snowflake.client.annotations.DontRunOnGithubActions; +import net.snowflake.client.annotations.DontRunOnTestaccount; import net.snowflake.client.category.TestTags; import net.snowflake.client.core.ParameterBindingDTO; import net.snowflake.client.core.QueryStatus; @@ -335,25 +343,82 @@ public void testSetQueryTimeoutForAsyncQueryUsingConnectionProperty() throws SQL /** * Test for setting query timeout on regular queries with the IMPLICIT_SERVER_SIDE_QUERY_TIMEOUT - * property set to true. Applicable to versions after 3.21.0. + * property set to true should rely on server only. Applicable to versions after 3.21.0. In + * version above 3.22.0 the error should be handled only on the server side. * * @throws SQLException if there is an error when executing */ @Test - public void testSetQueryTimeoutWhenAsyncConnectionPropertySet() throws SQLException { + @DontRunOnTestaccount // uses too many resources on Jenkins making the test flaky + public void testSetQueryTimeoutOnStatementWhenImplicitQueryTimeoutIsSet() + throws SQLException, InterruptedException, ExecutionException { + int threads = 20; + ExecutorService executor = Executors.newFixedThreadPool(threads); + List> futures = new ArrayList<>(); Properties p = new Properties(); p.put("IMPLICIT_SERVER_SIDE_QUERY_TIMEOUT", true); - try (Connection con = getConnection(p); - Statement statement = con.createStatement()) { - statement.setQueryTimeout(3); + try (Connection con = getConnection(p)) { String sql = "select seq4() from table(generator(rowcount => 1000000000))"; - try { - statement.executeQuery(sql); - fail("This query should fail."); - } catch (SQLException e) { - assertEquals(SqlState.QUERY_CANCELED, e.getSQLState()); + for (int i = 0; i < threads; ++i) { + futures.add( + executor.submit( + () -> { + try (Statement statement = con.createStatement()) { + statement.setQueryTimeout(3); + statement.executeQuery(sql); + fail("This query should fail."); + } catch (SQLException e) { + assertEquals(SqlState.QUERY_CANCELED, e.getSQLState()); + } + })); + } + executor.shutdown(); + assertTrue(executor.awaitTermination(60, TimeUnit.SECONDS)); + for (Future future : futures) { + assertNull(future.get()); + } + } + } + + /** + * Test for setting connection level query timeout on regular queries with the + * IMPLICIT_SERVER_SIDE_QUERY_TIMEOUT property set to true should rely on server only. Applicable + * to versions after 3.22.0. + * + * @throws SQLException if there is an error when executing + */ + @Test + @DontRunOnTestaccount // uses too many resources on Jenkins making the test flaky + public void testSetQueryTimeoutOnConnectionWhenImplicitQueryTimeoutIsSet() + throws SQLException, InterruptedException, ExecutionException { + int threads = 20; + ExecutorService executor = Executors.newFixedThreadPool(threads); + List> futures = new ArrayList<>(); + Properties p = new Properties(); + p.put("IMPLICIT_SERVER_SIDE_QUERY_TIMEOUT", true); + p.put("queryTimeout", 3); + try (Connection con = getConnection(p)) { + + String sql = "select seq4() from table(generator(rowcount => 1000000000))"; + + for (int i = 0; i < threads; ++i) { + futures.add( + executor.submit( + () -> { + try (Statement statement = con.createStatement()) { + statement.executeQuery(sql); + fail("This query should fail."); + } catch (SQLException e) { + assertEquals(SqlState.QUERY_CANCELED, e.getSQLState()); + } + })); + } + executor.shutdown(); + assertTrue(executor.awaitTermination(60, TimeUnit.SECONDS)); + for (Future future : futures) { + assertNull(future.get()); } } } @@ -378,11 +443,42 @@ public void testSetQueryTimeoutForAsyncQuery() throws SQLException { .atMost(Duration.ofSeconds(10)) .until(() -> sfrs.getStatusV2().getStatus() == QueryStatus.FAILED_WITH_ERROR); - assertTrue( - sfrs.getStatusV2() - .getErrorMessage() - .contains( - "Statement reached its statement or warehouse timeout of 3 second(s) and was canceled")); + assertThat( + sfrs.getStatusV2().getErrorMessage(), + containsString( + "Statement reached its statement or warehouse timeout of 3 second(s) and was canceled")); + } + } + } + + /** + * Applicable to versions after 3.22.0. + * + * @throws SQLException if there is an error when executing + */ + @Test + public void testSetAsyncQueryTimeoutOverridesConnectionQueryTimeoutForAsyncQuery() + throws SQLException { + Properties p = new Properties(); + p.put("IMPLICIT_SERVER_SIDE_QUERY_TIMEOUT", true); + p.put("queryTimeout", 1); + try (Connection con = getConnection(p); + Statement statement = con.createStatement()) { + SnowflakeStatement sfStmt = statement.unwrap(SnowflakeStatement.class); + sfStmt.setAsyncQueryTimeout(3); + + String sql = "select seq4() from table(generator(rowcount => 1000000000))"; + + try (ResultSet resultSet = sfStmt.executeAsyncQuery(sql)) { + SnowflakeResultSet sfrs = resultSet.unwrap(SnowflakeResultSet.class); + await() + .atMost(Duration.ofSeconds(10)) + .until(() -> sfrs.getStatusV2().getStatus() == QueryStatus.FAILED_WITH_ERROR); + + assertThat( + sfrs.getStatusV2().getErrorMessage(), + containsString( + "Statement reached its statement or warehouse timeout of 3 second(s) and was canceled")); } } }