Skip to content

Commit

Permalink
fix(jdbc): be resilient to DataException
Browse files Browse the repository at this point in the history
We usually fail fast, but when a DataException is thrown it means the JDBC driver throws an exception with error code 22: data exception.
As the exception is from the data not the database or the network, there is no point of failfast, we throw a QueueException that may or may not be handled gracefully by the call site.
  • Loading branch information
loicmathieu committed Feb 24, 2025
1 parent bd6937a commit 69f91a1
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 10 deletions.
25 changes: 15 additions & 10 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import lombok.extern.slf4j.Slf4j;
import org.jooq.*;
import org.jooq.Record;
import org.jooq.exception.DataException;
import org.jooq.impl.DSL;

import java.io.IOException;
Expand Down Expand Up @@ -129,18 +130,22 @@ private void produce(String consumerGroup, String key, T message, Boolean skipIn

Map<Field<Object>, Object> fields = this.produceFields(consumerGroup, key, message);

dslContextWrapper.transaction(configuration -> {
DSLContext context = DSL.using(configuration);
try {
dslContextWrapper.transaction(configuration -> {
DSLContext context = DSL.using(configuration);

if (!skipIndexer) {
jdbcQueueIndexer.accept(context, message);
}
if (!skipIndexer) {
jdbcQueueIndexer.accept(context, message);
}

context
.insertInto(table)
.set(fields)
.execute();
});
context
.insertInto(table)
.set(fields)
.execute();
});
} catch (DataException e) { // The exception is from the data itself, not the database/network/driver so instead of fail fast, we throw a recoverable QueueException
throw new QueueException("Unable to emit a message to the queue", e);
}
}

public void emitOnly(String consumerGroup, T message) throws QueueException{
Expand Down
30 changes: 30 additions & 0 deletions jdbc/src/test/java/io/kestra/jdbc/runner/JdbcQueueTest.java
Original file line number Diff line number Diff line change
@@ -1,36 +1,47 @@
package io.kestra.jdbc.runner;

import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.property.Property;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.Indexer;
import io.kestra.core.runners.WorkerTaskResult;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.core.debug.Return;
import io.kestra.core.utils.IdUtils;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.jooq.exception.DataException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static io.kestra.core.utils.Rethrow.throwConsumer;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertThrows;

@KestraTest
abstract public class JdbcQueueTest {
@Inject
@Named(QueueFactoryInterface.FLOW_NAMED)
protected QueueInterface<FlowWithSource> flowQueue;

@Inject
@Named(QueueFactoryInterface.WORKERTASKRESULT_NAMED)
protected QueueInterface<WorkerTaskResult> workerTaskResultQueue;

@Inject
JdbcTestUtils jdbcTestUtils;

Expand Down Expand Up @@ -128,6 +139,25 @@ void withGroupAndType() throws InterruptedException, QueueException {
assertThat(receive.blockLast().getNamespace(), is("io.kestra.f2"));
}

@Test
void invalidWorkerTaskShouldThrowDataException() throws QueueException {
var workerTaskResult = WorkerTaskResult.builder()
.taskRun(TaskRun.builder()
.taskId("taskId")
.id(IdUtils.create())
.namespace("namespace")
.flowId("flowId")
.state(new State().withState(State.Type.SUCCESS))
.outputs(Map.of("value", "\u0000"))
.build()
)
.build();

var exception = assertThrows(QueueException.class, () -> workerTaskResultQueue.emit(workerTaskResult));

Check failure on line 156 in jdbc/src/test/java/io/kestra/jdbc/runner/JdbcQueueTest.java

View workflow job for this annotation

GitHub Actions / Java Tests Report

io.kestra.runner.h2.H2QueueTest ► invalidWorkerTaskShouldThrowDataException()

Failed test found in: jdbc-h2/build/test-results/test/TEST-io.kestra.runner.h2.H2QueueTest.xml jdbc-mysql/build/test-results/test/TEST-io.kestra.runner.mysql.MysqlQueueTest.xml Error: org.opentest4j.AssertionFailedError: Expected io.kestra.core.queues.QueueException to be thrown, but nothing was thrown.
Raw output
org.opentest4j.AssertionFailedError: Expected io.kestra.core.queues.QueueException to be thrown, but nothing was thrown.
	at app//io.kestra.jdbc.runner.JdbcQueueTest.invalidWorkerTaskShouldThrowDataException(JdbcQueueTest.java:156)
	at [email protected]/java.lang.reflect.Method.invoke(Method.java:580)
	at app//io.micronaut.test.extensions.junit5.MicronautJunit5Extension$2.proceed(MicronautJunit5Extension.java:142)
	at app//io.micronaut.test.extensions.AbstractMicronautExtension.interceptEach(AbstractMicronautExtension.java:162)
	at app//io.micronaut.test.extensions.AbstractMicronautExtension.interceptTest(AbstractMicronautExtension.java:119)
	at app//io.micronaut.test.extensions.junit5.MicronautJunit5Extension.interceptTestMethod(MicronautJunit5Extension.java:129)
	at [email protected]/java.util.ArrayList.forEach(ArrayList.java:1596)
	at [email protected]/java.util.ArrayList.forEach(ArrayList.java:1596)
assertThat(exception.getMessage(), is("Unable to emit a message to the queue"));
assertThat(exception.getCause(), instanceOf(DataException.class));
}

private static FlowWithSource builder(String namespace) {
return FlowWithSource.builder()
.id(IdUtils.create())
Expand Down

0 comments on commit 69f91a1

Please sign in to comment.