Skip to content

Commit 87ec582

Browse files
committed
GH-900: Use correct txId for initial commit
Fixes #900 Use the proper zombie-fencing `transational.id` when committing initial offsets after assignment. **cherry-pick to all, 1.3.x will probable need a backport** * Fix - set TL before invoking the transaction template.
1 parent 7891bc0 commit 87ec582

File tree

2 files changed

+48
-22
lines changed

2 files changed

+48
-22
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,12 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
374374

375375
ConsumerRebalanceListener rebalanceListener = createRebalanceListener(consumer);
376376

377+
if (this.transactionManager != null) {
378+
this.transactionTemplate = new TransactionTemplate(this.transactionManager);
379+
}
380+
else {
381+
this.transactionTemplate = null;
382+
}
377383
if (KafkaMessageListenerContainer.this.topicPartitions == null) {
378384
if (this.containerProperties.getTopicPattern() != null) {
379385
consumer.subscribe(this.containerProperties.getTopicPattern(), rebalanceListener);
@@ -424,12 +430,6 @@ else if (listener instanceof MessageListener) {
424430
this.batchErrorHandler = new BatchLoggingErrorHandler();
425431
}
426432
Assert.state(!this.isBatchListener || !this.isRecordAck, "Cannot use AckMode.RECORD with a batch listener");
427-
if (this.transactionManager != null) {
428-
this.transactionTemplate = new TransactionTemplate(this.transactionManager);
429-
}
430-
else {
431-
this.transactionTemplate = null;
432-
}
433433
if (this.containerProperties.getScheduler() != null) {
434434
this.taskScheduler = this.containerProperties.getScheduler();
435435
this.taskSchedulerExplicitlySet = true;
@@ -512,18 +512,29 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
512512
}
513513
if (ListenerConsumer.this.transactionTemplate != null &&
514514
ListenerConsumer.this.kafkaTxManager != null) {
515-
ListenerConsumer.this.transactionTemplate.execute(new TransactionCallbackWithoutResult() {
516-
517-
@SuppressWarnings({ "unchecked", "rawtypes" })
518-
@Override
519-
protected void doInTransactionWithoutResult(TransactionStatus status) {
520-
((KafkaResourceHolder) TransactionSynchronizationManager
521-
.getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory()))
522-
.getProducer().sendOffsetsToTransaction(offsets,
523-
ListenerConsumer.this.consumerGroupId);
524-
}
525-
526-
});
515+
try {
516+
offsets.forEach((partition, offsetAndMetadata) -> {
517+
TransactionSupport.setTransactionIdSuffix(
518+
zombieFenceTxIdSuffix(partition.topic(), partition.partition()));
519+
ListenerConsumer.this.transactionTemplate
520+
.execute(new TransactionCallbackWithoutResult() {
521+
522+
@SuppressWarnings({ "unchecked", "rawtypes" })
523+
@Override
524+
protected void doInTransactionWithoutResult(TransactionStatus status) {
525+
((KafkaResourceHolder) TransactionSynchronizationManager
526+
.getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory()))
527+
.getProducer().sendOffsetsToTransaction(
528+
Collections.singletonMap(partition, offsetAndMetadata),
529+
ListenerConsumer.this.consumerGroupId);
530+
}
531+
532+
});
533+
});
534+
}
535+
finally {
536+
TransactionSupport.clearTransactionIdSuffix();
537+
}
527538
}
528539
else if (KafkaMessageListenerContainer.this.getContainerProperties().isSyncCommits()) {
529540
ListenerConsumer.this.consumer.commitSync(offsets);

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
import static org.mockito.Mockito.never;
3030
import static org.mockito.Mockito.times;
3131
import static org.mockito.Mockito.verify;
32+
import static org.mockito.Mockito.verifyNoMoreInteractions;
3233

34+
import java.util.ArrayList;
3335
import java.util.Arrays;
3436
import java.util.Collection;
3537
import java.util.Collections;
@@ -67,6 +69,7 @@
6769
import org.springframework.kafka.core.ProducerFactoryUtils;
6870
import org.springframework.kafka.listener.config.ContainerProperties;
6971
import org.springframework.kafka.support.TopicPartitionInitialOffset;
72+
import org.springframework.kafka.support.TransactionSupport;
7073
import org.springframework.kafka.test.rule.KafkaEmbedded;
7174
import org.springframework.kafka.test.utils.KafkaTestUtils;
7275
import org.springframework.kafka.transaction.KafkaTransactionManager;
@@ -132,14 +135,18 @@ private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handl
132135
ConsumerFactory cf = mock(ConsumerFactory.class);
133136
willReturn(consumer).given(cf).createConsumer("group", null);
134137
Producer producer = mock(Producer.class);
135-
final CountDownLatch closeLatch = new CountDownLatch(1);
138+
final CountDownLatch closeLatch = new CountDownLatch(2);
136139
willAnswer(i -> {
137140
closeLatch.countDown();
138141
return null;
139142
}).given(producer).close();
140143
ProducerFactory pf = mock(ProducerFactory.class);
141144
given(pf.transactionCapable()).willReturn(true);
142-
given(pf.createProducer()).willReturn(producer);
145+
final List<String> transactionalIds = new ArrayList<>();
146+
willAnswer(i -> {
147+
transactionalIds.add(TransactionSupport.getTransactionIdSuffix());
148+
return producer;
149+
}).given(pf).createProducer();
143150
KafkaTransactionManager tm = new KafkaTransactionManager(pf);
144151
ContainerProperties props = new ContainerProperties("foo");
145152
props.setGroupId("group");
@@ -161,6 +168,11 @@ private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handl
161168
assertThat(closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
162169
InOrder inOrder = inOrder(producer);
163170
inOrder.verify(producer).beginTransaction();
171+
inOrder.verify(producer).sendOffsetsToTransaction(Collections.singletonMap(topicPartition,
172+
new OffsetAndMetadata(0)), "group");
173+
inOrder.verify(producer).commitTransaction();
174+
inOrder.verify(producer).close();
175+
inOrder.verify(producer).beginTransaction();
164176
ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
165177
inOrder.verify(producer).send(captor.capture(), any(Callback.class));
166178
assertThat(captor.getValue()).isEqualTo(new ProducerRecord("bar", "baz"));
@@ -169,7 +181,10 @@ private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handl
169181
inOrder.verify(producer).commitTransaction();
170182
inOrder.verify(producer).close();
171183
container.stop();
172-
verify(pf, times(1)).createProducer();
184+
verify(pf, times(2)).createProducer();
185+
verifyNoMoreInteractions(producer);
186+
assertThat(transactionalIds.get(0)).isEqualTo("group.foo.0");
187+
assertThat(transactionalIds.get(0)).isEqualTo("group.foo.0");
173188
}
174189

175190
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -409,7 +424,7 @@ public void testRollbackRecord() throws Exception {
409424
}
410425
});
411426

412-
@SuppressWarnings({ "rawtypes", "unchecked" })
427+
@SuppressWarnings({ "rawtypes" })
413428
KafkaTransactionManager tm = new KafkaTransactionManager(pf);
414429
containerProps.setTransactionManager(tm);
415430
KafkaMessageListenerContainer<Integer, String> container =

0 commit comments

Comments
 (0)