From c2f9e7dd4385ad833d1e066a9d82614ca28e7c72 Mon Sep 17 00:00:00 2001 From: Stephane Nicoll Date: Wed, 27 Mar 2019 10:17:49 +0100 Subject: [PATCH] Fix StreamsBuilderFactoryBean injection definition Closes gh-16329 --- ...aStreamsAnnotationDrivenConfiguration.java | 5 ++- .../kafka/KafkaAutoConfigurationTests.java | 43 ++++++++++++++++++- 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java index c8e8c54132..ffcbedf37e 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2018 the original author or authors. + * Copyright 2012-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -68,7 +69,7 @@ class KafkaStreamsAnnotationDrivenConfiguration { @Bean public KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer( - StreamsBuilderFactoryBean factoryBean) { + @Qualifier(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME) StreamsBuilderFactoryBean factoryBean) { return new KafkaStreamsFactoryBeanConfigurer(this.properties, factoryBean); } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index b1bb44695f..b2b95cb61f 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2018 the original author or authors. + * Copyright 2012-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,6 +47,7 @@ import org.springframework.kafka.config.AbstractKafkaListenerContainerFactory; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaStreamsConfiguration; +import org.springframework.kafka.config.StreamsBuilderFactoryBean; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaAdmin; @@ -68,6 +69,8 @@ import org.springframework.transaction.PlatformTransactionManager; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.entry; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; /** * Tests for {@link KafkaAutoConfiguration}. @@ -401,6 +404,29 @@ public class KafkaAutoConfigurationTests { }); } + @Test + public void streamsWithSeveralStreamsBuilderFactoryBeans() { + this.contextRunner + .withUserConfiguration(EnableKafkaStreamsConfiguration.class, + TestStreamsBuilderFactoryBeanConfiguration.class) + .withPropertyValues("spring.application.name=my-test-app", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", + "spring.kafka.streams.auto-startup=false") + .run((context) -> { + Properties configs = context.getBean( + KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME, + KafkaStreamsConfiguration.class).asProperties(); + assertThat(configs.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG)) + .isEqualTo("localhost:9092, localhost:9093"); + verify(context.getBean("&firstStreamsBuilderFactoryBean", + StreamsBuilderFactoryBean.class), never()) + .setAutoStartup(false); + verify(context.getBean("&secondStreamsBuilderFactoryBean", + StreamsBuilderFactoryBean.class), never()) + .setAutoStartup(false); + }); + } + @Test public void streamsApplicationIdIsMandatory() { this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class) @@ -639,4 +665,19 @@ public class KafkaAutoConfigurationTests { } + @Configuration + protected static class TestStreamsBuilderFactoryBeanConfiguration { + + @Bean + public StreamsBuilderFactoryBean firstStreamsBuilderFactoryBean() { + return mock(StreamsBuilderFactoryBean.class); + } + + @Bean + public StreamsBuilderFactoryBean secondStreamsBuilderFactoryBean() { + return mock(StreamsBuilderFactoryBean.class); + } + + } + }