
Commit f563d23

fix(kafka): Add support for confluent message indices. (#1902)
* fix(kafka): Add support for confluent message indices.
* Make Generator classes non-instantiable (they are static utility classes).
* Make generator classes final.
1 parent: eebc06a

File tree: 7 files changed, +246 −59 lines

examples/powertools-examples-kafka/events/kafka-protobuf-event.json

Lines changed: 2 additions & 2 deletions
```diff
@@ -25,7 +25,7 @@
         "timestamp": 1545084650988,
         "timestampType": "CREATE_TIME",
         "key": "NDI=",
-        "value": "COoHEgpTbWFydHBob25lGVK4HoXrv4JA",
+        "value": "AAjpBxIGTGFwdG9wGVK4HoXrP49A",
         "headers": [
           {
             "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
@@ -39,7 +39,7 @@
         "timestamp": 1545084650989,
         "timestampType": "CREATE_TIME",
         "key": null,
-        "value": "COsHEgpIZWFkcGhvbmVzGUjhehSuv2JA",
+        "value": "AgEACOkHEgZMYXB0b3AZUrgehes/j0A=",
         "headers": [
           {
             "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
```

examples/powertools-examples-kafka/tools/README.md

Lines changed: 8 additions & 1 deletion
````diff
@@ -45,7 +45,7 @@ The tool will output base64-encoded values for Avro products that can be used in
 mvn exec:java -Dexec.mainClass="org.demo.kafka.tools.GenerateProtobufSamples"
 ```
 
-The tool will output base64-encoded values for Protobuf products that can be used in `../events/kafka-protobuf-event.json`.
+The tool will output base64-encoded values for Protobuf products that can be used in `../events/kafka-protobuf-event.json`. This generator creates samples with and without Confluent message-indexes to test different serialization scenarios.
 
 ## Output
 
@@ -55,6 +55,13 @@ Each generator produces:
 2. An integer key (42) and one entry with a nullish key to test for edge-cases
 3. A complete sample event structure that can be used directly for testing
 
+The Protobuf generators additionally create samples with different Confluent message-index formats:
+- Standard protobuf (no message indexes)
+- Simple message index (single 0 byte)
+- Complex message index (length-prefixed array)
+
+For more information about Confluent Schema Registry serialization formats and wire format specifications, see the [Confluent documentation](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format).
+
 ## Example
 
 After generating the samples, you can copy the output into the respective event files:
````
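For quick reference, the three payload layouts named in the README list above (paraphrasing the linked Confluent wire-format docs, and assuming the magic byte and schema ID have already been stripped, as the deserializer in this commit expects):

```
Standard protobuf:      [protobuf_data]
Simple message index:   [0x00][protobuf_data]
Complex message index:  [n][index_1]...[index_n][protobuf_data]   (each value is a varint)
```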

examples/powertools-examples-kafka/tools/src/main/java/org/demo/kafka/tools/GenerateAvroSamples.java

Lines changed: 20 additions & 14 deletions
```diff
@@ -14,62 +14,68 @@
  * Utility class to generate base64-encoded Avro serialized products
  * for use in test events.
  */
-public class GenerateAvroSamples {
+public final class GenerateAvroSamples {
+
+    private GenerateAvroSamples() {
+        // Utility class
+    }
 
     public static void main(String[] args) throws IOException {
         // Create three different products
         AvroProduct product1 = new AvroProduct(1001, "Laptop", 999.99);
         AvroProduct product2 = new AvroProduct(1002, "Smartphone", 599.99);
         AvroProduct product3 = new AvroProduct(1003, "Headphones", 149.99);
-        
+
         // Serialize and encode each product
         String encodedProduct1 = serializeAndEncode(product1);
         String encodedProduct2 = serializeAndEncode(product2);
         String encodedProduct3 = serializeAndEncode(product3);
-        
+
         // Serialize and encode an integer key
         String encodedKey = serializeAndEncodeInteger(42);
-        
+
         // Print the results
         System.out.println("Base64 encoded Avro products for use in kafka-avro-event.json:");
         System.out.println("\nProduct 1 (with key):");
         System.out.println("key: \"" + encodedKey + "\",");
         System.out.println("value: \"" + encodedProduct1 + "\",");
-        
+
         System.out.println("\nProduct 2 (with key):");
         System.out.println("key: \"" + encodedKey + "\",");
         System.out.println("value: \"" + encodedProduct2 + "\",");
-        
+
         System.out.println("\nProduct 3 (without key):");
         System.out.println("key: null,");
         System.out.println("value: \"" + encodedProduct3 + "\",");
-        
+
         // Print a sample event structure
         System.out.println("\nSample event structure:");
         printSampleEvent(encodedKey, encodedProduct1, encodedProduct2, encodedProduct3);
     }
-    
+
     private static String serializeAndEncode(AvroProduct product) throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
         DatumWriter<AvroProduct> writer = new SpecificDatumWriter<>(AvroProduct.class);
-        
+
         writer.write(product, encoder);
         encoder.flush();
-        
+
         return Base64.getEncoder().encodeToString(baos.toByteArray());
     }
-    
+
     private static String serializeAndEncodeInteger(Integer value) throws IOException {
         // For simple types like integers, we'll just convert to string and encode
         return Base64.getEncoder().encodeToString(value.toString().getBytes());
     }
-    
+
     private static void printSampleEvent(String key, String product1, String product2, String product3) {
         System.out.println("{\n" +
                 " \"eventSource\": \"aws:kafka\",\n" +
-                " \"eventSourceArn\": \"arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4\",\n" +
-                " \"bootstrapServers\": \"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092\",\n" +
+                " \"eventSourceArn\": \"arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4\",\n"
+                +
+                " \"bootstrapServers\": \"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092\",\n"
+                +
                 " \"records\": {\n" +
                 " \"mytopic-0\": [\n" +
                 " {\n" +
```

examples/powertools-examples-kafka/tools/src/main/java/org/demo/kafka/tools/GenerateJsonSamples.java

Lines changed: 5 additions & 1 deletion
```diff
@@ -11,7 +11,11 @@
  * Utility class to generate base64-encoded JSON serialized products
  * for use in test events.
  */
-public class GenerateJsonSamples {
+public final class GenerateJsonSamples {
+
+    private GenerateJsonSamples() {
+        // Utility class
+    }
 
     public static void main(String[] args) throws IOException {
         // Create three different products
```

examples/powertools-examples-kafka/tools/src/main/java/org/demo/kafka/tools/GenerateProtobufSamples.java

Lines changed: 77 additions & 40 deletions
```diff
@@ -1,73 +1,110 @@
 package org.demo.kafka.tools;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Base64;
 
 import org.demo.kafka.protobuf.ProtobufProduct;
 
+import com.google.protobuf.CodedOutputStream;
+
 /**
  * Utility class to generate base64-encoded Protobuf serialized products
  * for use in test events.
  */
-public class GenerateProtobufSamples {
+public final class GenerateProtobufSamples {
+
+    private GenerateProtobufSamples() {
+        // Utility class
+    }
 
     public static void main(String[] args) throws IOException {
-        // Create three different products
-        ProtobufProduct product1 = ProtobufProduct.newBuilder()
+        // Create a single product that will be used for all three scenarios
+        ProtobufProduct product = ProtobufProduct.newBuilder()
                 .setId(1001)
                 .setName("Laptop")
                 .setPrice(999.99)
                 .build();
 
-        ProtobufProduct product2 = ProtobufProduct.newBuilder()
-                .setId(1002)
-                .setName("Smartphone")
-                .setPrice(599.99)
-                .build();
-
-        ProtobufProduct product3 = ProtobufProduct.newBuilder()
-                .setId(1003)
-                .setName("Headphones")
-                .setPrice(149.99)
-                .build();
-
-        // Serialize and encode each product
-        String encodedProduct1 = serializeAndEncode(product1);
-        String encodedProduct2 = serializeAndEncode(product2);
-        String encodedProduct3 = serializeAndEncode(product3);
+        // Create three different serializations of the same product
+        String standardProduct = serializeAndEncode(product);
+        String productWithSimpleIndex = serializeWithSimpleMessageIndex(product);
+        String productWithComplexIndex = serializeWithComplexMessageIndex(product);
 
-        // Serialize and encode an integer key
+        // Serialize and encode an integer key (same for all records)
         String encodedKey = serializeAndEncodeInteger(42);
 
         // Print the results
-        System.out.println("Base64 encoded Protobuf products for use in kafka-protobuf-event.json:");
-        System.out.println("\nProduct 1 (with key):");
-        System.out.println("key: \"" + encodedKey + "\",");
-        System.out.println("value: \"" + encodedProduct1 + "\",");
-
-        System.out.println("\nProduct 2 (with key):");
-        System.out.println("key: \"" + encodedKey + "\",");
-        System.out.println("value: \"" + encodedProduct2 + "\",");
-
-        System.out.println("\nProduct 3 (without key):");
-        System.out.println("key: null,");
-        System.out.println("value: \"" + encodedProduct3 + "\",");
-
-        // Print a sample event structure
-        System.out.println("\nSample event structure:");
-        printSampleEvent(encodedKey, encodedProduct1, encodedProduct2, encodedProduct3);
+        System.out.println("Base64 encoded Protobuf products with different message index scenarios:");
+        System.out.println("\n1. Standard Protobuf (no message index):");
+        System.out.println("value: \"" + standardProduct + "\"");
+
+        System.out.println("\n2. Simple Message Index (single 0):");
+        System.out.println("value: \"" + productWithSimpleIndex + "\"");
+
+        System.out.println("\n3. Complex Message Index (array [1,0]):");
+        System.out.println("value: \"" + productWithComplexIndex + "\"");
+
+        // Print the merged event structure
+        System.out.println("\n" + "=".repeat(80));
+        System.out.println("MERGED EVENT WITH ALL THREE SCENARIOS");
+        System.out.println("=".repeat(80));
+        printSampleEvent(encodedKey, standardProduct, productWithSimpleIndex, productWithComplexIndex);
     }
 
     private static String serializeAndEncode(ProtobufProduct product) {
         return Base64.getEncoder().encodeToString(product.toByteArray());
     }
 
+    /**
+     * Serializes a protobuf product with a simple Confluent message index (single 0).
+     * Format: [0][protobuf_data]
+     *
+     * @see <a href="https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format">Confluent wire format</a>
+     */
+    private static String serializeWithSimpleMessageIndex(ProtobufProduct product) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        CodedOutputStream codedOutput = CodedOutputStream.newInstance(baos);
+
+        // Write simple message index (single 0)
+        codedOutput.writeUInt32NoTag(0);
+
+        // Write the protobuf data
+        product.writeTo(codedOutput);
+
+        codedOutput.flush();
+        return Base64.getEncoder().encodeToString(baos.toByteArray());
+    }
+
+    /**
+     * Serializes a protobuf product with a complex Confluent message index (array [1,0]).
+     * Format: [2][1][0][protobuf_data] where 2 is the array length
+     *
+     * @see <a href="https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format">Confluent wire format</a>
+     */
+    private static String serializeWithComplexMessageIndex(ProtobufProduct product) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        CodedOutputStream codedOutput = CodedOutputStream.newInstance(baos);
+
+        // Write complex message index array [1,0]
+        codedOutput.writeUInt32NoTag(2); // Array length
+        codedOutput.writeUInt32NoTag(1); // First index value
+        codedOutput.writeUInt32NoTag(0); // Second index value
+
+        // Write the protobuf data
+        product.writeTo(codedOutput);
+
+        codedOutput.flush();
+        return Base64.getEncoder().encodeToString(baos.toByteArray());
+    }
+
     private static String serializeAndEncodeInteger(Integer value) {
         // For simple types like integers, we'll just convert to string and encode
         return Base64.getEncoder().encodeToString(value.toString().getBytes());
     }
 
-    private static void printSampleEvent(String key, String product1, String product2, String product3) {
+    private static void printSampleEvent(String key, String standardProduct, String simpleIndexProduct,
+            String complexIndexProduct) {
         System.out.println("{\n" +
                 " \"eventSource\": \"aws:kafka\",\n" +
                 " \"eventSourceArn\": \"arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4\",\n"
@@ -83,7 +120,7 @@ private static void printSampleEvent(String key, String product1, String product
                 " \"timestamp\": 1545084650987,\n" +
                 " \"timestampType\": \"CREATE_TIME\",\n" +
                 " \"key\": \"" + key + "\",\n" +
-                " \"value\": \"" + product1 + "\",\n" +
+                " \"value\": \"" + standardProduct + "\",\n" +
                 " \"headers\": [\n" +
                 " {\n" +
                 " \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
@@ -97,7 +134,7 @@ private static void printSampleEvent(String key, String product1, String product
                 " \"timestamp\": 1545084650988,\n" +
                 " \"timestampType\": \"CREATE_TIME\",\n" +
                 " \"key\": \"" + key + "\",\n" +
-                " \"value\": \"" + product2 + "\",\n" +
+                " \"value\": \"" + simpleIndexProduct + "\",\n" +
                 " \"headers\": [\n" +
                 " {\n" +
                 " \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
@@ -111,7 +148,7 @@ private static void printSampleEvent(String key, String product1, String product
                 " \"timestamp\": 1545084650989,\n" +
                 " \"timestampType\": \"CREATE_TIME\",\n" +
                 " \"key\": null,\n" +
-                " \"value\": \"" + product3 + "\",\n" +
+                " \"value\": \"" + complexIndexProduct + "\",\n" +
                 " \"headers\": [\n" +
                 " {\n" +
                 " \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
```

powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/KafkaProtobufDeserializer.java

Lines changed: 57 additions & 1 deletion
```diff
@@ -13,14 +13,27 @@
 package software.amazon.lambda.powertools.kafka.serializers;
 
 import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.Message;
 import com.google.protobuf.Parser;
 
 /**
  * Deserializer for Kafka records using Protocol Buffers format.
+ * Supports both standard protobuf serialization and Confluent Schema Registry serialization using message indices.
+ *
+ * For Confluent-serialized data, assumes the magic byte and schema ID have already been stripped
+ * by the Kafka ESM, leaving only the message index (if present) and protobuf data.
+ *
+ * @see <a href="https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format">Confluent wire format</a>
  */
 public class KafkaProtobufDeserializer extends AbstractKafkaDeserializer {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProtobufDeserializer.class);
+
     @Override
     @SuppressWarnings("unchecked")
     protected <T> T deserializeObject(byte[] data, Class<T> type) throws IOException {
@@ -29,7 +42,9 @@ protected <T> T deserializeObject(byte[] data, Class<T> type) throws IOException
         try {
             // Get the parser from the generated Protobuf class
             Parser<Message> parser = (Parser<Message>) type.getMethod("parser").invoke(null);
-            Message message = parser.parseFrom(data);
+
+            // Try to deserialize the data, handling potential Confluent message indices
+            Message message = deserializeWithMessageIndexHandling(data, parser);
             return type.cast(message);
         } catch (Exception e) {
             throw new IOException("Failed to deserialize Protobuf data.", e);
@@ -40,4 +55,45 @@ protected <T> T deserializeObject(byte[] data, Class<T> type) throws IOException
                     + "Consider using an alternative Deserializer.");
         }
     }
+
+    private Message deserializeWithMessageIndexHandling(byte[] data, Parser<Message> parser) throws IOException {
+        try {
+            LOGGER.debug("Attempting to deserialize as standard protobuf data");
+            return parser.parseFrom(data);
+        } catch (Exception e) {
+            LOGGER.debug("Standard protobuf parsing failed, attempting Confluent message-index handling");
+            return deserializeWithMessageIndex(data, parser);
+        }
+    }
+
+    private Message deserializeWithMessageIndex(byte[] data, Parser<Message> parser) throws IOException {
+        CodedInputStream codedInputStream = CodedInputStream.newInstance(data);
+
+        try {
+            // https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
+            // Read the first varint - this could be:
+            // 1. A single 0 (simple case - first message type)
+            // 2. The length of the message index array (complex case)
+            int firstValue = codedInputStream.readUInt32();
+
+            if (firstValue == 0) {
+                // Simple case: Single 0 byte means first message type
+                LOGGER.debug("Found simple message-index case (single 0), parsing remaining data as protobuf");
+                return parser.parseFrom(codedInputStream);
+            } else {
+                // Complex case: firstValue is the length of the message index array
+                LOGGER.debug("Found complex message-index case with array length: {}, skipping {} message index values",
+                        firstValue, firstValue);
+                for (int i = 0; i < firstValue; i++) {
+                    codedInputStream.readUInt32(); // Skip each message index value
+                }
+                // Now the remaining data should be the actual protobuf message
+                LOGGER.debug("Finished skipping message indexes, parsing remaining data as protobuf");
+                return parser.parseFrom(codedInputStream);
+            }
+
+        } catch (Exception e) {
+            throw new IOException("Failed to parse protobuf data with or without message index", e);
+        }
+    }
 }
```
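To see the new fallback path end to end, here is a round-trip sketch that mirrors the deserializer's index-skipping logic outside the class (it assumes the example project's generated `ProtobufProduct`; the standalone `MessageIndexRoundTrip` class and `main` harness are illustrative, not the project's actual test suite):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.demo.kafka.protobuf.ProtobufProduct;

import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;

public class MessageIndexRoundTrip {
    public static void main(String[] args) throws IOException {
        ProtobufProduct product = ProtobufProduct.newBuilder()
                .setId(1001).setName("Laptop").setPrice(999.99).build();

        // Serialize with a complex, length-prefixed message index [1, 0]
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        CodedOutputStream out = CodedOutputStream.newInstance(baos);
        out.writeUInt32NoTag(2); // array length
        out.writeUInt32NoTag(1); // first index value
        out.writeUInt32NoTag(0); // second index value
        product.writeTo(out);
        out.flush();

        // Deserialize: read the array length, skip that many varints,
        // then parse the remainder as the actual protobuf message
        CodedInputStream in = CodedInputStream.newInstance(baos.toByteArray());
        int indexCount = in.readUInt32();
        for (int i = 0; i < indexCount; i++) {
            in.readUInt32();
        }
        ProtobufProduct parsed = ProtobufProduct.parseFrom(in);

        System.out.println("Laptop".equals(parsed.getName())); // true
    }
}
```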
