Merge branch '2.2.x'

Closes gh-20616
pull/17274/head
Stephane Nicoll 5 years ago
commit ec8d2c5843

@ -1,5 +1,5 @@
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -26,6 +26,7 @@ import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerConfigUtils;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchErrorHandler;
@ -112,9 +113,10 @@ class KafkaAnnotationDrivenConfiguration {
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
configurer.configure(factory, kafkaConsumerFactory
.getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
return factory;
}

@ -51,6 +51,7 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
@ -575,6 +576,16 @@ class KafkaAutoConfigurationTests {
});
}
@Test
void testConcurrentKafkaListenerContainerFactoryWithCustomConsumerFactory() {
this.contextRunner.withUserConfiguration(ConsumerFactoryConfiguration.class).run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(kafkaListenerContainerFactory.getConsumerFactory())
.isNotSameAs(context.getBean(ConsumerFactoryConfiguration.class).consumerFactory);
});
}
@Test
void specificSecurityProtocolOverridesCommonSecurityProtocol() {
this.contextRunner.withPropertyValues("spring.kafka.security.protocol=SSL",
@ -653,6 +664,18 @@ class KafkaAutoConfigurationTests {
}
@Configuration(proxyBeanMethods = false)
static class ConsumerFactoryConfiguration {
private final ConsumerFactory<String, Object> consumerFactory = mock(ConsumerFactory.class);
@Bean
ConsumerFactory<String, Object> myConsumerFactory() {
return this.consumerFactory;
}
}
@Configuration(proxyBeanMethods = false)
static class RecordInterceptorConfiguration {

Loading…
Cancel
Save