diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java index 1fb2e2643d..66c048db49 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2019 the original author or authors. + * Copyright 2012-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerConfigUtils; import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.AfterRollbackProcessor; import org.springframework.kafka.listener.BatchErrorHandler; @@ -112,9 +113,10 @@ class KafkaAnnotationDrivenConfiguration { @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory") ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( ConcurrentKafkaListenerContainerFactoryConfigurer configurer, - ConsumerFactory kafkaConsumerFactory) { + ObjectProvider> kafkaConsumerFactory) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - configurer.configure(factory, kafkaConsumerFactory); + configurer.configure(factory, kafkaConsumerFactory + .getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties()))); return factory; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 887bb4aab3..66ac0f1a6f 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -51,6 +51,7 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.config.StreamsBuilderFactoryBean; +import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaAdmin; @@ -575,6 +576,16 @@ class KafkaAutoConfigurationTests { }); } + @Test + void testConcurrentKafkaListenerContainerFactoryWithCustomConsumerFactory() { + this.contextRunner.withUserConfiguration(ConsumerFactoryConfiguration.class).run((context) -> { + ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory = context + .getBean(ConcurrentKafkaListenerContainerFactory.class); + assertThat(kafkaListenerContainerFactory.getConsumerFactory()) + .isNotSameAs(context.getBean(ConsumerFactoryConfiguration.class).consumerFactory); + }); + } + @Test void specificSecurityProtocolOverridesCommonSecurityProtocol() { this.contextRunner.withPropertyValues("spring.kafka.security.protocol=SSL", @@ -653,6 +664,18 @@ class KafkaAutoConfigurationTests { } + @Configuration(proxyBeanMethods = false) + static class ConsumerFactoryConfiguration { + + private final ConsumerFactory consumerFactory = mock(ConsumerFactory.class); + + @Bean + ConsumerFactory myConsumerFactory() { + return this.consumerFactory; + } + + } + @Configuration(proxyBeanMethods = false) static class RecordInterceptorConfiguration {