Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -662,13 +662,16 @@ If you want to use Avro with a schema registry, follow these steps:
curl -s http://localhost:8081/subjects/com.kpipe.customer/versions/latest | jq -r '.schema' | jq --indent 2 '.'

# Produce an Avro message using kafka-avro-console-producer
echo '{"id":1,"name":"Mariano Gonzalez","email":{"string":"mariano@example.com"},"active":true,"registrationDate":1635724800000,"address":{"com.kpipe.customer.Address":{"street":"123 Main St","city":"Chicago","zipCode":"00000","country":"USA"}},"tags":["premium","verified"],"preferences":{"notifications":"email"}}' \
| docker run -i --rm --network=host confluentinc/cp-schema-registry:7.7.1 \
kafka-avro-console-producer \
--bootstrap-server localhost:9092 \
--topic avro-topic \
--property schema.registry.url=http://localhost:8081 \
--property value.schema.id=1
cat <<'JSON' | docker run -i --rm --network kpipe_default \
-v "$PWD/lib/src/test/resources/avro/customer.avsc:/tmp/customer.avsc:ro" \
confluentinc/cp-schema-registry:8.2.0 \
sh -ec 'kafka-avro-console-producer \
--bootstrap-server kafka:9092 \
--topic avro-topic \
--property schema.registry.url=http://schema-registry:8081 \
--property value.schema="$(cat /tmp/customer.avsc)"'
{"id":1,"name":"Mariano Gonzalez","email":{"string":"mariano@example.com"},"active":true,"registrationDate":1635724800000,"address":{"com.kpipe.customer.Address":{"street":"123 Main St","city":"Chicago","zipCode":"00000","country":"USA"}},"tags":["premium","verified"],"preferences":{"notifications":"email"}}
JSON
```

Kafka consumer will:
Expand Down
5 changes: 4 additions & 1 deletion app/avro/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
import org.gradle.api.artifacts.VersionCatalogsExtension

tasks.named<ShadowJar>("shadowJar") {
archiveClassifier.set("all")
Expand All @@ -9,8 +10,10 @@ tasks.named<ShadowJar>("shadowJar") {
}
}

val libsCatalog = rootProject.extensions.getByType<VersionCatalogsExtension>().named("libs")

description = "KPipe - Kafka Consumer Application Using Avro"

dependencies {
implementation("org.apache.avro:avro:1.12.1")
implementation(libsCatalog.findLibrary("avro").get())
}
195 changes: 138 additions & 57 deletions app/avro/src/test/java/org/kpipe/AppIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,25 @@
import static org.junit.jupiter.api.Assertions.*;

import com.dslplatform.json.DslJson;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.lang.System.Logger;
import java.lang.System.Logger.Level;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
Expand All @@ -29,63 +36,78 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.kpipe.config.AppConfig;
import org.kpipe.sink.MessageSink;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
class AppIntegrationTest {

private static final Logger log = System.getLogger(AppIntegrationTest.class.getName());

static Network network = Network.newNetwork();
private static final String CONFLUENT_PLATFORM_VERSION = System.getProperty("confluentPlatformVersion", "8.2.0");
private static final String AVRO_TOPIC = "avro-topic";
private static final String SCHEMA_CONTENT_TYPE = "application/vnd.schemaregistry.v1+json";
private static final String SCHEMA_SUBJECT = "com.kpipe.customer";
private static final String SCHEMA_VERSIONS_PATH = "/subjects/" + SCHEMA_SUBJECT + "/versions";
private static final String SCHEMA_LATEST_PATH = SCHEMA_VERSIONS_PATH + "/latest";
private static final String SUBJECTS_EMPTY_JSON = """
[]
""";
private static final String REGISTERED_SCHEMA_RESPONSE_JSON = """
{"id":1}
""";
private static final String SCHEMA_NOT_FOUND_JSON = """
{"error_code":40403,"message":"Schema not found"}
""";
private static final String ROUTE_NOT_FOUND_JSON = """
{"message":"Not found"}
""";
private static final DslJson<Object> JSON = new DslJson<>();
private static final HttpClient HTTP_CLIENT = HttpClient.newHttpClient();
private static volatile String latestSchema;
private static HttpServer schemaRegistryStub;

@Container
static KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:7.7.1").asCompatibleSubstituteFor("apache/kafka")
static ConfluentKafkaContainer kafka = new ConfluentKafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:%s".formatted(CONFLUENT_PLATFORM_VERSION))
)
.withNetwork(network)
.withNetworkAliases("kafka")
.waitingFor(Wait.forListeningPort());
.withStartupAttempts(3);

@Container
static GenericContainer<?> schemaRegistry = new GenericContainer<>(
DockerImageName
.parse("confluentinc/cp-schema-registry:7.7.1")
.asCompatibleSubstituteFor("confluentinc/cp-schema-registry")
)
.withNetwork(network)
.withNetworkAliases("schema-registry")
.withExposedPorts(8081)
.withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
.withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://kafka:9092")
.dependsOn(kafka)
.waitingFor(Wait.forHttp("/subjects").forStatusCode(200).withStartupTimeout(Duration.ofMinutes(2)));
// Boots one in-process HTTP server on an ephemeral port (port 0) that stands
// in for the Confluent Schema Registry REST API for the whole test class.
@BeforeAll
static void startSchemaRegistryStub() throws Exception {
  final var server = HttpServer.create(new InetSocketAddress(0), 0);
  server.createContext("/subjects", exchange -> handleSchemaStubRequest(exchange));
  server.start();
  schemaRegistryStub = server;
}

// Tears the stub down and clears the registered schema so a later test class
// in the same JVM starts from a clean slate.
@AfterAll
static void stopSchemaRegistryStub() {
  final var server = schemaRegistryStub;
  if (server != null) {
    server.stop(0); // 0 = do not linger for in-flight exchanges
  }
  latestSchema = null;
}

@Test
void testAvroAppEndToEnd() throws Exception {
final var topic = "avro-topic";
final var srUrl = "http://%s:%d".formatted(schemaRegistry.getHost(), schemaRegistry.getMappedPort(8081));
final var srUrl = "http://localhost:%d".formatted(schemaRegistryStub.getAddress().getPort());

// Load and register schema before App construction (App fetches latest schema during init).
final Schema schema;
try (final var is = getClass().getClassLoader().getResourceAsStream("avro/customer.avsc")) {
assertNotNull(is, "Schema file not found");
schema = new Schema.Parser().parse(is);
}
registerSchema(srUrl, "com.kpipe.customer", schema.toString());
registerSchema(srUrl, schema.toString());

final var config = new AppConfig(
kafka.getBootstrapServers(),
"test-group",
topic,
AVRO_TOPIC,
"avro-app",
Duration.ofMillis(100),
Duration.ofSeconds(1),
Expand All @@ -111,34 +133,14 @@ void testAvroAppEndToEnd() throws Exception {
}
});

// Produce an Avro message with Confluent Wire Format (Magic Byte 0 + Schema ID)
final var record = new GenericData.Record(schema);
record.put("id", 1L);
record.put("name", "Test User");
record.put("email", "test.user@example.com");
record.put("active", true);
record.put("registrationDate", System.currentTimeMillis());
record.put("address", null);
record.put("tags", new GenericData.Array<>(schema.getField("tags").schema(), Collections.emptyList()));
record.put("preferences", Collections.emptyMap());

final var out = new ByteArrayOutputStream();
out.write(0); // Magic byte
out.write(ByteBuffer.allocate(4).putInt(1).array()); // Schema ID 1

final var encoder = EncoderFactory.get().binaryEncoder(out, null);
final var writer = new GenericDatumWriter<GenericRecord>(schema);
writer.write(record, encoder);
encoder.flush();

final var producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

// Retry produce during the warm-up window so we do not miss messages while the consumer
// finishes initial group assignment.
produceUntilConsumed(out.toByteArray(), producerProps, capturingSink, Duration.ofSeconds(10));
produceUntilConsumed(createConfluentWirePayload(schema), producerProps, capturingSink, Duration.ofSeconds(10));

// Verify
assertTrue(appThread.isAlive());
Expand All @@ -158,6 +160,76 @@ final var record = new GenericData.Record(schema);
}
}

/**
 * Serializes a sample customer record in Confluent wire format:
 * magic byte 0x00, a 4-byte big-endian schema id (hard-coded to 1, matching
 * the id the stub registry hands out), then the Avro binary-encoded record.
 */
private static byte[] createConfluentWirePayload(final Schema schema) throws Exception {
  final var customer = new GenericData.Record(schema);
  customer.put("id", 1L);
  customer.put("name", "Test User");
  customer.put("email", "test.user@example.com");
  customer.put("active", true);
  customer.put("registrationDate", System.currentTimeMillis());
  customer.put("address", null);
  customer.put("tags", new GenericData.Array<>(schema.getField("tags").schema(), Collections.emptyList()));
  customer.put("preferences", Collections.emptyMap());

  final var wireBytes = new ByteArrayOutputStream();
  wireBytes.write(0); // Confluent magic byte
  wireBytes.write(ByteBuffer.allocate(Integer.BYTES).putInt(1).array()); // schema id 1, big-endian

  final var binaryEncoder = EncoderFactory.get().binaryEncoder(wireBytes, null);
  new GenericDatumWriter<GenericRecord>(schema).write(customer, binaryEncoder);
  binaryEncoder.flush();
  return wireBytes.toByteArray();
}

/**
 * Minimal in-process Schema Registry stub dispatcher. Supports:
 * GET /subjects (always an empty list), POST /subjects/&lt;subject&gt;/versions
 * (stores the submitted schema and answers {"id":1}), and
 * GET /subjects/&lt;subject&gt;/versions/latest (returns the stored schema, or
 * the registry's 40403 "Schema not found" body when none is registered).
 * Any other route gets a generic 404. The exchange is closed via
 * try-with-resources in every branch.
 */
private static void handleSchemaStubRequest(final HttpExchange exchange) {
  try (exchange) {
    final var method = exchange.getRequestMethod();
    final var path = exchange.getRequestURI().getPath();

    if ("GET".equals(method) && "/subjects".equals(path)) {
      writeJson(exchange, 200, SUBJECTS_EMPTY_JSON);
      return;
    }

    if ("POST".equals(method) && SCHEMA_VERSIONS_PATH.equals(path)) {
      final var bodyBytes = exchange.getRequestBody().readAllBytes();
      final var payload = JSON.deserialize(Map.class, new ByteArrayInputStream(bodyBytes));
      latestSchema = payload != null && payload.get("schema") != null ? payload.get("schema").toString() : null;
      writeJson(exchange, 200, REGISTERED_SCHEMA_RESPONSE_JSON);
      return;
    }

    if ("GET".equals(method) && SCHEMA_LATEST_PATH.equals(path)) {
      // Snapshot the volatile field once: null-checking and then re-reading it
      // could race with @AfterAll resetting it, producing a "schema": null body.
      final var schema = latestSchema;
      if (schema == null) {
        writeJson(exchange, 404, SCHEMA_NOT_FOUND_JSON);
        return;
      }

      final var response = new HashMap<String, Object>();
      response.put("subject", SCHEMA_SUBJECT);
      response.put("version", 1);
      response.put("id", 1);
      response.put("schema", schema);

      final byte[] json;
      try (final var out = new ByteArrayOutputStream()) {
        JSON.serialize(response, out);
        json = out.toByteArray();
      }

      exchange.getResponseHeaders().set("Content-Type", SCHEMA_CONTENT_TYPE);
      exchange.sendResponseHeaders(200, json.length);
      exchange.getResponseBody().write(json);
      return;
    }

    writeJson(exchange, 404, ROUTE_NOT_FOUND_JSON);
  } catch (final Exception e) {
    throw new RuntimeException(e);
  }
}

private static void produceUntilConsumed(
final byte[] payload,
final Properties producerProps,
Expand All @@ -167,38 +239,47 @@ private static void produceUntilConsumed(
final var deadline = System.nanoTime() + timeout.toNanos();
try (final var producer = new KafkaProducer<byte[], byte[]>(producerProps)) {
while (System.nanoTime() < deadline) {
producer.send(new ProducerRecord<>("avro-topic", payload)).get();
producer.send(new ProducerRecord<>(AVRO_TOPIC, payload)).get();
if (sink.size() >= 1) return;
TimeUnit.MILLISECONDS.sleep(250);
}
}
throw new AssertionError("Timed out waiting for consumer to receive produced message(s)");
}

private static void registerSchema(final String schemaRegistryUrl, final String subject, final String schemaJson)
throws Exception {
final var json = new DslJson<>();
private static void registerSchema(final String schemaRegistryUrl, final String schemaJson) throws Exception {
final var payloadMap = Collections.singletonMap("schema", schemaJson);
final byte[] payload;
try (final var out = new ByteArrayOutputStream()) {
json.serialize(payloadMap, out);
JSON.serialize(payloadMap, out);
payload = out.toByteArray();
}

final var request = HttpRequest
.newBuilder()
.uri(URI.create("%s/subjects/%s/versions".formatted(schemaRegistryUrl, subject)))
.header("Content-Type", "application/vnd.schemaregistry.v1+json")
.uri(URI.create("%s%s".formatted(schemaRegistryUrl, SCHEMA_VERSIONS_PATH)))
.header("Content-Type", SCHEMA_CONTENT_TYPE)
.POST(HttpRequest.BodyPublishers.ofByteArray(payload))
.build();

final var response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
final var response = HTTP_CLIENT.send(request, HttpResponse.BodyHandlers.ofString());
assertTrue(
response.statusCode() == 200 || response.statusCode() == 409,
"Schema registration failed: HTTP %d body=%s".formatted(response.statusCode(), response.body())
);
}

/**
 * Writes a canned JSON body with the Schema Registry content type.
 *
 * @param exchange the exchange to respond on (closed by the caller)
 * @param status HTTP status code to send
 * @param body JSON payload, sent UTF-8 encoded
 */
private static void writeJson(final HttpExchange exchange, final int status, final String body) {
  try {
    final var payload = body.getBytes(StandardCharsets.UTF_8);
    exchange.getResponseHeaders().set("Content-Type", SCHEMA_CONTENT_TYPE);
    exchange.sendResponseHeaders(status, payload.length);
    exchange.getResponseBody().write(payload);
  } catch (final IOException e) {
    // Narrowed from catch (Exception): only the exchange I/O calls above can
    // throw a checked exception here; runtime errors now propagate unwrapped.
    throw new RuntimeException(e);
  }
}

private static class CapturingSink implements MessageSink<byte[], byte[]> {

private final List<byte[]> messages = new ArrayList<>();
Expand Down
27 changes: 12 additions & 15 deletions app/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,32 +1,29 @@
import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
import org.gradle.api.artifacts.VersionCatalogsExtension
import org.gradle.api.tasks.testing.logging.TestExceptionFormat.FULL

plugins {
java
id("com.gradleup.shadow") version "8.3.5" apply false
}

val kafkaVersion = "3.9.0"
val slf4jVersion = "2.0.9"
val junitVersion = "5.10.0"
val testcontainersVersion = "2.0.3"
val dslJsonVersion = "2.0.2"
val libsCatalog = rootProject.extensions.getByType<VersionCatalogsExtension>().named("libs")

subprojects {
apply(plugin = "java")
apply(plugin = "com.gradleup.shadow")

dependencies {
"implementation"(project(":lib"))
"implementation"("org.apache.kafka:kafka-clients:$kafkaVersion")
"implementation"("org.slf4j:slf4j-simple:$slf4jVersion")

"testImplementation"("org.junit.jupiter:junit-jupiter:$junitVersion")
"testImplementation"("org.testcontainers:testcontainers:$testcontainersVersion")
"testImplementation"("org.testcontainers:testcontainers-junit-jupiter:$testcontainersVersion")
"testImplementation"("org.testcontainers:testcontainers-kafka:$testcontainersVersion")
"testImplementation"("com.dslplatform:dsl-json:$dslJsonVersion")
"testRuntimeOnly"("org.junit.platform:junit-platform-launcher")
add("implementation", project(":lib"))
add("implementation", libsCatalog.findLibrary("kafkaClients").get())
add("implementation", libsCatalog.findLibrary("slf4jSimple").get())

add("testImplementation", libsCatalog.findLibrary("junitJupiter").get())
add("testImplementation", libsCatalog.findLibrary("testcontainers").get())
add("testImplementation", libsCatalog.findLibrary("testcontainersJunitJupiter").get())
add("testImplementation", libsCatalog.findLibrary("testcontainersKafka").get())
add("testImplementation", libsCatalog.findLibrary("dslJson").get())
add("testRuntimeOnly", libsCatalog.findLibrary("junitPlatformLauncher").get())
}

tasks.withType<Test> {
Expand Down
Loading