Skip to content

Commit 669e48f

Browse files
garyrussellartembilan
authored andcommitted
GH-908: Don't cache dedicated consumer producers
Fixes #908 Zombie-fenced producers, running on container threads with topic/partition/group transactional ids must not be added to he general producer cache for use by other arbitrary producer operations. These producers are maintained in their own cache, keyed by the transactional id. Add tests to verify these producers are not cached and that a producer used within a nested transaction is added to the cache. **cherry-pick to 2.1.x, 2.0.x; backport PR will be published for 1.3.x after review/merge** # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java
1 parent e36d458 commit 669e48f

File tree

2 files changed

+179
-21
lines changed

2 files changed

+179
-21
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

+66-19
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.springframework.beans.factory.DisposableBean;
4949
import org.springframework.context.Lifecycle;
5050
import org.springframework.kafka.support.TransactionSupport;
51+
import org.springframework.lang.Nullable;
5152
import org.springframework.util.Assert;
5253

5354
/**
@@ -304,12 +305,13 @@ protected Producer<K, V> createTransactionalProducer() {
304305
}
305306

306307
private CloseSafeProducer<K, V> doCreateTxProducer(String suffix, Consumer<CloseSafeProducer<K, V>> remover) {
307-
Producer<K, V> producer;
308-
Map<String, Object> configs = new HashMap<>(this.configs);
309-
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, this.transactionIdPrefix + suffix);
310-
producer = new KafkaProducer<K, V>(configs, this.keySerializer, this.valueSerializer);
311-
producer.initTransactions();
312-
return new CloseSafeProducer<K, V>(producer, this.cache, remover);
308+
Producer<K, V> newProducer;
309+
Map<String, Object> newProducerConfigs = new HashMap<>(this.configs);
310+
newProducerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, this.transactionIdPrefix + suffix);
311+
newProducer = new KafkaProducer<K, V>(newProducerConfigs, this.keySerializer, this.valueSerializer);
312+
newProducer.initTransactions();
313+
return new CloseSafeProducer<K, V>(newProducer, this.cache, remover,
314+
(String) newProducerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
313315
}
314316

315317
protected BlockingQueue<CloseSafeProducer<K, V>> getCache() {
@@ -343,6 +345,8 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
343345

344346
private final Consumer<CloseSafeProducer<K, V>> removeConsumerProducer;
345347

348+
private final String txId;
349+
346350
private volatile boolean txFailed;
347351

348352
CloseSafeProducer(Producer<K, V> delegate) {
@@ -356,9 +360,17 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
356360

357361
CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache,
358362
Consumer<CloseSafeProducer<K, V>> removeConsumerProducer) {
363+
364+
this(delegate, cache, removeConsumerProducer, null);
365+
}
366+
367+
CloseSafeProducer(Producer<K, V> delegate, @Nullable BlockingQueue<CloseSafeProducer<K, V>> cache,
368+
@Nullable Consumer<CloseSafeProducer<K, V>> removeConsumerProducer, @Nullable String txId) {
369+
359370
this.delegate = delegate;
360371
this.cache = cache;
361372
this.removeConsumerProducer = removeConsumerProducer;
373+
this.txId = txId;
362374
}
363375

364376
@Override
@@ -393,10 +405,16 @@ public void initTransactions() {
393405

394406
@Override
395407
public void beginTransaction() throws ProducerFencedException {
408+
if (logger.isDebugEnabled()) {
409+
logger.debug("beginTransaction: " + this);
410+
}
396411
try {
397412
this.delegate.beginTransaction();
398413
}
399414
catch (RuntimeException e) {
415+
if (logger.isErrorEnabled()) {
416+
logger.error("beginTransaction failed: " + this, e);
417+
}
400418
this.txFailed = true;
401419
throw e;
402420
}
@@ -405,61 +423,90 @@ public void beginTransaction() throws ProducerFencedException {
405423
@Override
406424
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId)
407425
throws ProducerFencedException {
426+
408427
this.delegate.sendOffsetsToTransaction(offsets, consumerGroupId);
409428
}
410429

411430
@Override
412431
public void commitTransaction() throws ProducerFencedException {
432+
if (logger.isDebugEnabled()) {
433+
logger.debug("commitTransaction: " + this);
434+
}
413435
try {
414436
this.delegate.commitTransaction();
415437
}
416438
catch (RuntimeException e) {
439+
if (logger.isErrorEnabled()) {
440+
logger.error("commitTransaction failed: " + this, e);
441+
}
417442
this.txFailed = true;
418443
throw e;
419444
}
420445
}
421446

422447
@Override
423448
public void abortTransaction() throws ProducerFencedException {
449+
if (logger.isDebugEnabled()) {
450+
logger.debug("abortTransaction: " + this);
451+
}
424452
try {
425453
this.delegate.abortTransaction();
426454
}
427455
catch (RuntimeException e) {
456+
if (logger.isErrorEnabled()) {
457+
logger.error("Abort failed: " + this, e);
458+
}
428459
this.txFailed = true;
429460
throw e;
430461
}
431462
}
432463

433464
@Override
434465
public void close() {
466+
close(0, null);
467+
}
468+
469+
@Override
470+
public void close(long timeout, @Nullable TimeUnit unit) {
435471
if (this.cache != null) {
436472
if (this.txFailed) {
437-
logger.warn("Error during transactional operation; producer removed from cache; possible cause: "
438-
+ "broker restarted during transaction");
439-
440-
this.delegate.close();
473+
if (logger.isWarnEnabled()) {
474+
logger.warn("Error during transactional operation; producer removed from cache; possible cause: "
475+
+ "broker restarted during transaction: " + this);
476+
}
477+
if (unit == null) {
478+
this.delegate.close();
479+
}
480+
else {
481+
this.delegate.close(timeout, unit);
482+
}
441483
if (this.removeConsumerProducer != null) {
442484
this.removeConsumerProducer.accept(this);
443485
}
444486
}
445487
else {
446-
synchronized (this) {
447-
if (!this.cache.contains(this)) {
448-
this.cache.offer(this);
488+
if (this.removeConsumerProducer == null) { // dedicated consumer producers are not cached
489+
synchronized (this) {
490+
if (!this.cache.contains(this)
491+
&& !this.cache.offer(this)) {
492+
if (unit == null) {
493+
this.delegate.close();
494+
}
495+
else {
496+
this.delegate.close(timeout, unit);
497+
}
498+
}
449499
}
450500
}
451501
}
452502
}
453503
}
454504

455-
@Override
456-
public void close(long timeout, TimeUnit unit) {
457-
close();
458-
}
459-
460505
@Override
461506
public String toString() {
462-
return "CloseSafeProducer [delegate=" + this.delegate + "]";
507+
return "CloseSafeProducer [delegate=" + this.delegate + ""
508+
+ (this.txId != null ? ", txId=" + this.txId : "")
509+
+ "]";
463510
}
464511

465512
}

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java

+113-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,22 +20,50 @@
2020

2121
import java.util.Collections;
2222
import java.util.Map;
23+
import java.util.concurrent.BlockingQueue;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.TimeUnit;
2326

2427
import org.apache.kafka.clients.consumer.ConsumerConfig;
2528
import org.apache.kafka.clients.consumer.KafkaConsumer;
29+
import org.apache.kafka.clients.producer.ProducerConfig;
2630
import org.junit.Test;
31+
import org.junit.runner.RunWith;
32+
33+
import org.springframework.beans.factory.annotation.Autowired;
34+
import org.springframework.context.annotation.Configuration;
35+
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
36+
import org.springframework.kafka.listener.MessageListener;
37+
import org.springframework.kafka.listener.config.ContainerProperties;
38+
import org.springframework.kafka.support.SendResult;
39+
import org.springframework.kafka.test.context.EmbeddedKafka;
40+
import org.springframework.kafka.test.rule.KafkaEmbedded;
41+
import org.springframework.kafka.test.utils.KafkaTestUtils;
42+
import org.springframework.kafka.transaction.KafkaTransactionManager;
43+
import org.springframework.test.context.junit4.SpringRunner;
44+
import org.springframework.util.concurrent.ListenableFuture;
2745

2846
/**
2947
* @author Gary Russell
3048
* @since 1.0.6
3149
*
3250
*/
51+
@EmbeddedKafka(topics = { "txCache1", "txCache2", "txCacheSendFromListener" },
52+
brokerProperties = {
53+
"transaction.state.log.replication.factor=1",
54+
"transaction.state.log.min.isr=1" }
55+
)
56+
@RunWith(SpringRunner.class)
3357
public class DefaultKafkaConsumerFactoryTests {
3458

59+
@Autowired
60+
private KafkaEmbedded embeddedKafka;
61+
3562
@Test
3663
public void testClientId() {
3764
Map<String, Object> configs = Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, "foo");
38-
DefaultKafkaConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<String, String>(configs) {
65+
DefaultKafkaConsumerFactory<String, String> factory =
66+
new DefaultKafkaConsumerFactory<String, String>(configs) {
3967

4068
@Override
4169
protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object> configs) {
@@ -47,4 +75,87 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
4775
factory.createConsumer("-1");
4876
}
4977

78+
@SuppressWarnings("unchecked")
79+
@Test
80+
public void testNestedTxProducerIsCached() throws Exception {
81+
Map<String, Object> producerProps = KafkaTestUtils.producerProps(this.embeddedKafka);
82+
producerProps.put(ProducerConfig.RETRIES_CONFIG, 1);
83+
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(producerProps);
84+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
85+
DefaultKafkaProducerFactory<Integer, String> pfTx = new DefaultKafkaProducerFactory<>(producerProps);
86+
pfTx.setTransactionIdPrefix("fooTx.");
87+
KafkaTemplate<Integer, String> templateTx = new KafkaTemplate<>(pfTx);
88+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("txCache1Group", "false", this.embeddedKafka);
89+
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
90+
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
91+
ContainerProperties containerProps = new ContainerProperties("txCache1");
92+
CountDownLatch latch = new CountDownLatch(1);
93+
containerProps.setMessageListener((MessageListener<Integer, String>) r -> {
94+
templateTx.executeInTransaction(t -> t.send("txCacheSendFromListener", "bar"));
95+
templateTx.executeInTransaction(t -> t.send("txCacheSendFromListener", "baz"));
96+
latch.countDown();
97+
});
98+
KafkaTransactionManager<Integer, String> tm = new KafkaTransactionManager<>(pfTx);
99+
containerProps.setTransactionManager(tm);
100+
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
101+
containerProps);
102+
container.start();
103+
try {
104+
ListenableFuture<SendResult<Integer, String>> future = template.send("txCache1", "foo");
105+
future.get();
106+
assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", BlockingQueue.class)).hasSize(0);
107+
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
108+
assertThat(KafkaTestUtils.getPropertyValue(pfTx, "cache", BlockingQueue.class)).hasSize(1);
109+
}
110+
finally {
111+
container.stop();
112+
pf.destroy();
113+
pfTx.destroy();
114+
}
115+
}
116+
117+
@SuppressWarnings("unchecked")
118+
@Test
119+
public void testContainerTxProducerIsNotCached() throws Exception {
120+
Map<String, Object> producerProps = KafkaTestUtils.producerProps(this.embeddedKafka);
121+
producerProps.put(ProducerConfig.RETRIES_CONFIG, 1);
122+
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(producerProps);
123+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
124+
DefaultKafkaProducerFactory<Integer, String> pfTx = new DefaultKafkaProducerFactory<>(producerProps);
125+
pfTx.setTransactionIdPrefix("fooTx.");
126+
KafkaTemplate<Integer, String> templateTx = new KafkaTemplate<>(pfTx);
127+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("txCache2Group", "false", this.embeddedKafka);
128+
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
129+
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
130+
ContainerProperties containerProps = new ContainerProperties("txCache2");
131+
CountDownLatch latch = new CountDownLatch(1);
132+
containerProps.setMessageListener((MessageListener<Integer, String>) r -> {
133+
templateTx.send("txCacheSendFromListener", "bar");
134+
templateTx.send("txCacheSendFromListener", "baz");
135+
latch.countDown();
136+
});
137+
KafkaTransactionManager<Integer, String> tm = new KafkaTransactionManager<>(pfTx);
138+
containerProps.setTransactionManager(tm);
139+
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
140+
containerProps);
141+
container.start();
142+
try {
143+
ListenableFuture<SendResult<Integer, String>> future = template.send("txCache2", "foo");
144+
future.get();
145+
assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", BlockingQueue.class)).hasSize(0);
146+
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
147+
assertThat(KafkaTestUtils.getPropertyValue(pfTx, "cache", BlockingQueue.class)).hasSize(0);
148+
}
149+
finally {
150+
container.stop();
151+
pf.destroy();
152+
pfTx.destroy();
153+
}
154+
}
155+
156+
@Configuration
157+
public static class Config {
158+
159+
}
160+
50161
}

0 commit comments

Comments
 (0)