Merge branch '2.1.x'

pull/16341/head
Stephane Nicoll 6 years ago
commit bece962711

@ -22,6 +22,7 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@ -68,7 +69,7 @@ class KafkaStreamsAnnotationDrivenConfiguration {
@Bean
public KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer(
StreamsBuilderFactoryBean factoryBean) {
@Qualifier(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME) StreamsBuilderFactoryBean factoryBean) {
return new KafkaStreamsFactoryBeanConfigurer(this.properties, factoryBean);
}

@ -47,6 +47,7 @@ import org.springframework.kafka.config.AbstractKafkaListenerContainerFactory;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
@ -70,6 +71,8 @@ import org.springframework.transaction.PlatformTransactionManager;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
/**
* Tests for {@link KafkaAutoConfiguration}.
@ -387,6 +390,29 @@ public class KafkaAutoConfigurationTests {
});
}
@Test
public void streamsWithSeveralStreamsBuilderFactoryBeans() {
this.contextRunner
.withUserConfiguration(EnableKafkaStreamsConfiguration.class,
TestStreamsBuilderFactoryBeanConfiguration.class)
.withPropertyValues("spring.application.name=my-test-app",
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093",
"spring.kafka.streams.auto-startup=false")
.run((context) -> {
Properties configs = context.getBean(
KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME,
KafkaStreamsConfiguration.class).asProperties();
assertThat(configs.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG))
.isEqualTo("localhost:9092, localhost:9093");
verify(context.getBean("&firstStreamsBuilderFactoryBean",
StreamsBuilderFactoryBean.class), never())
.setAutoStartup(false);
verify(context.getBean("&secondStreamsBuilderFactoryBean",
StreamsBuilderFactoryBean.class), never())
.setAutoStartup(false);
});
}
@Test
public void streamsApplicationIdIsMandatory() {
this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class)
@ -680,4 +706,19 @@ public class KafkaAutoConfigurationTests {
}
@Configuration
protected static class TestStreamsBuilderFactoryBeanConfiguration {
@Bean
public StreamsBuilderFactoryBean firstStreamsBuilderFactoryBean() {
return mock(StreamsBuilderFactoryBean.class);
}
@Bean
public StreamsBuilderFactoryBean secondStreamsBuilderFactoryBean() {
return mock(StreamsBuilderFactoryBean.class);
}
}
}

Loading…
Cancel
Save