Skip to content

Commit 8fe9406

Browse files
artembilangaryrussell
authored andcommitted
Process errorHandler in class level KafkaListener
The `errorHandler` attributed has been missed from the `KafkaListenerAnnotationBeanPostProcessor.processMultiMethodListeners()` logic. * Move `errorHandler` and `BeanFactory` population into the `processListener()` method in the `KafkaListenerAnnotationBeanPostProcessor` **Cherry-pick to 2.1.x, 2.0.x & 1.3.x**
1 parent 87ec582 commit 8fe9406

File tree

2 files changed

+24
-8
lines changed

2 files changed

+24
-8
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,6 @@ private void processMultiMethodListeners(Collection<KafkaListener> classLevelLis
325325
for (KafkaListener classLevelListener : classLevelListeners) {
326326
MultiMethodKafkaListenerEndpoint<K, V> endpoint = new MultiMethodKafkaListenerEndpoint<>(checkedMethods,
327327
bean);
328-
endpoint.setBeanFactory(this.beanFactory);
329328
processListener(endpoint, classLevelListener, bean, bean.getClass(), beanName);
330329
}
331330
}
@@ -334,11 +333,6 @@ protected void processKafkaListener(KafkaListener kafkaListener, Method method,
334333
Method methodToUse = checkProxy(method, bean);
335334
MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
336335
endpoint.setMethod(methodToUse);
337-
endpoint.setBeanFactory(this.beanFactory);
338-
String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
339-
if (StringUtils.hasText(errorHandlerBeanName)) {
340-
endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
341-
}
342336
processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
343337
}
344338

@@ -405,6 +399,11 @@ protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, Kafka
405399
}
406400
}
407401

402+
endpoint.setBeanFactory(this.beanFactory);
403+
String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
404+
if (StringUtils.hasText(errorHandlerBeanName)) {
405+
endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
406+
}
408407
this.registrar.registerEndpoint(endpoint, factory);
409408
}
410409

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,8 @@ public void testMulti() throws Exception {
281281
template.flush();
282282
assertThat(this.multiListener.latch1.await(60, TimeUnit.SECONDS)).isTrue();
283283
assertThat(this.multiListener.latch2.await(60, TimeUnit.SECONDS)).isTrue();
284+
template.send("annotated8", 0, 1, "junk");
285+
assertThat(this.multiListener.errorLatch.await(60, TimeUnit.SECONDS)).isTrue();
284286
}
285287

286288
@Test
@@ -1273,18 +1275,33 @@ public CountDownLatch getLatch2() {
12731275
return latch2;
12741276
}
12751277

1278+
@Bean
1279+
public KafkaListenerErrorHandler consumeMultiMethodException(MultiListenerBean listener) {
1280+
return (m, e) -> {
1281+
listener.errorLatch.countDown();
1282+
return null;
1283+
};
1284+
}
1285+
12761286
}
12771287

1278-
@KafkaListener(id = "multi", topics = "annotated8")
1288+
@KafkaListener(id = "multi", topics = "annotated8", errorHandler = "consumeMultiMethodException")
12791289
static class MultiListenerBean {
12801290

12811291
private final CountDownLatch latch1 = new CountDownLatch(1);
12821292

12831293
private final CountDownLatch latch2 = new CountDownLatch(1);
12841294

1295+
private final CountDownLatch errorLatch = new CountDownLatch(1);
1296+
12851297
@KafkaHandler
12861298
public void bar(@NonNull String bar) {
1287-
latch1.countDown();
1299+
if ("junk".equals(bar)) {
1300+
throw new RuntimeException("intentional");
1301+
}
1302+
else {
1303+
this.latch1.countDown();
1304+
}
12881305
}
12891306

12901307
@KafkaHandler

0 commit comments

Comments
 (0)