Skip to content

Commit

Permalink
fix(jdbc): be resilient to DataException
Browse files Browse the repository at this point in the history
We usually fail fast, but when a DataException is thrown it means the JDBC driver throws an exception with error code 22: data exception.
As the exception is from the data not the database or the network, there is no point of failfast, we throw a QueueException that may or may not be handled gracefully by the call site.
  • Loading branch information
loicmathieu committed Feb 24, 2025
1 parent bd6937a commit 0d61c3e
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,38 @@
package io.kestra.runner.postgres;

import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.core.utils.IdUtils;
import io.kestra.jdbc.runner.JdbcQueueTest;
import org.jooq.exception.DataException;
import org.junit.jupiter.api.Test;

import java.util.Map;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertThrows;

class PostgresQueueTest extends JdbcQueueTest {
@Test
void invalidWorkerTaskShouldThrowDataException() throws QueueException {
var workerTaskResult = WorkerTaskResult.builder()
.taskRun(TaskRun.builder()
.taskId("taskId")
.id(IdUtils.create())
.namespace("namespace")
.flowId("flowId")
.state(new State().withState(State.Type.SUCCESS))
.outputs(Map.of("value", "\u0000"))
.build()
)
.build();

var exception = assertThrows(QueueException.class, () -> workerTaskResultQueue.emit(workerTaskResult));
assertThat(exception.getMessage(), is("Unable to emit a message to the queue"));
assertThat(exception.getCause(), instanceOf(DataException.class));
}
}
25 changes: 15 additions & 10 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import lombok.extern.slf4j.Slf4j;
import org.jooq.*;
import org.jooq.Record;
import org.jooq.exception.DataException;
import org.jooq.impl.DSL;

import java.io.IOException;
Expand Down Expand Up @@ -129,18 +130,22 @@ private void produce(String consumerGroup, String key, T message, Boolean skipIn

Map<Field<Object>, Object> fields = this.produceFields(consumerGroup, key, message);

dslContextWrapper.transaction(configuration -> {
DSLContext context = DSL.using(configuration);
try {
dslContextWrapper.transaction(configuration -> {
DSLContext context = DSL.using(configuration);

if (!skipIndexer) {
jdbcQueueIndexer.accept(context, message);
}
if (!skipIndexer) {
jdbcQueueIndexer.accept(context, message);
}

context
.insertInto(table)
.set(fields)
.execute();
});
context
.insertInto(table)
.set(fields)
.execute();
});
} catch (DataException e) { // The exception is from the data itself, not the database/network/driver so instead of fail fast, we throw a recoverable QueueException
throw new QueueException("Unable to emit a message to the queue", e);
}
}

public void emitOnly(String consumerGroup, T message) throws QueueException{
Expand Down
5 changes: 5 additions & 0 deletions jdbc/src/test/java/io/kestra/jdbc/runner/JdbcQueueTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.Indexer;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.debug.Return;
import io.kestra.core.utils.IdUtils;
Expand All @@ -31,6 +32,10 @@ abstract public class JdbcQueueTest {
@Named(QueueFactoryInterface.FLOW_NAMED)
protected QueueInterface<FlowWithSource> flowQueue;

@Inject
@Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED)
protected QueueInterface<WorkerTaskResult> workerTaskResultQueue;

@Inject
JdbcTestUtils jdbcTestUtils;

Expand Down

0 comments on commit 0d61c3e

Please sign in to comment.