diff --git a/kafka-bom/build.gradle.kts b/kafka-bom/build.gradle.kts index 5d47c6d..1f4d72d 100644 --- a/kafka-bom/build.gradle.kts +++ b/kafka-bom/build.gradle.kts @@ -15,7 +15,7 @@ dependencies { because("[https://nvd.nist.gov/vuln/detail/CVE-2023-34455] in 'org.apache.kafka:kafka-clients:*'") because("[https://nvd.nist.gov/vuln/detail/CVE-2023-43642]") } - api("com.google.protobuf:protobuf-java-util:3.21.7") { + api("com.google.protobuf:protobuf-java-util:$protobufVersion") { because("https://nvd.nist.gov/vuln/detail/CVE-2022-3171") } api("com.squareup.okio:okio:3.4.0") { diff --git a/kafka-streams-serdes/build.gradle.kts b/kafka-streams-serdes/build.gradle.kts index 3dfc3f8..bae032d 100644 --- a/kafka-streams-serdes/build.gradle.kts +++ b/kafka-streams-serdes/build.gradle.kts @@ -15,6 +15,7 @@ dependencies { api("org.apache.kafka:kafka-clients") api("org.apache.avro:avro") + api("com.google.protobuf:protobuf-java-util:3.25.4") testImplementation("org.junit.jupiter:junit-jupiter:5.8.2") } diff --git a/kafka-streams-serdes/src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/proto/ProtoDeserializer.java b/kafka-streams-serdes/src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/proto/ProtoDeserializer.java new file mode 100644 index 0000000..0280894 --- /dev/null +++ b/kafka-streams-serdes/src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/proto/ProtoDeserializer.java @@ -0,0 +1,61 @@ +package org.hypertrace.core.kafkastreams.framework.serdes.proto; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.protobuf.Parser; +import org.apache.kafka.common.serialization.Deserializer; + +/** + * Custom Proto Deserializer for Kafka. + * + *

This class provides a deserialization mechanism for Kafka messages using Protocol Buffers + * without schema validation. It extends the Kafka Deserializer interface and allows for direct + * deserialization of byte arrays into Proto message objects by utilizing the provided Parser for + * the specific Proto message type. + * + *

Motivation: Since the proto. configurations are usually shared between the producer and the + * consumers,the field descriptors are well-known to both the parties. In cases when there are other + * mechanisms to validate proto. compatibilities schema validation becomes redundant and this class + * can be used in such cases. The built-in {@code kafkaProtoSerdes} from Confluent performs schema + * validation via the schema registry service, which introduces overhead. This custom deserializer + * eliminates that overhead, simplifying the processing flow by bypassing schema validation. + * + *

Usage: To use this class, create a subclass specifying the Proto message type, pass the + * corresponding Parser to the superclass constructor, and configure Kafka to use the custom + * deserializer. + * + *

Example: + * + *

{@code
+ * public class MyProtoMessageDeserializer extends ProtoDeserializer {
+ *     public MyProtoMessageDeserializer() {
+ *         super(MyProtoMessage.parser());
+ *     }
+ * }
+ * }
+ * + * Then, configure Kafka to use this deserializer: + * + *
{@code
+ * key.deserializer=com.example.MyProtoMessageDeserializer
+ * }
+ * + * @param The Proto message type to be deserialized. + */ +public class ProtoDeserializer implements Deserializer { + + private final Parser parser; + + public ProtoDeserializer(Parser parser) { + this.parser = parser; + } + + @Override + public T deserialize(String s, byte[] bytes) { + try { + return parser.parseFrom(bytes); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } +} diff --git a/kafka-streams-serdes/src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/proto/ProtoSerializer.java b/kafka-streams-serdes/src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/proto/ProtoSerializer.java new file mode 100644 index 0000000..edc86eb --- /dev/null +++ b/kafka-streams-serdes/src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/proto/ProtoSerializer.java @@ -0,0 +1,45 @@ +package org.hypertrace.core.kafkastreams.framework.serdes.proto; + +import com.google.protobuf.Message; +import org.apache.kafka.common.serialization.Serializer; + +/** + * Custom Proto Serializer for Kafka. + * + *

This class provides a serialization mechanism for Kafka messages using Protocol Buffers + * without schema validation. It extends the Kafka Serializer interface and allows for direct + * serialization of byte arrays into Proto message objects by utilizing the provided Parser for the + * specific Proto message type. + * + *

Motivation: Since the proto. configurations are usually shared between the producer and the + * consumers,the field descriptors are well-known to both the parties. In cases when there are other + * mechanisms to validate proto. compatibilities schema validation becomes redundant and this class + * can be used in such cases. The built-in {@code kafkaProtoSerdes} from Confluent performs schema + * validation via the schema registry service, which introduces overhead. This custom serializer + * eliminates that overhead, simplifying the processing flow by bypassing schema validation. + * + *

Usage: To use this class, create a subclass specifying the Proto message type, and configure + * Kafka to use the custom serializer. + * + *

Example: + * + *

{@code
+ * public class MyProtoMessageSerializer extends ProtoSerializer {
+ *
+ * }
+ * }
+ * + * Then, configure Kafka to use this serializer: + * + *
{@code
+ * key.serializer=com.example.MyProtoMessageSerializer
+ * }
+ * + * @param The Proto message type to be serialized. + */ +public class ProtoSerializer implements Serializer { + @Override + public byte[] serialize(String topic, T data) { + return data.toByteArray(); + } +} diff --git a/kafka-streams-serdes/src/test/java/org/hypertrace/core/kafkastreams/framework/serdes/ProtoSerdeTest.java b/kafka-streams-serdes/src/test/java/org/hypertrace/core/kafkastreams/framework/serdes/ProtoSerdeTest.java new file mode 100644 index 0000000..7367e56 --- /dev/null +++ b/kafka-streams-serdes/src/test/java/org/hypertrace/core/kafkastreams/framework/serdes/ProtoSerdeTest.java @@ -0,0 +1,38 @@ +package org.hypertrace.core.kafkastreams.framework.serdes; + +import com.google.protobuf.Value; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.hypertrace.core.kafkastreams.framework.serdes.proto.ProtoDeserializer; +import org.hypertrace.core.kafkastreams.framework.serdes.proto.ProtoSerializer; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class ProtoSerdeTest { + + private static final String TEST_TOPIC = "test-topic"; + + // Subclass for testing with proto deserialization + public static class TestProtoRecordDeserializer extends ProtoDeserializer { + public TestProtoRecordDeserializer() { + super(Value.parser()); + } + } + + @Test + public void testSerialize() { + Serializer serializer = new ProtoSerializer<>(); + + Deserializer deserializer = new TestProtoRecordDeserializer(); + Value message = Value.newBuilder().setStringValue("id").build(); + + byte[] serializedData = serializer.serialize(TEST_TOPIC, message); + + Assertions.assertNotNull(serializedData); + Assertions.assertTrue(serializedData.length > 0); + + Value deserializedMessage = deserializer.deserialize(TEST_TOPIC, serializedData); + + Assertions.assertEquals(message.getStringValue(), deserializedMessage.getStringValue()); + } +}