From 9c2219df672d188eaf1a5bd76a2f79efe0ea01c5 Mon Sep 17 00:00:00 2001 From: Yas Okada Date: Tue, 21 May 2024 18:10:30 +0900 Subject: [PATCH 1/3] reconnect when auth error occurs in runDropStage method --- .../embulk/output/SnowflakeOutputPlugin.java | 26 +++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java index 7f34019..fed6794 100644 --- a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java +++ b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java @@ -7,6 +7,7 @@ import java.sql.Types; import java.util.*; import java.util.function.BiFunction; +import net.snowflake.client.jdbc.SnowflakeSQLException; import net.snowflake.client.jdbc.internal.org.bouncycastle.operator.OperatorCreationException; import net.snowflake.client.jdbc.internal.org.bouncycastle.pkcs.PKCSException; import org.embulk.config.ConfigDiff; @@ -204,12 +205,12 @@ public ConfigDiff transaction( snowflakeCon.runCreateStage(stageIdentifier); configDiff = super.transaction(config, schema, taskCount, control); if (t.getDeleteStage()) { - snowflakeCon.runDropStage(stageIdentifier); + runDropStage(snowflakeCon, stageIdentifier, task); } } catch (Exception e) { if (t.getDeleteStage() && t.getDeleteStageOnError()) { try { - snowflakeCon.runDropStage(stageIdentifier); + runDropStage(snowflakeCon, stageIdentifier, task); } catch (SQLException ex) { throw new RuntimeException(ex); } @@ -220,6 +221,27 @@ public ConfigDiff transaction( return configDiff; } + private void runDropStage( + SnowflakeOutputConnection snowflakeCon, StageIdentifier stageIdentifier, PluginTask task) + throws SQLException { + try { + snowflakeCon.runDropStage(stageIdentifier); + } catch (SnowflakeSQLException ex) { + logger.info("SnowflakeSQLException was caught: {}", ex.getMessage()); + + if (ex.getMessage().startsWith("Authentication token has expired.") + || ex.getMessage().startsWith("Session no longer exists.")) { + + // INFO: If runCreateStage consumed a lot of time, authentication might be expired. + // In this case, retry to drop stage. + snowflakeCon = (SnowflakeOutputConnection) getConnector(task, true).connect(true); + snowflakeCon.runDropStage(stageIdentifier); + } else { + throw ex; + } + } + } + @Override public ConfigDiff resume( TaskSource taskSource, Schema schema, int taskCount, OutputPlugin.Control control) { From 364ecd609fa0eb0906477d4d22dc27bfadd6b6f7 Mon Sep 17 00:00:00 2001 From: Yas Okada Date: Thu, 30 May 2024 15:20:09 +0900 Subject: [PATCH 2/3] fix to handle error codes instead of error message --- .../embulk/output/SnowflakeOutputPlugin.java | 40 ++++++++++++++----- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java index fed6794..5d37671 100644 --- a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java +++ b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java @@ -125,6 +125,16 @@ public static MatchByColumnName fromString(String value) { } } + // error codes which need reauthenticate + // ref: + // https://github.com/snowflakedb/snowflake-jdbc/blob/v3.13.26/src/main/java/net/snowflake/client/jdbc/SnowflakeUtil.java#L42 + private static final int ID_TOKEN_EXPIRED_GS_CODE = 390110; + private static final int SESSION_NOT_EXIST_GS_CODE = 390111; + private static final int MASTER_TOKEN_NOTFOUND = 390113; + private static final int MASTER_EXPIRED_GS_CODE = 390114; + private static final int MASTER_TOKEN_INVALID_GS_CODE = 390115; + private static final int ID_TOKEN_INVALID_LOGIN_REQUEST_GS_CODE = 390195; + @Override protected Class getTaskClass() { return SnowflakePluginTask.class; @@ -227,17 +237,25 @@ private void runDropStage( try { snowflakeCon.runDropStage(stageIdentifier); } catch (SnowflakeSQLException ex) { - logger.info("SnowflakeSQLException was caught: {}", ex.getMessage()); - - if (ex.getMessage().startsWith("Authentication token has expired.") - || ex.getMessage().startsWith("Session no longer exists.")) { - - // INFO: If runCreateStage consumed a lot of time, authentication might be expired. - // In this case, retry to drop stage. - snowflakeCon = (SnowflakeOutputConnection) getConnector(task, true).connect(true); - snowflakeCon.runDropStage(stageIdentifier); - } else { - throw ex; + // INFO: Don't handle only SnowflakeReauthenticationRequest here + // because SnowflakeSQLException with following error codes may be thrown in some cases. + + logger.info("SnowflakeSQLException was caught: ({}) {}", ex.getErrorCode(), ex.getMessage()); + + switch (ex.getErrorCode()) { + case ID_TOKEN_EXPIRED_GS_CODE: + case SESSION_NOT_EXIST_GS_CODE: + case MASTER_TOKEN_NOTFOUND: + case MASTER_EXPIRED_GS_CODE: + case MASTER_TOKEN_INVALID_GS_CODE: + case ID_TOKEN_INVALID_LOGIN_REQUEST_GS_CODE: + // INFO: If runCreateStage consumed a lot of time, authentication might be expired. + // In this case, retry to drop stage. + snowflakeCon = (SnowflakeOutputConnection) getConnector(task, true).connect(true); + snowflakeCon.runDropStage(stageIdentifier); + break; + default: + throw ex; } } } From f383ccf4cf1b988c6d4cda265f9a44175732cb60 Mon Sep 17 00:00:00 2001 From: Yas Okada Date: Thu, 7 Nov 2024 17:02:54 +0900 Subject: [PATCH 3/3] refactor: rename runDropStage to runDropStageWithRecovery --- src/main/java/org/embulk/output/SnowflakeOutputPlugin.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java index 5d37671..e379fa9 100644 --- a/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java +++ b/src/main/java/org/embulk/output/SnowflakeOutputPlugin.java @@ -215,12 +215,12 @@ public ConfigDiff transaction( snowflakeCon.runCreateStage(stageIdentifier); configDiff = super.transaction(config, schema, taskCount, control); if (t.getDeleteStage()) { - runDropStage(snowflakeCon, stageIdentifier, task); + runDropStageWithRecovery(snowflakeCon, stageIdentifier, task); } } catch (Exception e) { if (t.getDeleteStage() && t.getDeleteStageOnError()) { try { - runDropStage(snowflakeCon, stageIdentifier, task); + runDropStageWithRecovery(snowflakeCon, stageIdentifier, task); } catch (SQLException ex) { throw new RuntimeException(ex); } @@ -231,7 +231,7 @@ public ConfigDiff transaction( return configDiff; } - private void runDropStage( + private void runDropStageWithRecovery( SnowflakeOutputConnection snowflakeCon, StageIdentifier stageIdentifier, PluginTask task) throws SQLException { try {