Skip to content

Commit 8671900

Browse files
committed
Adding custom proto serdes
1 parent 8f5da9e commit 8671900

File tree

3 files changed

+37
-0
lines changed

3 files changed

+37
-0
lines changed

kafka-streams-serdes/build.gradle.kts

+2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
plugins {
22
`java-library`
33
jacoco
4+
id("com.google.protobuf") version "0.9.4"
45
id("org.hypertrace.avro-plugin")
56
id("org.hypertrace.publish-plugin")
67
id("org.hypertrace.jacoco-report-plugin")
@@ -15,6 +16,7 @@ dependencies {
1516

1617
api("org.apache.kafka:kafka-clients")
1718
api("org.apache.avro:avro")
19+
implementation("com.google.protobuf:protobuf-java:3.23.0") // Adjust the version as needed
1820

1921
testImplementation("org.junit.jupiter:junit-jupiter:5.8.2")
2022
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package org.hypertrace.core.kafkastreams.framework.serdes.proto;
2+
3+
import com.google.protobuf.InvalidProtocolBufferException;
4+
import com.google.protobuf.Message;
5+
import com.google.protobuf.Parser;
6+
import org.apache.kafka.common.serialization.Deserializer;
7+
8+
public class ProtoDeserializer<T extends Message> implements Deserializer<T> {
9+
10+
private final Parser<T> parser;
11+
12+
public ProtoDeserializer(Parser<T> parser) {
13+
this.parser = parser;
14+
}
15+
16+
@Override
17+
public T deserialize(String s, byte[] bytes) {
18+
try {
19+
return parser.parseFrom(bytes);
20+
} catch (InvalidProtocolBufferException e) {
21+
throw new RuntimeException(e);
22+
}
23+
}
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package org.hypertrace.core.kafkastreams.framework.serdes.proto;
2+
3+
import com.google.protobuf.Message;
4+
import org.apache.kafka.common.serialization.Serializer;
5+
6+
public class ProtoSerializer<T extends Message> implements Serializer<T> {
7+
@Override
8+
public byte[] serialize(String topic, T data) {
9+
return data.toByteArray();
10+
}
11+
}

0 commit comments

Comments
 (0)