Skip to content

Commit

Permalink
fix(jdbc): be resilient to DataException
Browse files Browse the repository at this point in the history
We usually fail fast, but when a DataException is thrown it means the JDBC driver throws an exception with error code 22: data exception.
As the exception is from the data not the database or the network, there is no point of failfast, we throw a QueueException that may or may not be handled gracefully by the call site.
  • Loading branch information
loicmathieu committed Feb 14, 2025
1 parent e6419cf commit 1ece67b
Showing 1 changed file with 15 additions and 10 deletions.
25 changes: 15 additions & 10 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import lombok.extern.slf4j.Slf4j;
import org.jooq.*;
import org.jooq.Record;
import org.jooq.exception.DataException;
import org.jooq.impl.DSL;

import java.io.IOException;
Expand Down Expand Up @@ -129,18 +130,22 @@ private void produce(String consumerGroup, String key, T message, Boolean skipIn

Map<Field<Object>, Object> fields = this.produceFields(consumerGroup, key, message);

dslContextWrapper.transaction(configuration -> {
DSLContext context = DSL.using(configuration);
try {
dslContextWrapper.transaction(configuration -> {
DSLContext context = DSL.using(configuration);

if (!skipIndexer) {
jdbcQueueIndexer.accept(context, message);
}
if (!skipIndexer) {
jdbcQueueIndexer.accept(context, message);
}

context
.insertInto(table)
.set(fields)
.execute();
});
context
.insertInto(table)
.set(fields)
.execute();
});
} catch (DataException e) { // The exception is from the data itself, not the database/network/driver so instead of fail fast, we throw a recoverable QueueException
throw new QueueException("Unable to emit a message to the queue", e);
}
}

public void emitOnly(String consumerGroup, T message) throws QueueException{
Expand Down

0 comments on commit 1ece67b

Please sign in to comment.