diff --git a/README.md b/README.md
index 52dfc6b..0b7c03f 100644
--- a/README.md
+++ b/README.md
@@ -662,13 +662,16 @@ If you want to use Avro with a schema registry, follow these steps:
   curl -s http://localhost:8081/subjects/com.kpipe.customer/versions/latest | jq -r '.schema' | jq --indent 2 '.'
 
   # Produce an Avro message using kafka-avro-console-producer
-  echo '{"id":1,"name":"Mariano Gonzalez","email":{"string":"mariano@example.com"},"active":true,"registrationDate":1635724800000,"address":{"com.kpipe.customer.Address":{"street":"123 Main St","city":"Chicago","zipCode":"00000","country":"USA"}},"tags":["premium","verified"],"preferences":{"notifications":"email"}}' \
-    | docker run -i --rm --network=host confluentinc/cp-schema-registry:7.7.1 \
-      kafka-avro-console-producer \
-      --bootstrap-server localhost:9092 \
-      --topic avro-topic \
-      --property schema.registry.url=http://localhost:8081 \
-      --property value.schema.id=1
+  cat <<'JSON' | docker run -i --rm --network kpipe_default \
+    -v "$PWD/lib/src/test/resources/avro/customer.avsc:/tmp/customer.avsc:ro" \
+    confluentinc/cp-schema-registry:8.2.0 \
+    sh -ec 'kafka-avro-console-producer \
+      --bootstrap-server kafka:9092 \
+      --topic avro-topic \
+      --property schema.registry.url=http://schema-registry:8081 \
+      --property value.schema="$(cat /tmp/customer.avsc)"'
+{"id":1,"name":"Mariano Gonzalez","email":{"string":"mariano@example.com"},"active":true,"registrationDate":1635724800000,"address":{"com.kpipe.customer.Address":{"street":"123 Main St","city":"Chicago","zipCode":"00000","country":"USA"}},"tags":["premium","verified"],"preferences":{"notifications":"email"}}
+JSON
   ```
 
 Kafka consumer will:
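A quick way to sanity-check the produced record is the matching console consumer. This invocation is a suggestion, not part of the patch; the network name, image tag, topic, and registry URL simply mirror the producer command above:

```sh
# Read the message back to confirm the round trip (assumes the compose
# network "kpipe_default" and the service names used above)
docker run -i --rm --network kpipe_default confluentinc/cp-schema-registry:8.2.0 \
  kafka-avro-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic avro-topic \
  --from-beginning \
  --max-messages 1 \
  --property schema.registry.url=http://schema-registry:8081
```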
diff --git a/app/avro/build.gradle.kts b/app/avro/build.gradle.kts
index 6e75f01..6795d30 100644
--- a/app/avro/build.gradle.kts
+++ b/app/avro/build.gradle.kts
@@ -1,4 +1,5 @@
 import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
+import org.gradle.api.artifacts.VersionCatalogsExtension
 
 tasks.named<ShadowJar>("shadowJar") {
   archiveClassifier.set("all")
@@ -9,8 +10,10 @@ tasks.named<ShadowJar>("shadowJar") {
   }
 }
 
+val libsCatalog = rootProject.extensions.getByType<VersionCatalogsExtension>().named("libs")
+
 description = "KPipe - Kafka Consumer Application Using Avro"
 
 dependencies {
-  implementation("org.apache.avro:avro:1.12.1")
+  implementation(libsCatalog.findLibrary("avro").get())
 }
diff --git a/app/avro/src/test/java/org/kpipe/AppIntegrationTest.java b/app/avro/src/test/java/org/kpipe/AppIntegrationTest.java
index 942d18b..8be9356 100644
--- a/app/avro/src/test/java/org/kpipe/AppIntegrationTest.java
+++ b/app/avro/src/test/java/org/kpipe/AppIntegrationTest.java
@@ -3,18 +3,25 @@
 import static org.junit.jupiter.api.Assertions.*;
 
 import com.dslplatform.json.DslJson;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpServer;
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.lang.System.Logger;
 import java.lang.System.Logger.Level;
+import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.http.HttpClient;
 import java.net.http.HttpRequest;
 import java.net.http.HttpResponse;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import org.apache.avro.Schema;
@@ -29,50 +36,65 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.kpipe.config.AppConfig;
 import org.kpipe.sink.MessageSink;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.KafkaContainer;
-import org.testcontainers.containers.Network;
-import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.kafka.ConfluentKafkaContainer;
 import org.testcontainers.utility.DockerImageName;
 
 @Testcontainers
 class AppIntegrationTest {
 
   private static final Logger log = System.getLogger(AppIntegrationTest.class.getName());
-
-  static Network network = Network.newNetwork();
+  private static final String CONFLUENT_PLATFORM_VERSION = System.getProperty("confluentPlatformVersion", "8.2.0");
+  private static final String AVRO_TOPIC = "avro-topic";
+  private static final String SCHEMA_CONTENT_TYPE = "application/vnd.schemaregistry.v1+json";
+  private static final String SCHEMA_SUBJECT = "com.kpipe.customer";
+  private static final String SCHEMA_VERSIONS_PATH = "/subjects/" + SCHEMA_SUBJECT + "/versions";
+  private static final String SCHEMA_LATEST_PATH = SCHEMA_VERSIONS_PATH + "/latest";
+  private static final String SUBJECTS_EMPTY_JSON = """
+    []
+    """;
+  private static final String REGISTERED_SCHEMA_RESPONSE_JSON = """
+    {"id":1}
+    """;
+  private static final String SCHEMA_NOT_FOUND_JSON = """
+    {"error_code":40403,"message":"Schema not found"}
+    """;
+  private static final String ROUTE_NOT_FOUND_JSON = """
+    {"message":"Not found"}
+    """;
+  private static final DslJson<Object> JSON = new DslJson<>();
+  private static final HttpClient HTTP_CLIENT = HttpClient.newHttpClient();
+  private static volatile String latestSchema;
+  private static HttpServer schemaRegistryStub;
 
   @Container
-  static KafkaContainer kafka = new KafkaContainer(
-    DockerImageName.parse("confluentinc/cp-kafka:7.7.1").asCompatibleSubstituteFor("apache/kafka")
+  static ConfluentKafkaContainer kafka = new ConfluentKafkaContainer(
+    DockerImageName.parse("confluentinc/cp-kafka:%s".formatted(CONFLUENT_PLATFORM_VERSION))
   )
-    .withNetwork(network)
-    .withNetworkAliases("kafka")
-    .waitingFor(Wait.forListeningPort());
+    .withStartupAttempts(3);
 
-  @Container
-  static GenericContainer<?> schemaRegistry = new GenericContainer<>(
-    DockerImageName
-      .parse("confluentinc/cp-schema-registry:7.7.1")
-      .asCompatibleSubstituteFor("confluentinc/cp-schema-registry")
-  )
-    .withNetwork(network)
-    .withNetworkAliases("schema-registry")
-    .withExposedPorts(8081)
-    .withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
-    .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://kafka:9092")
-    .dependsOn(kafka)
-    .waitingFor(Wait.forHttp("/subjects").forStatusCode(200).withStartupTimeout(Duration.ofMinutes(2)));
+  @BeforeAll
+  static void startSchemaRegistryStub() throws Exception {
+    schemaRegistryStub = HttpServer.create(new InetSocketAddress(0), 0);
+    schemaRegistryStub.createContext("/subjects", AppIntegrationTest::handleSchemaStubRequest);
+    schemaRegistryStub.start();
+  }
+
+  @AfterAll
+  static void stopSchemaRegistryStub() {
+    if (schemaRegistryStub != null) schemaRegistryStub.stop(0);
+    latestSchema = null;
+  }
 
   @Test
   void testAvroAppEndToEnd() throws Exception {
-    final var topic = "avro-topic";
-    final var srUrl = "http://%s:%d".formatted(schemaRegistry.getHost(), schemaRegistry.getMappedPort(8081));
+    final var srUrl = "http://localhost:%d".formatted(schemaRegistryStub.getAddress().getPort());
 
     // Load and register schema before App construction (App fetches latest schema during init).
     final Schema schema;
@@ -80,12 +102,12 @@ void testAvroAppEndToEnd() throws Exception {
       assertNotNull(is, "Schema file not found");
       schema = new Schema.Parser().parse(is);
     }
-    registerSchema(srUrl, "com.kpipe.customer", schema.toString());
+    registerSchema(srUrl, schema.toString());
 
     final var config = new AppConfig(
       kafka.getBootstrapServers(),
       "test-group",
-      topic,
+      AVRO_TOPIC,
       "avro-app",
       Duration.ofMillis(100),
       Duration.ofSeconds(1),
@@ -111,26 +133,6 @@ void testAvroAppEndToEnd() throws Exception {
       }
     });
 
-    // Produce an Avro message with Confluent Wire Format (Magic Byte 0 + Schema ID)
-    final var record = new GenericData.Record(schema);
-    record.put("id", 1L);
-    record.put("name", "Test User");
-    record.put("email", "test.user@example.com");
-    record.put("active", true);
-    record.put("registrationDate", System.currentTimeMillis());
-    record.put("address", null);
-    record.put("tags", new GenericData.Array<>(schema.getField("tags").schema(), Collections.emptyList()));
-    record.put("preferences", Collections.emptyMap());
-
-    final var out = new ByteArrayOutputStream();
-    out.write(0); // Magic byte
-    out.write(ByteBuffer.allocate(4).putInt(1).array()); // Schema ID 1
-
-    final var encoder = EncoderFactory.get().binaryEncoder(out, null);
-    final var writer = new GenericDatumWriter<GenericRecord>(schema);
-    writer.write(record, encoder);
-    encoder.flush();
-
     final var producerProps = new Properties();
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
     producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
@@ -138,7 +140,7 @@ final var record = new GenericData.Record(schema);
 
     // Retry produce during the warm-up window so we do not miss messages while the consumer
     // finishes initial group assignment.
-    produceUntilConsumed(out.toByteArray(), producerProps, capturingSink, Duration.ofSeconds(10));
+    produceUntilConsumed(createConfluentWirePayload(schema), producerProps, capturingSink, Duration.ofSeconds(10));
 
     // Verify
     assertTrue(appThread.isAlive());
@@ -158,6 +160,76 @@ final var record = new GenericData.Record(schema);
     }
   }
 
+  private static byte[] createConfluentWirePayload(final Schema schema) throws Exception {
+    final var record = new GenericData.Record(schema);
+    record.put("id", 1L);
+    record.put("name", "Test User");
+    record.put("email", "test.user@example.com");
+    record.put("active", true);
+    record.put("registrationDate", System.currentTimeMillis());
+    record.put("address", null);
+    record.put("tags", new GenericData.Array<>(schema.getField("tags").schema(), Collections.emptyList()));
+    record.put("preferences", Collections.emptyMap());
+
+    final var out = new ByteArrayOutputStream();
+    out.write(0); // Magic byte
+    out.write(ByteBuffer.allocate(4).putInt(1).array()); // Schema ID 1
+
+    final var encoder = EncoderFactory.get().binaryEncoder(out, null);
+    final var writer = new GenericDatumWriter<GenericRecord>(schema);
+    writer.write(record, encoder);
+    encoder.flush();
+    return out.toByteArray();
+  }
+
+  private static void handleSchemaStubRequest(final HttpExchange exchange) {
+    try (exchange) {
+      final var method = exchange.getRequestMethod();
+      final var path = exchange.getRequestURI().getPath();
+
+      if ("GET".equals(method) && "/subjects".equals(path)) {
+        writeJson(exchange, 200, SUBJECTS_EMPTY_JSON);
+        return;
+      }
+
+      if ("POST".equals(method) && SCHEMA_VERSIONS_PATH.equals(path)) {
+        final var bodyBytes = exchange.getRequestBody().readAllBytes();
+        final var payload = JSON.deserialize(Map.class, new ByteArrayInputStream(bodyBytes));
+        latestSchema = payload != null && payload.get("schema") != null ? payload.get("schema").toString() : null;
+        writeJson(exchange, 200, REGISTERED_SCHEMA_RESPONSE_JSON);
+        return;
+      }
+
+      if ("GET".equals(method) && SCHEMA_LATEST_PATH.equals(path)) {
+        if (latestSchema == null) {
+          writeJson(exchange, 404, SCHEMA_NOT_FOUND_JSON);
+          return;
+        }
+
+        final var response = new HashMap<String, Object>();
+        response.put("subject", SCHEMA_SUBJECT);
+        response.put("version", 1);
+        response.put("id", 1);
+        response.put("schema", latestSchema);
+
+        final byte[] json;
+        try (final var out = new ByteArrayOutputStream()) {
+          JSON.serialize(response, out);
+          json = out.toByteArray();
+        }
+
+        exchange.getResponseHeaders().set("Content-Type", SCHEMA_CONTENT_TYPE);
+        exchange.sendResponseHeaders(200, json.length);
+        exchange.getResponseBody().write(json);
+        return;
+      }
+
+      writeJson(exchange, 404, ROUTE_NOT_FOUND_JSON);
+    } catch (final Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   private static void produceUntilConsumed(
     final byte[] payload,
     final Properties producerProps,
@@ -167,7 +239,7 @@ private static void produceUntilConsumed(
     final var deadline = System.nanoTime() + timeout.toNanos();
     try (final var producer = new KafkaProducer<byte[], byte[]>(producerProps)) {
       while (System.nanoTime() < deadline) {
-        producer.send(new ProducerRecord<>("avro-topic", payload)).get();
+        producer.send(new ProducerRecord<>(AVRO_TOPIC, payload)).get();
         if (sink.size() >= 1) return;
         TimeUnit.MILLISECONDS.sleep(250);
       }
     }
@@ -175,30 +247,39 @@ private static void produceUntilConsumed(
     throw new AssertionError("Timed out waiting for consumer to receive produced message(s)");
   }
 
-  private static void registerSchema(final String schemaRegistryUrl, final String subject, final String schemaJson)
-    throws Exception {
-    final var json = new DslJson<>();
+  private static void registerSchema(final String schemaRegistryUrl, final String schemaJson) throws Exception {
     final var payloadMap = Collections.singletonMap("schema", schemaJson);
     final byte[] payload;
     try (final var out = new ByteArrayOutputStream()) {
-      json.serialize(payloadMap, out);
+      JSON.serialize(payloadMap, out);
      payload = out.toByteArray();
    }
 
     final var request = HttpRequest
       .newBuilder()
-      .uri(URI.create("%s/subjects/%s/versions".formatted(schemaRegistryUrl, subject)))
-      .header("Content-Type", "application/vnd.schemaregistry.v1+json")
+      .uri(URI.create("%s%s".formatted(schemaRegistryUrl, SCHEMA_VERSIONS_PATH)))
+      .header("Content-Type", SCHEMA_CONTENT_TYPE)
       .POST(HttpRequest.BodyPublishers.ofByteArray(payload))
       .build();
 
-    final var response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
+    final var response = HTTP_CLIENT.send(request, HttpResponse.BodyHandlers.ofString());
 
     assertTrue(
       response.statusCode() == 200 || response.statusCode() == 409,
       "Schema registration failed: HTTP %d body=%s".formatted(response.statusCode(), response.body())
     );
   }
 
+  private static void writeJson(final HttpExchange exchange, final int status, final String body) {
+    try {
+      final var payload = body.getBytes(StandardCharsets.UTF_8);
+      exchange.getResponseHeaders().set("Content-Type", SCHEMA_CONTENT_TYPE);
+      exchange.sendResponseHeaders(status, payload.length);
+      exchange.getResponseBody().write(payload);
+    } catch (final Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   private static class CapturingSink implements MessageSink<byte[], byte[]> {
 
     private final List<byte[]> messages = new ArrayList<>();
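For readers unfamiliar with the framing that `createConfluentWirePayload` emits: Confluent's wire format is a single magic byte `0`, a big-endian 4-byte schema ID, then the raw Avro binary encoding. A minimal decoding sketch of the same framing (names are illustrative, not from this codebase):

```java
import java.nio.ByteBuffer;

final class WireFormat {

  record Framed(int schemaId, byte[] avroBody) {}

  static Framed parse(final byte[] payload) {
    final var buf = ByteBuffer.wrap(payload);
    final byte magic = buf.get(); // must be 0 in the Confluent framing
    if (magic != 0) throw new IllegalArgumentException("Unknown magic byte: " + magic);
    final int schemaId = buf.getInt(); // big-endian 4-byte schema ID
    final var body = new byte[buf.remaining()];
    buf.get(body); // the remainder is the Avro binary encoding
    return new Framed(schemaId, body);
  }
}
```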
diff --git a/app/build.gradle.kts b/app/build.gradle.kts
index 8d094e5..8a08753 100644
--- a/app/build.gradle.kts
+++ b/app/build.gradle.kts
@@ -1,4 +1,5 @@
 import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
+import org.gradle.api.artifacts.VersionCatalogsExtension
 import org.gradle.api.tasks.testing.logging.TestExceptionFormat.FULL
 
 plugins {
@@ -6,27 +7,23 @@
   id("com.gradleup.shadow") version "8.3.5" apply false
 }
 
-val kafkaVersion = "3.9.0"
-val slf4jVersion = "2.0.9"
-val junitVersion = "5.10.0"
-val testcontainersVersion = "2.0.3"
-val dslJsonVersion = "2.0.2"
+val libsCatalog = rootProject.extensions.getByType<VersionCatalogsExtension>().named("libs")
 
 subprojects {
   apply(plugin = "java")
   apply(plugin = "com.gradleup.shadow")
 
   dependencies {
-    "implementation"(project(":lib"))
-    "implementation"("org.apache.kafka:kafka-clients:$kafkaVersion")
-    "implementation"("org.slf4j:slf4j-simple:$slf4jVersion")
-
-    "testImplementation"("org.junit.jupiter:junit-jupiter:$junitVersion")
-    "testImplementation"("org.testcontainers:testcontainers:$testcontainersVersion")
-    "testImplementation"("org.testcontainers:testcontainers-junit-jupiter:$testcontainersVersion")
-    "testImplementation"("org.testcontainers:testcontainers-kafka:$testcontainersVersion")
-    "testImplementation"("com.dslplatform:dsl-json:$dslJsonVersion")
-    "testRuntimeOnly"("org.junit.platform:junit-platform-launcher")
+    add("implementation", project(":lib"))
+    add("implementation", libsCatalog.findLibrary("kafkaClients").get())
+    add("implementation", libsCatalog.findLibrary("slf4jSimple").get())
+
+    add("testImplementation", libsCatalog.findLibrary("junitJupiter").get())
+    add("testImplementation", libsCatalog.findLibrary("testcontainers").get())
+    add("testImplementation", libsCatalog.findLibrary("testcontainersJunitJupiter").get())
+    add("testImplementation", libsCatalog.findLibrary("testcontainersKafka").get())
+    add("testImplementation", libsCatalog.findLibrary("dslJson").get())
+    add("testRuntimeOnly", libsCatalog.findLibrary("junitPlatformLauncher").get())
   }
 
   tasks.withType<Test> {
diff --git a/app/json/src/test/java/org/kpipe/AppIntegrationTest.java b/app/json/src/test/java/org/kpipe/AppIntegrationTest.java
index 0cbf8fa..e72d8fa 100644
--- a/app/json/src/test/java/org/kpipe/AppIntegrationTest.java
+++ b/app/json/src/test/java/org/kpipe/AppIntegrationTest.java
@@ -20,21 +20,22 @@ import org.junit.jupiter.api.Test;
 import org.kpipe.config.AppConfig;
 import org.kpipe.sink.MessageSink;
-import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.kafka.ConfluentKafkaContainer;
 import org.testcontainers.utility.DockerImageName;
 
 @Testcontainers
 class AppIntegrationTest {
 
   private static final Logger log = System.getLogger(AppIntegrationTest.class.getName());
+  private static final String CONFLUENT_PLATFORM_VERSION = System.getProperty("confluentPlatformVersion", "8.2.0");
 
   @Container
-  static KafkaContainer kafka = new KafkaContainer(
-    DockerImageName.parse("confluentinc/cp-kafka:7.7.1").asCompatibleSubstituteFor("apache/kafka")
+  static ConfluentKafkaContainer kafka = new ConfluentKafkaContainer(
+    DockerImageName.parse("confluentinc/cp-kafka:%s".formatted(CONFLUENT_PLATFORM_VERSION))
   )
-    .waitingFor(org.testcontainers.containers.wait.strategy.Wait.forListeningPort());
+    .withStartupAttempts(3);
 
   @Test
   void testJsonAppEndToEnd() throws Exception {
@@ -108,9 +109,7 @@ private static void produceUntilConsumed(
     try (final var producer = new KafkaProducer<byte[], byte[]>(producerProps)) {
       while (System.nanoTime() < deadline) {
         producer.send(new ProducerRecord<>("json-topic", payload)).get();
-        if (sink.size() >= 1) {
-          return;
-        }
+        if (sink.size() >= 1) return;
         TimeUnit.MILLISECONDS.sleep(250);
       }
     }
diff --git a/benchmarks/build.gradle.kts b/benchmarks/build.gradle.kts
index 30e3af2..da675f9 100644
--- a/benchmarks/build.gradle.kts
+++ b/benchmarks/build.gradle.kts
@@ -1,31 +1,32 @@
+import org.gradle.api.artifacts.VersionCatalogsExtension
+
 plugins {
   java
   id("me.champeau.jmh") version "0.7.3"
 }
 
-val kafkaVersion = "3.9.1"
-val slf4jVersion = "2.0.9"
-val junitVersion = "5.10.0"
+val libsCatalog = rootProject.extensions.getByType<VersionCatalogsExtension>().named("libs")
 
 dependencies {
   // Benchmarks consume public API from :lib
   implementation(project(":lib"))
 
   // Benchmark targets
-  implementation("org.apache.kafka:kafka-clients:$kafkaVersion")
-  implementation("io.confluent.parallelconsumer:parallel-consumer-core:0.5.3.0")
-  implementation("org.apache.avro:avro:1.12.1")
-  implementation("com.dslplatform:dsl-json:2.0.2")
+  implementation(libsCatalog.findLibrary("kafkaClients").get())
+  implementation(libsCatalog.findLibrary("parallelConsumerCore").get())
+  implementation(libsCatalog.findLibrary("avro").get())
+  implementation(libsCatalog.findLibrary("dslJson").get())
 
   // Logging for JMH forks
-  implementation("org.slf4j:slf4j-simple:$slf4jVersion")
+  implementation(libsCatalog.findLibrary("slf4jSimple").get())
 
   // Apache Kafka test-kit for embedded benchmark broker
-  implementation("org.apache.kafka:kafka_2.13:$kafkaVersion")
+  val kafkaVersion = libsCatalog.findVersion("kafka").get().requiredVersion
+  implementation(libsCatalog.findLibrary("kafkaScala213").get())
   implementation("org.apache.kafka:kafka_2.13:$kafkaVersion:test")
   implementation("org.apache.kafka:kafka-clients:$kafkaVersion:test")
   implementation("org.apache.kafka:kafka-server-common:$kafkaVersion:test")
 
-  implementation("org.junit.jupiter:junit-jupiter-api:$junitVersion")
+  implementation(libsCatalog.findLibrary("junitJupiterApi").get())
 }
 
 jmh {
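The `me.champeau.jmh` plugin configured above registers a `jmh` task, so a typical run of this suite would look like the following (task name comes with the plugin; the module path is assumed from the repo layout):

```sh
# Execute all JMH benchmarks in the benchmarks module
./gradlew :benchmarks:jmh
```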
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 2c8a909..154e2b6 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -1,64 +1,50 @@
 services:
-
-  zookeeper:
-    image: confluentinc/cp-zookeeper:7.7.1
+  kafka:
+    image: confluentinc/cp-kafka:8.2.0
     ports:
-      - "2181:2181"
-      - "2888:2888"
-      - "3888:3888"
+      - "9092:9092"
     environment:
-      - ZOOKEEPER_CLIENT_PORT=2181
-      - ZOOKEEPER_TICK_TIME=2000
-      - ZOOKEEPER_4LW_COMMANDS_WHITELIST=srvr,ruok
-      - ZOOKEEPER_SERVER_ID=1
-      - ZOOKEEPER_SERVERS=zookeeper:2888:3888
+      - CLUSTER_ID=MkU3OEVBNzcwQTJBNDBFQg  # KRaft requires a valid base64-encoded UUID
+      - KAFKA_NODE_ID=1
+      - KAFKA_PROCESS_ROLES=broker,controller
+      - KAFKA_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
+      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
+      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
+      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
+      - KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
+      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
+      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
+      - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
+      - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
+      - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0
     volumes:
-      - zookeeper_data:/var/lib/zookeeper/data
-      - zookeeper_log:/var/lib/zookeeper/log
+      - kafka_data:/var/lib/kafka/data
     healthcheck:
-      test: ["CMD-SHELL", "echo srvr | nc -w 2 localhost 2181 | grep -q 'Mode:'"]
+      test: ["CMD-SHELL", "kafka-broker-api-versions --bootstrap-server localhost:9092 >/dev/null 2>&1"]
       interval: 10s
       timeout: 5s
-      retries: 8
-
-  kafka:
-    image: soldevelo/kafka:3.9.0
-    ports:
-      - "9092:9092"
-    environment:
-      - KAFKA_BROKER_ID=1
-      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
-      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
-      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
-      - ALLOW_PLAINTEXT_LISTENER=yes
-      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
-      - KAFKA_CFG_ZOOKEEPER_CONNECTION_TIMEOUT_MS=60000
-    volumes:
-      - kafka_data:/bitnami/kafka
-      - ./scripts:/scripts
-    command: [ "/scripts/wait-for-it.sh", "zookeeper:2181", "-t", "60", "--", "/opt/bitnami/scripts/kafka/entrypoint.sh", "/opt/bitnami/scripts/kafka/run.sh" ]
-    depends_on:
-      zookeeper:
-        condition: service_healthy
+      retries: 10
+      start_period: 20s
     restart: on-failure
 
   kafka-init:
-    image: soldevelo/kafka:3.9.0
+    image: confluentinc/cp-kafka:8.2.0
    depends_on:
-      - kafka
-    command: >
-      sh -c "
-      echo 'Waiting for Kafka to be ready...' &&
-      sleep 30 &&
-      kafka-topics.sh --create --if-not-exists --bootstrap-server kafka:9092 --partitions 1 --replication-factor 1 --topic json-topic &&
-      kafka-topics.sh --create --if-not-exists --bootstrap-server kafka:9092 --partitions 1 --replication-factor 1 --topic avro-topic &&
-      kafka-topics.sh --create --if-not-exists --bootstrap-server kafka:9092 --partitions 1 --replication-factor 1 --topic proto-topic &&
+      kafka:
+        condition: service_healthy
+    command:
+      - sh
+      - -ec
+      - |
+        kafka-topics --create --if-not-exists --bootstrap-server kafka:9092 --partitions 1 --replication-factor 1 --topic json-topic
+        kafka-topics --create --if-not-exists --bootstrap-server kafka:9092 --partitions 1 --replication-factor 1 --topic avro-topic
+        kafka-topics --create --if-not-exists --bootstrap-server kafka:9092 --partitions 1 --replication-factor 1 --topic proto-topic
         echo 'Topics created successfully'
-      "
     restart: on-failure
 
   schema-registry:
-    image: confluentinc/cp-schema-registry:7.7.1
+    image: confluentinc/cp-schema-registry:8.2.0
     container_name: schema-registry
     ports:
       - "8081:8081"
@@ -68,16 +54,34 @@
       - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=PLAINTEXT://kafka:9092
       - SCHEMA_REGISTRY_KAFKASTORE_TOPIC=_schemas
     healthcheck:
-      test: [ "CMD-SHELL", "curl -fsS http://localhost:8081/subjects >/dev/null || exit 1" ]
+      test: [ "CMD-SHELL", "curl -fsS http://localhost:8081/subjects >/dev/null" ]
       interval: 10s
       timeout: 5s
       retries: 5
       start_period: 15s
     depends_on:
       kafka:
-        condition: service_started
+        condition: service_healthy
     restart: unless-stopped
 
+  schema-init:
+    image: alpine:3.22
+    depends_on:
+      kafka-init:
+        condition: service_completed_successfully
+      schema-registry:
+        condition: service_healthy
+    volumes:
+      - ./scripts/register-schema.sh:/scripts/register-schema.sh:ro
+      - ./lib/src/test/resources/avro/customer.avsc:/schemas/customer.avsc:ro
+    command:
+      - sh
+      - -ec
+      - |
+        apk add --no-cache curl jq
+        sh /scripts/register-schema.sh /schemas/customer.avsc com.kpipe.customer http://schema-registry:8081
+    restart: "no"
+
   kafka-consumer-app:
     build:
       context: .
@@ -92,9 +96,9 @@ services:
     depends_on:
       schema-registry:
         condition: service_healthy
+      schema-init:
+        condition: service_completed_successfully
     restart: unless-stopped
 
 volumes:
-  zookeeper_data:
-  zookeeper_log:
   kafka_data:
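One note on the KRaft settings above: `CLUSTER_ID` must be a valid base64-encoded UUID (22 characters), such as the example value used here; an arbitrary string is rejected by `kafka-storage format` at first boot. A fresh ID can be generated with the broker's own tooling:

```sh
# Print a valid base64-encoded UUID for use as CLUSTER_ID
docker run --rm confluentinc/cp-kafka:8.2.0 kafka-storage random-uuid
```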
diff --git a/gradle.properties b/gradle.properties
index 40138d6..1a0a7a5 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,4 +1,4 @@
 org.gradle.caching=false
 org.gradle.parallel=false
-org.gradle.workers.max=2
+org.gradle.workers.max=4
 org.gradle.jvmargs=-Xmx4g -Xms4g -XX:MaxMetaspaceSize=1g -XX:+UseParallelGC -XX:+HeapDumpOnOutOfMemoryError
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
new file mode 100644
index 0000000..24000d4
--- /dev/null
+++ b/gradle/libs.versions.toml
@@ -0,0 +1,36 @@
+[versions]
+kafka = "4.2.0"
+avro = "1.12.1"
+dslJson = "2.0.2"
+slf4j = "2.0.9"
+junit = "5.10.0"
+junitPlatform = "1.10.0"
+mockito = "5.18.0"
+testcontainers = "2.0.3"
+parallelConsumer = "0.5.3.0"
+
+[libraries]
+kafkaClients = { module = "org.apache.kafka:kafka-clients", version.ref = "kafka" }
+kafkaScala213 = { module = "org.apache.kafka:kafka_2.13", version.ref = "kafka" }
+kafkaServerCommon = { module = "org.apache.kafka:kafka-server-common", version.ref = "kafka" }
+
+avro = { module = "org.apache.avro:avro", version.ref = "avro" }
+dslJson = { module = "com.dslplatform:dsl-json", version.ref = "dslJson" }
+
+slf4jApi = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" }
+slf4jSimple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" }
+
+junitBom = { module = "org.junit:junit-bom", version.ref = "junit" }
+junitJupiter = { module = "org.junit.jupiter:junit-jupiter", version.ref = "junit" }
+junitJupiterApi = { module = "org.junit.jupiter:junit-jupiter-api", version.ref = "junit" }
+junitPlatformLauncher = { module = "org.junit.platform:junit-platform-launcher", version.ref = "junitPlatform" }
+
+mockitoCore = { module = "org.mockito:mockito-core", version.ref = "mockito" }
+mockitoJunitJupiter = { module = "org.mockito:mockito-junit-jupiter", version.ref = "mockito" }
+
+testcontainers = { module = "org.testcontainers:testcontainers", version.ref = "testcontainers" }
+testcontainersJunitJupiter = { module = "org.testcontainers:testcontainers-junit-jupiter", version.ref = "testcontainers" }
+testcontainersKafka = { module = "org.testcontainers:testcontainers-kafka", version.ref = "testcontainers" }
+
+parallelConsumerCore = { module = "io.confluent.parallelconsumer:parallel-consumer-core", version.ref = "parallelConsumer" }
+
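For module-level build scripts, Gradle also generates type-safe accessors from this catalog, so the string-based `findLibrary` lookup is only required in places such as the root `subprojects {}` block where those accessors are not available. A sketch of the accessor form, assuming a module's own build.gradle.kts:

```kotlin
// Accessors are derived from the aliases in gradle/libs.versions.toml,
// e.g. the "kafkaClients" alias becomes libs.kafkaClients.
dependencies {
  implementation(libs.kafkaClients)
  testImplementation(libs.junitJupiter)
}
```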
implementation(libsCatalog.findLibrary("slf4jApi").get()) // Testing - testImplementation(platform("org.junit:junit-bom:5.10.0")) - testImplementation("org.junit.jupiter:junit-jupiter") - testRuntimeOnly("org.junit.platform:junit-platform-launcher") + testImplementation(platform(libsCatalog.findLibrary("junitBom").get())) + testImplementation(libsCatalog.findLibrary("junitJupiter").get()) + testRuntimeOnly(libsCatalog.findLibrary("junitPlatformLauncher").get()) - testImplementation("org.mockito:mockito-core:5.18.0") - testImplementation("org.mockito:mockito-junit-jupiter:5.18.0") + testImplementation(libsCatalog.findLibrary("mockitoCore").get()) + testImplementation(libsCatalog.findLibrary("mockitoJunitJupiter").get()) - testImplementation("org.slf4j:slf4j-simple:2.0.9") + testImplementation(libsCatalog.findLibrary("slf4jSimple").get()) } tasks.test { diff --git a/lib/src/test/java/org/kpipe/consumer/FunctionalConsumerMockingTest.java b/lib/src/test/java/org/kpipe/consumer/FunctionalConsumerMockingTest.java index 206d59e..4d40c35 100644 --- a/lib/src/test/java/org/kpipe/consumer/FunctionalConsumerMockingTest.java +++ b/lib/src/test/java/org/kpipe/consumer/FunctionalConsumerMockingTest.java @@ -70,10 +70,11 @@ void shouldSubscribeToTopic() { offsetManager ); - functionalConsumer.start(); - - verify(mockConsumer).subscribe(topicCaptor.capture()); - assertEquals(List.of(TOPIC), topicCaptor.getValue()); + try (functionalConsumer) { + functionalConsumer.start(); + verify(mockConsumer).subscribe(topicCaptor.capture()); + assertEquals(List.of(TOPIC), topicCaptor.getValue()); + } } @Test @@ -884,27 +885,31 @@ void shouldIntegrateWithOffsetManager() { // Start consumer functionalConsumer.start(); - // Verify offset manager is started - verify(offsetManager).start(); + try { + // Verify offset manager is started + verify(offsetManager).start(); - // Simulate a command to track offset - var record = new ConsumerRecord<>(TOPIC, PARTITION, 123L, "key", "value"); - commandQueue.offer(ConsumerCommand.TRACK_OFFSET.withRecord(record)); + // Simulate a command to track offset + var record = new ConsumerRecord<>(TOPIC, PARTITION, 123L, "key", "value"); + commandQueue.offer(ConsumerCommand.TRACK_OFFSET.withRecord(record)); - // Process commands - functionalConsumer.processCommands(); + // Process commands + functionalConsumer.processCommands(); - // Verify offset is tracked - verify(offsetManager).trackOffset(record); + // Verify offset is tracked + verify(offsetManager).trackOffset(record); - // Simulate processing completion - commandQueue.offer(ConsumerCommand.MARK_OFFSET_PROCESSED.withRecord(record)); + // Simulate processing completion + commandQueue.offer(ConsumerCommand.MARK_OFFSET_PROCESSED.withRecord(record)); - // Process commands - functionalConsumer.processCommands(); + // Process commands + functionalConsumer.processCommands(); - // Verify offset is marked as processed - verify(offsetManager).markOffsetProcessed(record); + // Verify offset is marked as processed + verify(offsetManager, timeout(500)).markOffsetProcessed(record); + } finally { + functionalConsumer.close(); + } } @Test diff --git a/scripts/register-schema.sh b/scripts/register-schema.sh new file mode 100644 index 0000000..8da5b39 --- /dev/null +++ b/scripts/register-schema.sh @@ -0,0 +1,25 @@ +#!/usr/bin/env sh +set -eu + +SCHEMA_FILE="${1:-/schemas/customer.avsc}" +SUBJECT="${2:-com.kpipe.customer}" +REGISTRY_URL="${3:-http://schema-registry:8081}" + +until curl -fsS "${REGISTRY_URL}/subjects" >/dev/null; do + echo "Waiting for 
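The switch to `verify(offsetManager, timeout(500))` above matters when the verified interaction may complete asynchronously: `timeout` polls the mock until the call happens or the deadline passes, instead of failing on the first immediate check. A self-contained illustration with a hypothetical mock, not project code:

```java
import static org.mockito.Mockito.*;

class TimeoutVerifyExample {

  public static void main(String[] args) {
    final Runnable task = mock(Runnable.class);
    new Thread(task).start(); // the interaction happens on another thread
    // Polls for up to 500 ms rather than asserting the call already occurred.
    verify(task, timeout(500)).run();
  }
}
```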
diff --git a/scripts/register-schema.sh b/scripts/register-schema.sh
new file mode 100644
index 0000000..8da5b39
--- /dev/null
+++ b/scripts/register-schema.sh
@@ -0,0 +1,25 @@
+#!/usr/bin/env sh
+set -eu
+
+SCHEMA_FILE="${1:-/schemas/customer.avsc}"
+SUBJECT="${2:-com.kpipe.customer}"
+REGISTRY_URL="${3:-http://schema-registry:8081}"
+
+until curl -fsS "${REGISTRY_URL}/subjects" >/dev/null; do
+  echo "Waiting for Schema Registry at ${REGISTRY_URL}..."
+  sleep 2
+done
+
+if curl -fsS "${REGISTRY_URL}/subjects/${SUBJECT}/versions/latest" >/dev/null 2>&1; then
+  echo "Schema subject ${SUBJECT} already exists, skipping."
+  exit 0
+fi
+
+PAYLOAD="$(jq -Rs '{schema: .}' "${SCHEMA_FILE}")"
+
+curl -fsS -X POST \
+  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
+  --data "${PAYLOAD}" \
+  "${REGISTRY_URL}/subjects/${SUBJECT}/versions" >/dev/null
+
+echo "Registered schema for subject ${SUBJECT}"
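Outside of Compose, the script can be run by hand; the arguments below simply restate its three positional parameters, with the registry reached through the port published in docker-compose.yaml:

```sh
# Manual registration against the locally published registry port
sh scripts/register-schema.sh \
  lib/src/test/resources/avro/customer.avsc \
  com.kpipe.customer \
  http://localhost:8081
```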
diff --git a/scripts/wait-for-it.sh b/scripts/wait-for-it.sh
deleted file mode 100755
index 3974640..0000000
--- a/scripts/wait-for-it.sh
+++ /dev/null
@@ -1,182 +0,0 @@
-#!/usr/bin/env bash
-# Use this script to test if a given TCP host/port are available
-
-WAITFORIT_cmdname=${0##*/}
-
-echoerr() { if [[ $WAITFORIT_QUIET -ne 1 ]]; then echo "$@" 1>&2; fi }
-
-usage()
-{
-    cat << USAGE >&2
-Usage:
-    $WAITFORIT_cmdname host:port [-s] [-t timeout] [-- command args]
-    -h HOST | --host=HOST       Host or IP under test
-    -p PORT | --port=PORT       TCP port under test
-                                Alternatively, you specify the host and port as host:port
-    -s | --strict               Only execute subcommand if the test succeeds
-    -q | --quiet                Don't output any status messages
-    -t TIMEOUT | --timeout=TIMEOUT
-                                Timeout in seconds, zero for no timeout
-    -- COMMAND ARGS             Execute command with args after the test finishes
-USAGE
-    exit 1
-}
-
-wait_for()
-{
-    if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then
-        echoerr "$WAITFORIT_cmdname: waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT"
-    else
-        echoerr "$WAITFORIT_cmdname: waiting for $WAITFORIT_HOST:$WAITFORIT_PORT without a timeout"
-    fi
-    WAITFORIT_start_ts=$(date +%s)
-    while :
-    do
-        if [[ $WAITFORIT_ISBUSY -eq 1 ]]; then
-            nc -z $WAITFORIT_HOST $WAITFORIT_PORT
-            WAITFORIT_result=$?
-        else
-            (echo -n > /dev/tcp/$WAITFORIT_HOST/$WAITFORIT_PORT) >/dev/null 2>&1
-            WAITFORIT_result=$?
-        fi
-        if [[ $WAITFORIT_result -eq 0 ]]; then
-            WAITFORIT_end_ts=$(date +%s)
-            echoerr "$WAITFORIT_cmdname: $WAITFORIT_HOST:$WAITFORIT_PORT is available after $((WAITFORIT_end_ts - WAITFORIT_start_ts)) seconds"
-            break
-        fi
-        sleep 1
-    done
-    return $WAITFORIT_result
-}
-
-wait_for_wrapper()
-{
-    # In order to support SIGINT during timeout: http://unix.stackexchange.com/a/57692
-    if [[ $WAITFORIT_QUIET -eq 1 ]]; then
-        timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --quiet --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT &
-    else
-        timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 --child --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT &
-    fi
-    WAITFORIT_PID=$!
-    trap "kill -INT -$WAITFORIT_PID" INT
-    wait $WAITFORIT_PID
-    WAITFORIT_RESULT=$?
-    if [[ $WAITFORIT_RESULT -ne 0 ]]; then
-        echoerr "$WAITFORIT_cmdname: timeout occurred after waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT"
-    fi
-    return $WAITFORIT_RESULT
-}
-
-# process arguments
-while [[ $# -gt 0 ]]
-do
-    case "$1" in
-        *:* )
-        WAITFORIT_hostport=(${1//:/ })
-        WAITFORIT_HOST=${WAITFORIT_hostport[0]}
-        WAITFORIT_PORT=${WAITFORIT_hostport[1]}
-        shift 1
-        ;;
-        --child)
-        WAITFORIT_CHILD=1
-        shift 1
-        ;;
-        -q | --quiet)
-        WAITFORIT_QUIET=1
-        shift 1
-        ;;
-        -s | --strict)
-        WAITFORIT_STRICT=1
-        shift 1
-        ;;
-        -h)
-        WAITFORIT_HOST="$2"
-        if [[ $WAITFORIT_HOST == "" ]]; then break; fi
-        shift 2
-        ;;
-        --host=*)
-        WAITFORIT_HOST="${1#*=}"
-        shift 1
-        ;;
-        -p)
-        WAITFORIT_PORT="$2"
-        if [[ $WAITFORIT_PORT == "" ]]; then break; fi
-        shift 2
-        ;;
-        --port=*)
-        WAITFORIT_PORT="${1#*=}"
-        shift 1
-        ;;
-        -t)
-        WAITFORIT_TIMEOUT="$2"
-        if [[ $WAITFORIT_TIMEOUT == "" ]]; then break; fi
-        shift 2
-        ;;
-        --timeout=*)
-        WAITFORIT_TIMEOUT="${1#*=}"
-        shift 1
-        ;;
-        --)
-        shift
-        WAITFORIT_CLI=("$@")
-        break
-        ;;
-        --help)
-        usage
-        ;;
-        *)
-        echoerr "Unknown argument: $1"
-        usage
-        ;;
-    esac
-done
-
-if [[ "$WAITFORIT_HOST" == "" || "$WAITFORIT_PORT" == "" ]]; then
-    echoerr "Error: you need to provide a host and port to test."
-    usage
-fi
-
-WAITFORIT_TIMEOUT=${WAITFORIT_TIMEOUT:-15}
-WAITFORIT_STRICT=${WAITFORIT_STRICT:-0}
-WAITFORIT_CHILD=${WAITFORIT_CHILD:-0}
-WAITFORIT_QUIET=${WAITFORIT_QUIET:-0}
-
-# Check to see if timeout is from busybox?
-WAITFORIT_TIMEOUT_PATH=$(type -p timeout)
-WAITFORIT_TIMEOUT_PATH=$(realpath $WAITFORIT_TIMEOUT_PATH 2>/dev/null || readlink -f $WAITFORIT_TIMEOUT_PATH)
-
-WAITFORIT_BUSYTIMEFLAG=""
-if [[ $WAITFORIT_TIMEOUT_PATH =~ "busybox" ]]; then
-    WAITFORIT_ISBUSY=1
-    # Check if busybox timeout uses -t flag
-    # (recent Alpine versions don't support -t anymore)
-    if timeout &>/dev/stdout | grep -q -e '-t '; then
-        WAITFORIT_BUSYTIMEFLAG="-t"
-    fi
-else
-    WAITFORIT_ISBUSY=0
-fi
-
-if [[ $WAITFORIT_CHILD -gt 0 ]]; then
-    wait_for
-    WAITFORIT_RESULT=$?
-    exit $WAITFORIT_RESULT
-else
-    if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then
-        wait_for_wrapper
-        WAITFORIT_RESULT=$?
-    else
-        wait_for
-        WAITFORIT_RESULT=$?
-    fi
-fi
-
-if [[ $WAITFORIT_CLI != "" ]]; then
-    if [[ $WAITFORIT_RESULT -ne 0 && $WAITFORIT_STRICT -eq 1 ]]; then
-        echoerr "$WAITFORIT_cmdname: strict mode, refusing to execute subprocess"
-        exit $WAITFORIT_RESULT
-    fi
-    exec "${WAITFORIT_CLI[@]}"
-else
-    exit $WAITFORIT_RESULT
-fi
\ No newline at end of file