Skip to content

Commit

Permalink
Add client.id config to channel consumer and producer
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanck committed Jun 7, 2023
1 parent 3972664 commit a490eba
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 1 deletion.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ subprojects {
apply plugin: "maven-publish"

group "io.tabular.connect"
version "0.3.1-SNAPSHOT"
version "0.3.1"

repositories {
mavenCentral()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
Expand All @@ -42,6 +43,7 @@ public KafkaClientFactory(Map<String, String> kafkaProps) {
public Producer<String, byte[]> createProducer(String transactionalId) {
Map<String, Object> producerProps = new HashMap<>(kafkaProps);
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
KafkaProducer<String, byte[]> result =
new KafkaProducer<>(producerProps, new StringSerializer(), new ByteArraySerializer());
result.initTransactions();
Expand All @@ -54,6 +56,7 @@ public Consumer<String, byte[]> createConsumer(String consumerGroupId) {
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
return new KafkaConsumer<>(
consumerProps, new StringDeserializer(), new ByteArrayDeserializer());
}
Expand Down

0 comments on commit a490eba

Please sign in to comment.