diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java index 5b4d253e67..7750b3fd5c 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java @@ -17,11 +17,7 @@ package org.springframework.boot.autoconfigure.kafka; import java.io.IOException; -import java.util.Map; -import org.apache.kafka.streams.StreamsBuilder; - -import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; @@ -32,17 +28,12 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; -import org.springframework.core.env.Environment; -import org.springframework.kafka.annotation.EnableKafkaStreams; -import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; -import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.core.StreamsBuilderFactoryBean; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.support.LoggingProducerListener; import org.springframework.kafka.support.ProducerListener; @@ -61,7 +52,8 @@ import org.springframework.kafka.transaction.KafkaTransactionManager; @Configuration @ConditionalOnClass(KafkaTemplate.class) @EnableConfigurationProperties(KafkaProperties.class) -@Import(KafkaAnnotationDrivenConfiguration.class) +@Import({ KafkaAnnotationDrivenConfiguration.class, + KafkaStreamsAnnotationDrivenConfiguration.class }) public class KafkaAutoConfiguration { private final KafkaProperties properties; @@ -147,57 +139,4 @@ public class KafkaAutoConfiguration { return kafkaAdmin; } - @Configuration - @ConditionalOnClass(StreamsBuilder.class) - public static class KafkaStreamsAutoConfiguration { - - @Bean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) - public KafkaStreamsConfiguration defaultKafkaStreamsConfig( - KafkaProperties properties, Environment environment) { - - Map streamsProperties = properties.buildStreamsProperties(); - if (properties.getStreams().getApplicationId() == null) { - if (environment.getProperty("spring.application.id") != null) { - streamsProperties.put("application.id", - environment.getProperty("spring.application.name")); - } - } - return new KafkaStreamsConfiguration(streamsProperties); - } - - @Bean - public KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer( - StreamsBuilderFactoryBean factoryBean, KafkaProperties properties) { - - return new KafkaStreamsFactoryBeanConfigurer(factoryBean, properties); - } - - @Configuration - @EnableKafkaStreams - public static class EnableKafkaStreamsAutoConfiguration { - - } - - static class KafkaStreamsFactoryBeanConfigurer implements InitializingBean { - - private final StreamsBuilderFactoryBean factoryBean; - - private final KafkaProperties properties; - - KafkaStreamsFactoryBeanConfigurer(StreamsBuilderFactoryBean factoryBean, - KafkaProperties properties) { - this.factoryBean = factoryBean; - this.properties = properties; - } - - @Override - public void afterPropertiesSet() throws Exception { - this.factoryBean - .setAutoStartup(this.properties.getStreams().isAutoStartup()); - } - - } - - } - } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index e77cf142f6..a02450cd62 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -665,7 +665,7 @@ public class KafkaProperties { /** * Whether or not to auto-start the streams factory bean. */ - private boolean autoStartup; + private boolean autoStartup = true; /** * Comma-delimited list of host:port pairs to use for establishing the initial diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java new file mode 100644 index 0000000000..7e48a87cee --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java @@ -0,0 +1,98 @@ +/* + * Copyright 2012-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.kafka; + +import java.util.Map; + +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; + +import org.springframework.beans.factory.InitializingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.context.properties.source.InvalidConfigurationPropertyValueException; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.env.Environment; +import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; +import org.springframework.kafka.config.KafkaStreamsConfiguration; +import org.springframework.kafka.core.StreamsBuilderFactoryBean; + +/** + * Configuration for Kafka Streams annotation-driven support. + * + * @author Gary Russell + * @author Stephane Nicoll + */ +@Configuration +@ConditionalOnClass(StreamsBuilder.class) +@ConditionalOnBean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME) +class KafkaStreamsAnnotationDrivenConfiguration { + + private final KafkaProperties properties; + + KafkaStreamsAnnotationDrivenConfiguration(KafkaProperties properties) { + this.properties = properties; + } + + @ConditionalOnMissingBean + @Bean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) + public KafkaStreamsConfiguration defaultKafkaStreamsConfig(Environment environment) { + Map streamsProperties = this.properties.buildStreamsProperties(); + if (this.properties.getStreams().getApplicationId() == null) { + String applicationName = environment.getProperty("spring.application.name"); + if (applicationName != null) { + streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, + applicationName); + } + else { + throw new InvalidConfigurationPropertyValueException( + "spring.kafka.streams.application-id", null, + "This property is mandatory and fallback 'spring.application.name' is not set either."); + } + } + return new KafkaStreamsConfiguration(streamsProperties); + } + + @Bean + public KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer( + StreamsBuilderFactoryBean factoryBean) { + return new KafkaStreamsFactoryBeanConfigurer(this.properties, factoryBean); + } + + // Separate class required to avoid BeanCurrentlyInCreationException + static class KafkaStreamsFactoryBeanConfigurer implements InitializingBean { + + private final KafkaProperties properties; + + private final StreamsBuilderFactoryBean factoryBean; + + KafkaStreamsFactoryBeanConfigurer(KafkaProperties properties, + StreamsBuilderFactoryBean factoryBean) { + this.properties = properties; + this.factoryBean = factoryBean; + } + + @Override + public void afterPropertiesSet() { + this.factoryBean.setAutoStartup(this.properties.getStreams().isAutoStartup()); + } + + } + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java index 7a6e773c16..8dad270aec 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java @@ -28,9 +28,12 @@ import org.junit.Test; import org.springframework.boot.test.util.TestPropertyValues; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafkaStreams; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.StreamsBuilderFactoryBean; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.test.rule.EmbeddedKafkaRule; import org.springframework.messaging.handler.annotation.Header; @@ -41,6 +44,7 @@ import static org.assertj.core.api.Assertions.assertThat; * Integration tests for {@link KafkaAutoConfiguration}. * * @author Gary Russell + * @author Stephane Nicoll */ public class KafkaAutoConfigurationIntegrationTests { @@ -83,6 +87,14 @@ public class KafkaAutoConfigurationIntegrationTests { producer.close(); } + @Test + public void testStreams() { + load(KafkaStreamsConfig.class, "spring.application.name:my-app", + "spring.kafka.bootstrap-servers:" + getEmbeddedKafkaBrokersAsString()); + assertThat(this.context.getBean(StreamsBuilderFactoryBean.class).isAutoStartup()) + .isTrue(); + } + private void load(Class config, String... environment) { this.context = doLoad(new Class[] { config }, environment); } @@ -101,7 +113,8 @@ public class KafkaAutoConfigurationIntegrationTests { return embeddedKafka.getEmbeddedKafka().getBrokersAsString(); } - public static class KafkaConfig { + @Configuration + static class KafkaConfig { @Bean public Listener listener() { @@ -115,6 +128,12 @@ public class KafkaAutoConfigurationIntegrationTests { } + @Configuration + @EnableKafkaStreams + static class KafkaStreamsConfig { + + } + public static class Listener { private final CountDownLatch latch = new CountDownLatch(1); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 6608107544..816559e886 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -18,6 +18,7 @@ package org.springframework.boot.autoconfigure.kafka; import java.io.File; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -31,6 +32,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.junit.Test; @@ -39,6 +41,7 @@ import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafkaStreams; import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; @@ -279,23 +282,26 @@ public class KafkaAutoConfigurationTests { @Test public void streamsProperties() { - this.contextRunner.withPropertyValues("spring.kafka.clientId=cid", - "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", - "spring.application.name=appName", - "spring.kafka.properties.foo.bar.baz=qux.fiz.buz", - "spring.kafka.streams.cache-max-bytes-buffering=42", - "spring.kafka.streams.client-id=override", - "spring.kafka.streams.properties.fiz.buz=fix.fox", - "spring.kafka.streams.replication-factor=2", - "spring.kafka.streams.state-dir=/tmp/state", - "spring.kafka.streams.ssl.key-password=p7", - "spring.kafka.streams.ssl.key-store-location=classpath:ksLocP", - "spring.kafka.streams.ssl.key-store-password=p8", - "spring.kafka.streams.ssl.key-store-type=PKCS12", - "spring.kafka.streams.ssl.trust-store-location=classpath:tsLocP", - "spring.kafka.streams.ssl.trust-store-password=p9", - "spring.kafka.streams.ssl.trust-store-type=PKCS12", - "spring.kafka.streams.ssl.protocol=TLSv1.2").run((context) -> { + this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class) + .withPropertyValues("spring.kafka.client-id=cid", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", + "spring.application.name=appName", + "spring.kafka.properties.foo.bar.baz=qux.fiz.buz", + "spring.kafka.streams.auto-startup=false", + "spring.kafka.streams.cache-max-bytes-buffering=42", + "spring.kafka.streams.client-id=override", + "spring.kafka.streams.properties.fiz.buz=fix.fox", + "spring.kafka.streams.replication-factor=2", + "spring.kafka.streams.state-dir=/tmp/state", + "spring.kafka.streams.ssl.key-password=p7", + "spring.kafka.streams.ssl.key-store-location=classpath:ksLocP", + "spring.kafka.streams.ssl.key-store-password=p8", + "spring.kafka.streams.ssl.key-store-type=PKCS12", + "spring.kafka.streams.ssl.trust-store-location=classpath:tsLocP", + "spring.kafka.streams.ssl.trust-store-password=p9", + "spring.kafka.streams.ssl.trust-store-type=PKCS12", + "spring.kafka.streams.ssl.protocol=TLSv1.2") + .run((context) -> { Properties configs = context.getBean( KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME, KafkaStreamsConfiguration.class).asProperties(); @@ -339,6 +345,63 @@ public class KafkaAutoConfigurationTests { }); } + @Test + public void streamsApplicationIdUsesMainApplicationNameByDefault() { + this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class) + .withPropertyValues("spring.application.name=my-test-app", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", + "spring.kafka.streams.auto-startup=false") + .run((context) -> { + Properties configs = context.getBean( + KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME, + KafkaStreamsConfiguration.class).asProperties(); + assertThat(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) + .isEqualTo("localhost:9092, localhost:9093"); + assertThat(configs.get(StreamsConfig.APPLICATION_ID_CONFIG)) + .isEqualTo("my-test-app"); + }); + } + + @Test + public void streamsWithCustomKafkaConfiguration() { + this.contextRunner + .withUserConfiguration(EnableKafkaStreamsConfiguration.class, + TestKafkaStreamsConfiguration.class) + .withPropertyValues("spring.application.name=my-test-app", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", + "spring.kafka.streams.auto-startup=false") + .run((context) -> { + Properties configs = context.getBean( + KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME, + KafkaStreamsConfiguration.class).asProperties(); + assertThat(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) + .isEqualTo("localhost:9094, localhost:9095"); + assertThat(configs.get(StreamsConfig.APPLICATION_ID_CONFIG)) + .isEqualTo("test-id"); + }); + } + + @Test + public void streamsApplicationIdIsMandatory() { + this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class) + .run((context) -> { + assertThat(context).hasFailed(); + assertThat(context).getFailure() + .hasMessageContaining("spring.kafka.streams.application-id") + .hasMessageContaining( + "This property is mandatory and fallback 'spring.application.name' is not set either."); + + }); + } + + @Test + public void streamsApplicationIdIsNotMandatoryIfEnableKafkaStreamsIsNotSet() { + this.contextRunner.run((context) -> { + assertThat(context).hasNotFailed(); + assertThat(context).doesNotHaveBean(StreamsBuilder.class); + }); + } + @SuppressWarnings("unchecked") @Test public void listenerProperties() { @@ -470,4 +533,25 @@ public class KafkaAutoConfigurationTests { } + @Configuration + @EnableKafkaStreams + protected static class EnableKafkaStreamsConfiguration { + + } + + @Configuration + protected static class TestKafkaStreamsConfiguration { + + @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) + public KafkaStreamsConfiguration kafkaStreamsConfiguration() { + Map streamsProperties = new HashMap<>(); + streamsProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + "localhost:9094, localhost:9095"); + streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-id"); + + return new KafkaStreamsConfiguration(streamsProperties); + } + + } + } diff --git a/spring-boot-project/spring-boot-docs/pom.xml b/spring-boot-project/spring-boot-docs/pom.xml index 96ea4a3ff5..6bbe0cd4ef 100644 --- a/spring-boot-project/spring-boot-docs/pom.xml +++ b/spring-boot-project/spring-boot-docs/pom.xml @@ -397,6 +397,11 @@ commons-dbcp2 true + + org.apache.kafka + kafka-streams + true + org.apache.logging.log4j log4j-api diff --git a/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc b/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc index d8b264da78..2ff4350624 100644 --- a/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc +++ b/spring-boot-project/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc @@ -1105,7 +1105,8 @@ content into your application. Rather, pick only the properties that you need. spring.kafka.ssl.trust-store-location= # Location of the trust store file. spring.kafka.ssl.trust-store-password= # Store password for the trust store file. spring.kafka.ssl.trust-store-type= # Type of the trust store. - spring.kafka.streams.auto-startup= # Whether or not to auto-start the streams factory bean. + spring.kafka.streams.application-id = # Kafka streams application.id property; default spring.application.name. + spring.kafka.streams.auto-startup=true # Whether or not to auto-start the streams factory bean. spring.kafka.streams.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster. Overrides the global property, for streams. spring.kafka.streams.cache-max-bytes-buffering= # Maximum number of memory bytes to be used for buffering across all threads. spring.kafka.streams.client-id= # ID to pass to the server when making requests. Used for server-side logging. diff --git a/spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc b/spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc index d3bfaeb479..cc83d9ae71 100644 --- a/spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc +++ b/spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc @@ -5634,44 +5634,34 @@ The following component creates a listener endpoint on the `someTopic` topic: } ---- -[[boot-deatures-kafka-streams]] +[[boot-features-kafka-streams]] ==== Kafka Streams - Spring for Apache Kafka provides a factory bean to create a `StreamsBuilder` object and -manage the lifecycle of its streams; the factory bean is created when -`@EnableKafkaStreams` is present on a `@Configuration` class. -The factory bean requires a `KafkaStreamsConfiguration` object for streams configuration. +manage the lifecycle of its streams. Spring Boot auto-configures the required +`KafkaStreamsConfiguration` bean as long as `kafka-streams` in on the classpath and kafka +streams is enabled via the @EnableKafkaStreams` annotation. -If Spring Boot detects the `kafka-streams` jar on the classpath, it will auto-configure -the `KafkaStreamsConfiguration` bean from the `KafkaProperties` object as well as enabling -the creation of the factory bean by spring-kafka. +Enabling Kafka Streams means that the application id and bootstrap servers must be set. +The former can be configured using `spring.kafka.streams.application-id`, defaulting to +`spring.application.name` if not set. The later can be set globally or +specifically overridden just for streams. -There are two required Kafka properties for streams (`bootstrap.servers` and -`application.id`); by default, the `application.id` is set to the `spring.application.name` -property, if present. -The `bootstrap.servers` can be set globally or specifically overridden just for streams. -Several other properties are specifically available as boot properties; other arbitrary -Kafka properties can be set using the `spring.kafka.streams.properties` property. -See <> for more information. +Several additional properties are available using dedicated properties; other arbitrary +Kafka properties can be set using the `spring.kafka.streams.properties` namespace. See +also <> for more information. -To use the factory bean, simply wire its `StreamsBuilder` into your `@Bean` s. +To use the factory bean, simply wire `StreamsBuilder` into your `@Bean` as shown in the +following example: -==== -[source, java] ----- -@Bean -public KStream kStream(StreamsBuilder streamsBuilder) { - KStream stream = streamsBuilder.stream("ks1In"); - stream.map((k, v) -> new KeyValue(k, v.toUpperCase())) - .to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>())); - return stream; -} +[source,java,indent=0] ---- -==== +include::{code-examples}/kafka/KafkaStreamsBeanExample.java[tag=configuration] +---- + +By default, the streams managed by the `StreamBuilder` object it creates are started +automatically. You can customize this behaviour using the +`spring.kafka.streams.auto-startup` property. -By default, the factory bean `autoStartup` property is false; to automatically start the -streams managed by the `StreamsBuilder` object it creates, set property -`spting.kafka.streams.auto-startup=true`. [[boot-features-kafka-extra-props]] diff --git a/spring-boot-project/spring-boot-docs/src/main/java/org/springframework/boot/docs/kafka/KafkaStreamsBeanExample.java b/spring-boot-project/spring-boot-docs/src/main/java/org/springframework/boot/docs/kafka/KafkaStreamsBeanExample.java new file mode 100644 index 0000000000..d9ddf5f195 --- /dev/null +++ b/spring-boot-project/spring-boot-docs/src/main/java/org/springframework/boot/docs/kafka/KafkaStreamsBeanExample.java @@ -0,0 +1,53 @@ +/* + * Copyright 2012-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.docs.kafka; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Produced; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafkaStreams; +import org.springframework.kafka.support.serializer.JsonSerde; + +/** + * Example to show usage of {@link StreamsBuilder}. + * + * @author Stephane Nicoll + */ +public class KafkaStreamsBeanExample { + + // tag::configuration[] + @Configuration + @EnableKafkaStreams + static class KafkaStreamsExampleConfiguration { + + @Bean + public KStream kStream(StreamsBuilder streamsBuilder) { + KStream stream = streamsBuilder.stream("ks1In"); + stream.map((k, v) -> new KeyValue(k, v.toUpperCase())).to("ks1Out", + Produced.with(Serdes.Integer(), new JsonSerde<>())); + return stream; + } + + } + // end::configuration[] + +}