From 74208bb1a7d593759d52280be7748815b90741b4 Mon Sep 17 00:00:00 2001 From: Stephane Nicoll Date: Tue, 21 May 2019 09:49:34 +0200 Subject: [PATCH] Polish "Auto-configure Kafka listener container with rebalance listener" Closes gh-16755 --- ...fkaListenerContainerFactoryConfigurer.java | 22 +++++++++---------- .../KafkaAnnotationDrivenConfiguration.java | 12 +++++----- .../kafka/KafkaAutoConfigurationTests.java | 3 +-- .../main/asciidoc/spring-boot-features.adoc | 5 +++-- 4 files changed, 21 insertions(+), 21 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index e5775b6b61..78d614f999 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -48,14 +48,14 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { private KafkaAwareTransactionManager transactionManager; + private ConsumerAwareRebalanceListener rebalanceListener; + private ErrorHandler errorHandler; private BatchErrorHandler batchErrorHandler; private AfterRollbackProcessor afterRollbackProcessor; - private ConsumerAwareRebalanceListener rebalanceListener; - /** * Set the {@link KafkaProperties} to use. * @param properties the properties @@ -89,6 +89,15 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { this.transactionManager = transactionManager; } + /** + * Set the {@link ConsumerAwareRebalanceListener} to use. + * @param rebalanceListener the rebalance listener. + * @since 2.2 + */ + void setRebalanceListener(ConsumerAwareRebalanceListener rebalanceListener) { + this.rebalanceListener = rebalanceListener; + } + /** * Set the {@link ErrorHandler} to use. * @param errorHandler the error handler @@ -114,15 +123,6 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { this.afterRollbackProcessor = afterRollbackProcessor; } - /** - * Set the {@link ConsumerAwareRebalanceListener} to use. - * @param rebalanceListener the rebalance listener. - * @since 2.2 - */ - void setRebalanceListener(ConsumerAwareRebalanceListener rebalanceListener) { - this.rebalanceListener = rebalanceListener; - } - /** * Configure the specified Kafka listener container factory. The factory can be * further tuned and default settings can be overridden. diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java index 5f8b009803..89dca80442 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java @@ -58,33 +58,33 @@ class KafkaAnnotationDrivenConfiguration { private final KafkaAwareTransactionManager transactionManager; + private final ConsumerAwareRebalanceListener rebalanceListener; + private final ErrorHandler errorHandler; private final BatchErrorHandler batchErrorHandler; private final AfterRollbackProcessor afterRollbackProcessor; - private final ConsumerAwareRebalanceListener rebalanceListener; - KafkaAnnotationDrivenConfiguration(KafkaProperties properties, ObjectProvider messageConverter, ObjectProvider batchMessageConverter, ObjectProvider> kafkaTemplate, ObjectProvider> kafkaTransactionManager, + ObjectProvider rebalanceListener, ObjectProvider errorHandler, ObjectProvider batchErrorHandler, - ObjectProvider> afterRollbackProcessor, - ObjectProvider rebalanceListener) { + ObjectProvider> afterRollbackProcessor) { this.properties = properties; this.messageConverter = messageConverter.getIfUnique(); this.batchMessageConverter = batchMessageConverter.getIfUnique( () -> new BatchMessagingMessageConverter(this.messageConverter)); this.kafkaTemplate = kafkaTemplate.getIfUnique(); this.transactionManager = kafkaTransactionManager.getIfUnique(); + this.rebalanceListener = rebalanceListener.getIfUnique(); this.errorHandler = errorHandler.getIfUnique(); this.batchErrorHandler = batchErrorHandler.getIfUnique(); this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique(); - this.rebalanceListener = rebalanceListener.getIfUnique(); } @Bean @@ -97,10 +97,10 @@ class KafkaAnnotationDrivenConfiguration { configurer.setMessageConverter(messageConverterToUse); configurer.setReplyTemplate(this.kafkaTemplate); configurer.setTransactionManager(this.transactionManager); + configurer.setRebalanceListener(this.rebalanceListener); configurer.setErrorHandler(this.errorHandler); configurer.setBatchErrorHandler(this.batchErrorHandler); configurer.setAfterRollbackProcessor(this.afterRollbackProcessor); - configurer.setRebalanceListener(this.rebalanceListener); return configurer; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 585675c265..5c1c385904 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -767,8 +767,7 @@ public class KafkaAutoConfigurationTests { @Bean public ConsumerAwareRebalanceListener rebalanceListener() { - return new ConsumerAwareRebalanceListener() { - }; + return mock(ConsumerAwareRebalanceListener.class); } } diff --git a/spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc b/spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc index 15550b8e3c..728d4a4caa 100644 --- a/spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc +++ b/spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc @@ -6158,8 +6158,9 @@ The following component creates a listener endpoint on the `someTopic` topic: ---- If a `KafkaTransactionManager` bean is defined, it is automatically associated to the -container factory. Similarly, if a `ErrorHandler` or `AfterRollbackProcessor` bean is -defined, it is automatically associated to the default factory. +container factory. Similarly, if a `ErrorHandler`, `AfterRollbackProcessor` or +`ConsumerAwareRebalanceListener` bean is defined, it is automatically associated to the +default factory. Depending on the listener type, a `RecordMessageConverter` or `BatchMessageConverter` bean is associated to the default factory. If only a `RecordMessageConverter` bean is present