Skip to content

Commit 147cd04

Browse files
committed
resolved comments
1 parent 8671900 commit 147cd04

File tree

3 files changed

+39
-3
lines changed

3 files changed

+39
-3
lines changed

kafka-bom/build.gradle.kts

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ dependencies {
1515
because("[https://nvd.nist.gov/vuln/detail/CVE-2023-34455] in 'org.apache.kafka:kafka-clients:*'")
1616
because("[https://nvd.nist.gov/vuln/detail/CVE-2023-43642]")
1717
}
18-
api("com.google.protobuf:protobuf-java-util:3.21.7") {
18+
api("com.google.protobuf:protobuf-java-util:$protobufVersion") {
1919
because("https://nvd.nist.gov/vuln/detail/CVE-2022-3171")
2020
}
2121
api("com.squareup.okio:okio:3.4.0") {

kafka-streams-serdes/build.gradle.kts

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
plugins {
22
`java-library`
33
jacoco
4-
id("com.google.protobuf") version "0.9.4"
54
id("org.hypertrace.avro-plugin")
65
id("org.hypertrace.publish-plugin")
76
id("org.hypertrace.jacoco-report-plugin")
@@ -16,7 +15,7 @@ dependencies {
1615

1716
api("org.apache.kafka:kafka-clients")
1817
api("org.apache.avro:avro")
19-
implementation("com.google.protobuf:protobuf-java:3.23.0") // Adjust the version as needed
18+
api("com.google.protobuf:protobuf-java-util")
2019

2120
testImplementation("org.junit.jupiter:junit-jupiter:5.8.2")
2221
}

kafka-streams-serdes/src/main/java/org/hypertrace/core/kafkastreams/framework/serdes/proto/ProtoDeserializer.java

+37
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,43 @@
55
import com.google.protobuf.Parser;
66
import org.apache.kafka.common.serialization.Deserializer;
77

8+
/**
9+
* Custom Proto Deserializer for Kafka.
10+
*
11+
* <p>This class provides a deserialization mechanism for Kafka messages using Protocol Buffers
12+
* without schema validation. It extends the Kafka Deserializer interface and allows for direct
13+
* deserialization of byte arrays into Proto message objects by utilizing the provided Parser for
14+
* the specific Proto message type.
15+
*
16+
* <p>Motivation: In setups where both producers and consumers use the same Proto schemas, the need
17+
* for schema validation becomes redundant. The built-in {@code kafkaProtoSerdes} from Confluent
18+
* performs schema validation via the schema registry service, which introduces overhead. This
19+
* custom deserializer eliminates that overhead, simplifying the processing flow by bypassing schema
20+
* validation.
21+
*
22+
*
23+
* <p>Usage: To use this class, create a subclass specifying the Proto message type, pass the
24+
* corresponding Parser to the superclass constructor, and configure Kafka to use the custom
25+
* deserializer.
26+
*
27+
* <p>Example:
28+
*
29+
* <pre>{@code
30+
* public class MyProtoMessageDeserializer extends ProtoDeserializer<MyProtoMessage> {
31+
* public MyProtoMessageDeserializer() {
32+
* super(MyProtoMessage.parser());
33+
* }
34+
* }
35+
* }</pre>
36+
*
37+
* Then, configure Kafka to use this deserializer:
38+
*
39+
* <pre>{@code
40+
* key.deserializer=com.example.MyProtoMessageDeserializer
41+
* }</pre>
42+
*
43+
* @param <T> The Proto message type to be deserialized.
44+
*/
845
public class ProtoDeserializer<T extends Message> implements Deserializer<T> {
946

1047
private final Parser<T> parser;

0 commit comments

Comments
 (0)