diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java index 996b2199d8..f52f142649 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2022 the original author or authors. + * Copyright 2012-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.ContainerCustomizer; import org.springframework.kafka.config.KafkaListenerConfigUtils; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; @@ -31,6 +32,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.AfterRollbackProcessor; import org.springframework.kafka.listener.BatchInterceptor; import org.springframework.kafka.listener.CommonErrorHandler; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.kafka.listener.RecordInterceptor; import org.springframework.kafka.listener.adapter.RecordFilterStrategy; @@ -121,10 +123,12 @@ class KafkaAnnotationDrivenConfiguration { @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory") ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( ConcurrentKafkaListenerContainerFactoryConfigurer configurer, - ObjectProvider> kafkaConsumerFactory) { + ObjectProvider> kafkaConsumerFactory, + ObjectProvider>> kafkaContainerCustomizer) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); configurer.configure(factory, kafkaConsumerFactory .getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties()))); + kafkaContainerCustomizer.ifAvailable(factory::setContainerCustomizer); return factory; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index ec53a2d850..ccbb9e1040 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -53,6 +53,7 @@ import org.springframework.kafka.annotation.EnableKafkaStreams; import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; import org.springframework.kafka.config.AbstractKafkaListenerContainerFactory; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.ContainerCustomizer; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.config.StreamsBuilderFactoryBean; @@ -65,6 +66,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.AfterRollbackProcessor; import org.springframework.kafka.listener.BatchInterceptor; import org.springframework.kafka.listener.CommonErrorHandler; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ContainerProperties.AckMode; @@ -696,6 +698,17 @@ class KafkaAutoConfigurationTests { }); } + @Test + void testConcurrentKafkaListenerContainerFactoryWithCustomContainerCustomizer() { + this.contextRunner.withUserConfiguration(ObservationEnabledContainerCustomizerConfiguration.class) + .run((context) -> { + ConcurrentKafkaListenerContainerFactory factory = context + .getBean(ConcurrentKafkaListenerContainerFactory.class); + ConcurrentMessageListenerContainer container = factory.createContainer("someTopic"); + assertThat(container.getContainerProperties().isObservationEnabled()).isEqualTo(true); + }); + } + @Test void specificSecurityProtocolOverridesCommonSecurityProtocol() { this.contextRunner.withPropertyValues("spring.kafka.security.protocol=SSL", @@ -765,6 +778,16 @@ class KafkaAutoConfigurationTests { } + @Configuration(proxyBeanMethods = false) + static class ObservationEnabledContainerCustomizerConfiguration { + + @Bean + ContainerCustomizer> myContainerCustomizer() { + return (container) -> container.getContainerProperties().setObservationEnabled(true); + } + + } + @Configuration(proxyBeanMethods = false) static class RecordInterceptorConfiguration {