From 867190003d667a37886be3712cbf9bce88e68bc5 Mon Sep 17 00:00:00 2001 From: singh-viikram Date: Thu, 12 Sep 2024 15:44:45 +0530 Subject: [PATCH 1/7] Adding custom proto serdes --- kafka-streams-serdes/build.gradle.kts | 2 ++ .../serdes/proto/ProtoDeserializer.java | 24 +++++++++++++++++++ .../serdes/proto/ProtoSerializer.java | 11 +++++++++ 3 files changed, 37 insertions(+) create mode 100644 kafka-streams-serdes/src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/proto/ProtoDeserializer.java create mode 100644 kafka-streams-serdes/src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/proto/ProtoSerializer.java diff --git a/kafka-streams-serdes/build.gradle.kts b/kafka-streams-serdes/build.gradle.kts index 3dfc3f8..1be4633 100644 --- a/kafka-streams-serdes/build.gradle.kts +++ b/kafka-streams-serdes/build.gradle.kts @@ -1,6 +1,7 @@ plugins { `java-library` jacoco + id("com.google.protobuf") version "0.9.4" id("org.hypertrace.avro-plugin") id("org.hypertrace.publish-plugin") id("org.hypertrace.jacoco-report-plugin") @@ -15,6 +16,7 @@ dependencies { api("org.apache.kafka:kafka-clients") api("org.apache.avro:avro") + implementation("com.google.protobuf:protobuf-java:3.23.0") // Adjust the version as needed testImplementation("org.junit.jupiter:junit-jupiter:5.8.2") } diff --git a/kafka-streams-serdes/src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/proto/ProtoDeserializer.java b/kafka-streams-serdes/src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/proto/ProtoDeserializer.java new file mode 100644 index 0000000..82ee569 --- /dev/null +++ b/kafka-streams-serdes/src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/proto/ProtoDeserializer.java @@ -0,0 +1,24 @@ +package org.hypertrace.core.kafkastreams.framework.serdes.proto; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.protobuf.Parser; +import org.apache.kafka.common.serialization.Deserializer; + +public class ProtoDeserializer implements Deserializer { + + private final Parser parser; + + public ProtoDeserializer(Parser parser) { + this.parser = parser; + } + + @Override + public T deserialize(String s, byte[] bytes) { + try { + return parser.parseFrom(bytes); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } +} diff --git a/kafka-streams-serdes/src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/proto/ProtoSerializer.java b/kafka-streams-serdes/src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/proto/ProtoSerializer.java new file mode 100644 index 0000000..0d1754f --- /dev/null +++ b/kafka-streams-serdes/src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/proto/ProtoSerializer.java @@ -0,0 +1,11 @@ +package org.hypertrace.core.kafkastreams.framework.serdes.proto; + +import com.google.protobuf.Message; +import org.apache.kafka.common.serialization.Serializer; + +public class ProtoSerializer implements Serializer { + @Override + public byte[] serialize(String topic, T data) { + return data.toByteArray(); + } +} From 147cd04f7091a23cbd9dd52b3a617cf28912399c Mon Sep 17 00:00:00 2001 From: singh-viikram Date: Thu, 12 Sep 2024 17:41:54 +0530 Subject: [PATCH 2/7] resolved comments --- kafka-bom/build.gradle.kts | 2 +- kafka-streams-serdes/build.gradle.kts | 3 +- .../serdes/proto/ProtoDeserializer.java | 37 +++++++++++++++++++ 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/kafka-bom/build.gradle.kts b/kafka-bom/build.gradle.kts index 5d47c6d..1f4d72d 100644 --- a/kafka-bom/build.gradle.kts +++ b/kafka-bom/build.gradle.kts @@ -15,7 +15,7 @@ dependencies { because("[https://nvd.nist.gov/vuln/detail/CVE-2023-34455] in 'org.apache.kafka:kafka-clients:*'") because("[https://nvd.nist.gov/vuln/detail/CVE-2023-43642]") } - api("com.google.protobuf:protobuf-java-util:3.21.7") { + api("com.google.protobuf:protobuf-java-util:$protobufVersion") { because("https://nvd.nist.gov/vuln/detail/CVE-2022-3171") } api("com.squareup.okio:okio:3.4.0") { diff --git a/kafka-streams-serdes/build.gradle.kts b/kafka-streams-serdes/build.gradle.kts index 1be4633..bbe9bb8 100644 --- a/kafka-streams-serdes/build.gradle.kts +++ b/kafka-streams-serdes/build.gradle.kts @@ -1,7 +1,6 @@ plugins { `java-library` jacoco - id("com.google.protobuf") version "0.9.4" id("org.hypertrace.avro-plugin") id("org.hypertrace.publish-plugin") id("org.hypertrace.jacoco-report-plugin") @@ -16,7 +15,7 @@ dependencies { api("org.apache.kafka:kafka-clients") api("org.apache.avro:avro") - implementation("com.google.protobuf:protobuf-java:3.23.0") // Adjust the version as needed + api("com.google.protobuf:protobuf-java-util") testImplementation("org.junit.jupiter:junit-jupiter:5.8.2") } diff --git a/kafka-streams-serdes/src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/proto/ProtoDeserializer.java b/kafka-streams-serdes/src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/proto/ProtoDeserializer.java index 82ee569..c2855b8 100644 --- a/kafka-streams-serdes/src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/proto/ProtoDeserializer.java +++ b/kafka-streams-serdes/src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/proto/ProtoDeserializer.java @@ -5,6 +5,43 @@ import com.google.protobuf.Parser; import org.apache.kafka.common.serialization.Deserializer; +/** + * Custom Proto Deserializer for Kafka. + * + *

This class provides a deserialization mechanism for Kafka messages using Protocol Buffers + * without schema validation. It extends the Kafka Deserializer interface and allows for direct + * deserialization of byte arrays into Proto message objects by utilizing the provided Parser for + * the specific Proto message type. + * + *

Motivation: In setups where both producers and consumers use the same Proto schemas, the need + * for schema validation becomes redundant. The built-in {@code kafkaProtoSerdes} from Confluent + * performs schema validation via the schema registry service, which introduces overhead. This + * custom deserializer eliminates that overhead, simplifying the processing flow by bypassing schema + * validation. + * + * + *

Usage: To use this class, create a subclass specifying the Proto message type, pass the + * corresponding Parser to the superclass constructor, and configure Kafka to use the custom + * deserializer. + * + *

Example: + * + *

{@code
+ * public class MyProtoMessageDeserializer extends ProtoDeserializer {
+ *     public MyProtoMessageDeserializer() {
+ *         super(MyProtoMessage.parser());
+ *     }
+ * }
+ * }
+ * + * Then, configure Kafka to use this deserializer: + * + *
{@code
+ * key.deserializer=com.example.MyProtoMessageDeserializer
+ * }
+ * + * @param The Proto message type to be deserialized. + */ public class ProtoDeserializer implements Deserializer { private final Parser parser; From c140cbc869d28361a688c1e145c85c94481525bc Mon Sep 17 00:00:00 2001 From: singh-viikram Date: Thu, 12 Sep 2024 18:04:29 +0530 Subject: [PATCH 3/7] nits --- .../serdes/proto/ProtoDeserializer.java | 12 +++---- .../serdes/proto/ProtoSerializer.java | 34 +++++++++++++++++++ 2 files changed, 40 insertions(+), 6 deletions(-) diff --git a/kafka-streams-serdes/src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/proto/ProtoDeserializer.java b/kafka-streams-serdes/src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/proto/ProtoDeserializer.java index c2855b8..0280894 100644 --- a/kafka-streams-serdes/src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/proto/ProtoDeserializer.java +++ b/kafka-streams-serdes/src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/proto/ProtoDeserializer.java @@ -13,12 +13,12 @@ * deserialization of byte arrays into Proto message objects by utilizing the provided Parser for * the specific Proto message type. * - *

Motivation: In setups where both producers and consumers use the same Proto schemas, the need - * for schema validation becomes redundant. The built-in {@code kafkaProtoSerdes} from Confluent - * performs schema validation via the schema registry service, which introduces overhead. This - * custom deserializer eliminates that overhead, simplifying the processing flow by bypassing schema - * validation. - * + *

Motivation: Since the proto. configurations are usually shared between the producer and the + * consumers,the field descriptors are well-known to both the parties. In cases when there are other + * mechanisms to validate proto. compatibilities schema validation becomes redundant and this class + * can be used in such cases. The built-in {@code kafkaProtoSerdes} from Confluent performs schema + * validation via the schema registry service, which introduces overhead. This custom deserializer + * eliminates that overhead, simplifying the processing flow by bypassing schema validation. * *

Usage: To use this class, create a subclass specifying the Proto message type, pass the * corresponding Parser to the superclass constructor, and configure Kafka to use the custom diff --git a/kafka-streams-serdes/src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/proto/ProtoSerializer.java b/kafka-streams-serdes/src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/proto/ProtoSerializer.java index 0d1754f..edc86eb 100644 --- a/kafka-streams-serdes/src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/proto/ProtoSerializer.java +++ b/kafka-streams-serdes/src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/proto/ProtoSerializer.java @@ -3,6 +3,40 @@ import com.google.protobuf.Message; import org.apache.kafka.common.serialization.Serializer; +/** + * Custom Proto Serializer for Kafka. + * + *

This class provides a serialization mechanism for Kafka messages using Protocol Buffers + * without schema validation. It extends the Kafka Serializer interface and allows for direct + * serialization of byte arrays into Proto message objects by utilizing the provided Parser for the + * specific Proto message type. + * + *

Motivation: Since the proto. configurations are usually shared between the producer and the + * consumers,the field descriptors are well-known to both the parties. In cases when there are other + * mechanisms to validate proto. compatibilities schema validation becomes redundant and this class + * can be used in such cases. The built-in {@code kafkaProtoSerdes} from Confluent performs schema + * validation via the schema registry service, which introduces overhead. This custom serializer + * eliminates that overhead, simplifying the processing flow by bypassing schema validation. + * + *

Usage: To use this class, create a subclass specifying the Proto message type, and configure + * Kafka to use the custom serializer. + * + *

Example: + * + *

{@code
+ * public class MyProtoMessageSerializer extends ProtoSerializer {
+ *
+ * }
+ * }
+ * + * Then, configure Kafka to use this serializer: + * + *
{@code
+ * key.serializer=com.example.MyProtoMessageSerializer
+ * }
+ * + * @param The Proto message type to be serialized. + */ public class ProtoSerializer implements Serializer { @Override public byte[] serialize(String topic, T data) { From ccb682d8b47f275d665cac1c85b3ed6bd9dc57eb Mon Sep 17 00:00:00 2001 From: singh-viikram Date: Fri, 13 Sep 2024 11:56:35 +0530 Subject: [PATCH 4/7] Added tests for proto serdes --- kafka-streams-serdes/build.gradle.kts | 9 +++++ .../framework/serdes/ProtoSerdeTest.java | 38 +++++++++++++++++++ .../src/test/proto/TestProtoRecord.proto | 7 ++++ 3 files changed, 54 insertions(+) create mode 100644 kafka-streams-serdes/src/test/java/org/hypertrace/core/kafkastreams/framework/serdes/ProtoSerdeTest.java create mode 100644 kafka-streams-serdes/src/test/proto/TestProtoRecord.proto diff --git a/kafka-streams-serdes/build.gradle.kts b/kafka-streams-serdes/build.gradle.kts index bbe9bb8..d43a872 100644 --- a/kafka-streams-serdes/build.gradle.kts +++ b/kafka-streams-serdes/build.gradle.kts @@ -1,6 +1,7 @@ plugins { `java-library` jacoco + id("com.google.protobuf") version "0.9.3" id("org.hypertrace.avro-plugin") id("org.hypertrace.publish-plugin") id("org.hypertrace.jacoco-report-plugin") @@ -24,3 +25,11 @@ dependencies { tasks.named("avroCompatibilityCheck") { enabled = false } + +sourceSets { + test { + java { + srcDirs("src/test/proto") // Generated Proto classes + } + } +} diff --git a/kafka-streams-serdes/src/test/java/org/hypertrace/core/kafkastreams/framework/serdes/ProtoSerdeTest.java b/kafka-streams-serdes/src/test/java/org/hypertrace/core/kafkastreams/framework/serdes/ProtoSerdeTest.java new file mode 100644 index 0000000..c94c7e9 --- /dev/null +++ b/kafka-streams-serdes/src/test/java/org/hypertrace/core/kafkastreams/framework/serdes/ProtoSerdeTest.java @@ -0,0 +1,38 @@ +package org.hypertrace.core.kafkastreams.framework.serdes; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.hypertrace.core.kafkastreams.framework.serdes.proto.ProtoDeserializer; +import org.hypertrace.core.kafkastreams.framework.serdes.proto.ProtoSerializer; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import proto.TestProtoRecordOuterClass.TestProtoRecord; + +public class ProtoSerdeTest { + + private static final String TEST_TOPIC = "test-topic"; + + // Subclass for testing with proto deserialization + public static class TestProtoRecordDeserializer extends ProtoDeserializer { + public TestProtoRecordDeserializer() { + super(TestProtoRecord.parser()); + } + } + + @Test + public void testSerialize() { + Serializer serializer = new ProtoSerializer<>(); + + Deserializer deserializer = new TestProtoRecordDeserializer(); + TestProtoRecord message = TestProtoRecord.newBuilder().setId("id").build(); + + byte[] serializedData = serializer.serialize(TEST_TOPIC, message); + + Assertions.assertNotNull(serializedData); + Assertions.assertTrue(serializedData.length > 0); + + TestProtoRecord deserializedMessage = deserializer.deserialize(TEST_TOPIC, serializedData); + + Assertions.assertEquals(message.getId(), deserializedMessage.getId()); + } +} diff --git a/kafka-streams-serdes/src/test/proto/TestProtoRecord.proto b/kafka-streams-serdes/src/test/proto/TestProtoRecord.proto new file mode 100644 index 0000000..4a77001 --- /dev/null +++ b/kafka-streams-serdes/src/test/proto/TestProtoRecord.proto @@ -0,0 +1,7 @@ +syntax = "proto3"; + +package proto; + +message TestProtoRecord { + string id = 1; +} From 31576cf7549e45f2a6a808013efc90b72df70dfc Mon Sep 17 00:00:00 2001 From: singh-viikram Date: Fri, 13 Sep 2024 12:11:32 +0530 Subject: [PATCH 5/7] nits --- kafka-streams-serdes/build.gradle.kts | 9 --------- .../framework/serdes/ProtoSerdeTest.java | 16 ++++++++-------- .../src/test/proto/TestProtoRecord.proto | 7 ------- 3 files changed, 8 insertions(+), 24 deletions(-) delete mode 100644 kafka-streams-serdes/src/test/proto/TestProtoRecord.proto diff --git a/kafka-streams-serdes/build.gradle.kts b/kafka-streams-serdes/build.gradle.kts index d43a872..bbe9bb8 100644 --- a/kafka-streams-serdes/build.gradle.kts +++ b/kafka-streams-serdes/build.gradle.kts @@ -1,7 +1,6 @@ plugins { `java-library` jacoco - id("com.google.protobuf") version "0.9.3" id("org.hypertrace.avro-plugin") id("org.hypertrace.publish-plugin") id("org.hypertrace.jacoco-report-plugin") @@ -25,11 +24,3 @@ dependencies { tasks.named("avroCompatibilityCheck") { enabled = false } - -sourceSets { - test { - java { - srcDirs("src/test/proto") // Generated Proto classes - } - } -} diff --git a/kafka-streams-serdes/src/test/java/org/hypertrace/core/kafkastreams/framework/serdes/ProtoSerdeTest.java b/kafka-streams-serdes/src/test/java/org/hypertrace/core/kafkastreams/framework/serdes/ProtoSerdeTest.java index c94c7e9..7367e56 100644 --- a/kafka-streams-serdes/src/test/java/org/hypertrace/core/kafkastreams/framework/serdes/ProtoSerdeTest.java +++ b/kafka-streams-serdes/src/test/java/org/hypertrace/core/kafkastreams/framework/serdes/ProtoSerdeTest.java @@ -1,38 +1,38 @@ package org.hypertrace.core.kafkastreams.framework.serdes; +import com.google.protobuf.Value; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; import org.hypertrace.core.kafkastreams.framework.serdes.proto.ProtoDeserializer; import org.hypertrace.core.kafkastreams.framework.serdes.proto.ProtoSerializer; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import proto.TestProtoRecordOuterClass.TestProtoRecord; public class ProtoSerdeTest { private static final String TEST_TOPIC = "test-topic"; // Subclass for testing with proto deserialization - public static class TestProtoRecordDeserializer extends ProtoDeserializer { + public static class TestProtoRecordDeserializer extends ProtoDeserializer { public TestProtoRecordDeserializer() { - super(TestProtoRecord.parser()); + super(Value.parser()); } } @Test public void testSerialize() { - Serializer serializer = new ProtoSerializer<>(); + Serializer serializer = new ProtoSerializer<>(); - Deserializer deserializer = new TestProtoRecordDeserializer(); - TestProtoRecord message = TestProtoRecord.newBuilder().setId("id").build(); + Deserializer deserializer = new TestProtoRecordDeserializer(); + Value message = Value.newBuilder().setStringValue("id").build(); byte[] serializedData = serializer.serialize(TEST_TOPIC, message); Assertions.assertNotNull(serializedData); Assertions.assertTrue(serializedData.length > 0); - TestProtoRecord deserializedMessage = deserializer.deserialize(TEST_TOPIC, serializedData); + Value deserializedMessage = deserializer.deserialize(TEST_TOPIC, serializedData); - Assertions.assertEquals(message.getId(), deserializedMessage.getId()); + Assertions.assertEquals(message.getStringValue(), deserializedMessage.getStringValue()); } } diff --git a/kafka-streams-serdes/src/test/proto/TestProtoRecord.proto b/kafka-streams-serdes/src/test/proto/TestProtoRecord.proto deleted file mode 100644 index 4a77001..0000000 --- a/kafka-streams-serdes/src/test/proto/TestProtoRecord.proto +++ /dev/null @@ -1,7 +0,0 @@ -syntax = "proto3"; - -package proto; - -message TestProtoRecord { - string id = 1; -} From 7cf6d31917487df070d9030f086a38b6fc8ec4b9 Mon Sep 17 00:00:00 2001 From: singh-viikram Date: Fri, 13 Sep 2024 12:25:32 +0530 Subject: [PATCH 6/7] nits --- kafka-streams-serdes/build.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-streams-serdes/build.gradle.kts b/kafka-streams-serdes/build.gradle.kts index bbe9bb8..1b7c659 100644 --- a/kafka-streams-serdes/build.gradle.kts +++ b/kafka-streams-serdes/build.gradle.kts @@ -15,7 +15,7 @@ dependencies { api("org.apache.kafka:kafka-clients") api("org.apache.avro:avro") - api("com.google.protobuf:protobuf-java-util") + api("com.google.protobuf:protobuf-java-util:3.21.7") testImplementation("org.junit.jupiter:junit-jupiter:5.8.2") } From 2a40d9535bb2bca0e23425774d25cd918c7281a7 Mon Sep 17 00:00:00 2001 From: singh-viikram Date: Fri, 13 Sep 2024 12:34:04 +0530 Subject: [PATCH 7/7] nits --- kafka-streams-serdes/build.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-streams-serdes/build.gradle.kts b/kafka-streams-serdes/build.gradle.kts index 1b7c659..bae032d 100644 --- a/kafka-streams-serdes/build.gradle.kts +++ b/kafka-streams-serdes/build.gradle.kts @@ -15,7 +15,7 @@ dependencies { api("org.apache.kafka:kafka-clients") api("org.apache.avro:avro") - api("com.google.protobuf:protobuf-java-util:3.21.7") + api("com.google.protobuf:protobuf-java-util:3.25.4") testImplementation("org.junit.jupiter:junit-jupiter:5.8.2") }