From eeda12bd58c2b49e6a053fc545eade7168c0ff1f Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 26 Jun 2019 12:55:00 -0400 Subject: [PATCH] Auto-configure KLC with user-provided RecordInterceptor spring-kafka.2.3.0.M3 introduced a new property `RecordInterceptor` to the container factory. Auto-configure the property if a single instance is present. See gh-17322 --- ...fkaListenerContainerFactoryConfigurer.java | 12 +++++++++++ .../KafkaAnnotationDrivenConfiguration.java | 8 +++++++- .../kafka/KafkaAutoConfigurationTests.java | 20 +++++++++++++++++++ 3 files changed, 39 insertions(+), 1 deletion(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index 9ff356ca9d..28ca5b4914 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -28,6 +28,7 @@ import org.springframework.kafka.listener.BatchErrorHandler; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ErrorHandler; +import org.springframework.kafka.listener.RecordInterceptor; import org.springframework.kafka.support.converter.MessageConverter; import org.springframework.kafka.transaction.KafkaAwareTransactionManager; @@ -56,6 +57,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { private AfterRollbackProcessor afterRollbackProcessor; + private RecordInterceptor recordInterceptor; + /** * Set the {@link KafkaProperties} to use. * @param properties the properties @@ -121,6 +124,14 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { this.afterRollbackProcessor = afterRollbackProcessor; } + /** + * Set the {@link RecordInterceptor} to use. + * @param recordInterceptor the record interceptor. + */ + void setRecordInterceptor(RecordInterceptor recordInterceptor) { + this.recordInterceptor = recordInterceptor; + } + /** * Configure the specified Kafka listener container factory. The factory can be * further tuned and default settings can be overridden. @@ -149,6 +160,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { factory.setErrorHandler(this.errorHandler); } map.from(this.afterRollbackProcessor).to(factory::setAfterRollbackProcessor); + map.from(this.recordInterceptor).to(factory::setRecordInterceptor); } private void configureContainer(ContainerProperties container) { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java index d325192c87..a77d48c6ae 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java @@ -31,6 +31,7 @@ import org.springframework.kafka.listener.AfterRollbackProcessor; import org.springframework.kafka.listener.BatchErrorHandler; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.kafka.listener.ErrorHandler; +import org.springframework.kafka.listener.RecordInterceptor; import org.springframework.kafka.support.converter.BatchMessageConverter; import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; import org.springframework.kafka.support.converter.MessageConverter; @@ -66,6 +67,8 @@ class KafkaAnnotationDrivenConfiguration { private final AfterRollbackProcessor afterRollbackProcessor; + private final RecordInterceptor recordInterceptor; + KafkaAnnotationDrivenConfiguration(KafkaProperties properties, ObjectProvider messageConverter, ObjectProvider batchMessageConverter, @@ -73,7 +76,8 @@ class KafkaAnnotationDrivenConfiguration { ObjectProvider> kafkaTransactionManager, ObjectProvider rebalanceListener, ObjectProvider errorHandler, ObjectProvider batchErrorHandler, - ObjectProvider> afterRollbackProcessor) { + ObjectProvider> afterRollbackProcessor, + ObjectProvider> recordInterceptor) { this.properties = properties; this.messageConverter = messageConverter.getIfUnique(); this.batchMessageConverter = batchMessageConverter @@ -84,6 +88,7 @@ class KafkaAnnotationDrivenConfiguration { this.errorHandler = errorHandler.getIfUnique(); this.batchErrorHandler = batchErrorHandler.getIfUnique(); this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique(); + this.recordInterceptor = recordInterceptor.getIfUnique(); } @Bean @@ -100,6 +105,7 @@ class KafkaAnnotationDrivenConfiguration { configurer.setErrorHandler(this.errorHandler); configurer.setBatchErrorHandler(this.batchErrorHandler); configurer.setAfterRollbackProcessor(this.afterRollbackProcessor); + configurer.setRecordInterceptor(this.recordInterceptor); return configurer; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 36f84c9d10..43251d9492 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -58,6 +58,7 @@ import org.springframework.kafka.listener.AfterRollbackProcessor; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ContainerProperties.AckMode; +import org.springframework.kafka.listener.RecordInterceptor; import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler; import org.springframework.kafka.listener.SeekToCurrentErrorHandler; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; @@ -537,6 +538,15 @@ class KafkaAutoConfigurationTests { }); } + @Test + void testConcurrentKafkaListenerContainerFactoryWithCustomRecordInterceptor() { + this.contextRunner.withUserConfiguration(RecordInterceptorConfiguration.class).run((context) -> { + ConcurrentKafkaListenerContainerFactory factory = context + .getBean(ConcurrentKafkaListenerContainerFactory.class); + assertThat(factory).hasFieldOrPropertyWithValue("recordInterceptor", context.getBean("recordInterceptor")); + }); + } + @Test void testConcurrentKafkaListenerContainerFactoryWithCustomRebalanceListener() { this.contextRunner.withUserConfiguration(RebalanceListenerConfiguration.class).run((context) -> { @@ -621,6 +631,16 @@ class KafkaAutoConfigurationTests { } + @Configuration(proxyBeanMethods = false) + protected static class RecordInterceptorConfiguration { + + @Bean + public RecordInterceptor recordInterceptor() { + return (record) -> record; + } + + } + @Configuration(proxyBeanMethods = false) protected static class RebalanceListenerConfiguration {