diff --git a/jdbc-postgres/src/test/java/io/kestra/runner/postgres/PostgresQueueTest.java b/jdbc-postgres/src/test/java/io/kestra/runner/postgres/PostgresQueueTest.java index 38863590341..d5dc565c8e9 100644 --- a/jdbc-postgres/src/test/java/io/kestra/runner/postgres/PostgresQueueTest.java +++ b/jdbc-postgres/src/test/java/io/kestra/runner/postgres/PostgresQueueTest.java @@ -1,7 +1,38 @@ package io.kestra.runner.postgres; +import io.kestra.core.models.executions.TaskRun; +import io.kestra.core.models.flows.State; +import io.kestra.core.queues.QueueException; +import io.kestra.core.runners.WorkerTaskResult; +import io.kestra.core.utils.IdUtils; import io.kestra.jdbc.runner.JdbcQueueTest; +import org.jooq.exception.DataException; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertThrows; class PostgresQueueTest extends JdbcQueueTest { + @Test + void invalidWorkerTaskShouldThrowDataException() throws QueueException { + var workerTaskResult = WorkerTaskResult.builder() + .taskRun(TaskRun.builder() + .taskId("taskId") + .id(IdUtils.create()) + .namespace("namespace") + .flowId("flowId") + .state(new State().withState(State.Type.SUCCESS)) + .outputs(Map.of("value", "\u0000")) + .build() + ) + .build(); + var exception = assertThrows(QueueException.class, () -> workerTaskResultQueue.emit(workerTaskResult)); + assertThat(exception.getMessage(), is("Unable to emit a message to the queue")); + assertThat(exception.getCause(), instanceOf(DataException.class)); + } } \ No newline at end of file diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java index 811f778f589..67cbeafe282 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java @@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j; import org.jooq.*; import org.jooq.Record; +import org.jooq.exception.DataException; import org.jooq.impl.DSL; import java.io.IOException; @@ -129,18 +130,22 @@ private void produce(String consumerGroup, String key, T message, Boolean skipIn Map, Object> fields = this.produceFields(consumerGroup, key, message); - dslContextWrapper.transaction(configuration -> { - DSLContext context = DSL.using(configuration); + try { + dslContextWrapper.transaction(configuration -> { + DSLContext context = DSL.using(configuration); - if (!skipIndexer) { - jdbcQueueIndexer.accept(context, message); - } + if (!skipIndexer) { + jdbcQueueIndexer.accept(context, message); + } - context - .insertInto(table) - .set(fields) - .execute(); - }); + context + .insertInto(table) + .set(fields) + .execute(); + }); + } catch (DataException e) { // The exception is from the data itself, not the database/network/driver so instead of fail fast, we throw a recoverable QueueException + throw new QueueException("Unable to emit a message to the queue", e); + } } public void emitOnly(String consumerGroup, T message) throws QueueException{ diff --git a/jdbc/src/test/java/io/kestra/jdbc/runner/JdbcQueueTest.java b/jdbc/src/test/java/io/kestra/jdbc/runner/JdbcQueueTest.java index a309d9f46f2..926f193aba8 100644 --- a/jdbc/src/test/java/io/kestra/jdbc/runner/JdbcQueueTest.java +++ b/jdbc/src/test/java/io/kestra/jdbc/runner/JdbcQueueTest.java @@ -6,6 +6,7 @@ import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueInterface; import io.kestra.core.runners.Indexer; +import io.kestra.core.runners.WorkerTaskResult; import io.kestra.core.utils.TestsUtils; import io.kestra.plugin.core.debug.Return; import io.kestra.core.utils.IdUtils; @@ -31,6 +32,10 @@ abstract public class JdbcQueueTest { @Named(QueueFactoryInterface.FLOW_NAMED) protected QueueInterface flowQueue; + @Inject + @Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED) + protected QueueInterface workerTaskResultQueue; + @Inject JdbcTestUtils jdbcTestUtils;