From 1ece67b4e47d5406694c960e94c69740aa5fe5bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Fri, 14 Feb 2025 14:42:40 +0100 Subject: [PATCH] fix(jdbc): be resilient to DataException We usually fail fast, but when a DataException is thrown it means the JDBC driver throws an exception with error code 22: data exception. As the exception is from the data not the database or the network, there is no point of failfast, we throw a QueueException that may or may not be handled gracefully by the call site. --- .../java/io/kestra/jdbc/runner/JdbcQueue.java | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java index 811f778f589..67cbeafe282 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java @@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j; import org.jooq.*; import org.jooq.Record; +import org.jooq.exception.DataException; import org.jooq.impl.DSL; import java.io.IOException; @@ -129,18 +130,22 @@ private void produce(String consumerGroup, String key, T message, Boolean skipIn Map, Object> fields = this.produceFields(consumerGroup, key, message); - dslContextWrapper.transaction(configuration -> { - DSLContext context = DSL.using(configuration); + try { + dslContextWrapper.transaction(configuration -> { + DSLContext context = DSL.using(configuration); - if (!skipIndexer) { - jdbcQueueIndexer.accept(context, message); - } + if (!skipIndexer) { + jdbcQueueIndexer.accept(context, message); + } - context - .insertInto(table) - .set(fields) - .execute(); - }); + context + .insertInto(table) + .set(fields) + .execute(); + }); + } catch (DataException e) { // The exception is from the data itself, not the database/network/driver so instead of fail fast, we throw a recoverable QueueException + throw new QueueException("Unable to emit a message to the queue", e); + } } public void emitOnly(String consumerGroup, T message) throws QueueException{