Polish "Add Kafka Streams auto-configuration"

Closes gh-14021
pull/14254/head
Stephane Nicoll 6 years ago
parent a7acbbd625
commit 6d4bab911c

@@ -17,11 +17,7 @@
package org.springframework.boot.autoconfigure.kafka;
import java.io.IOException;
import java.util.Map;
import org.apache.kafka.streams.StreamsBuilder;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
@@ -32,17 +28,12 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.StreamsBuilderFactoryBean;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
@@ -61,7 +52,8 @@ import org.springframework.kafka.transaction.KafkaTransactionManager;
@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import(KafkaAnnotationDrivenConfiguration.class)
@Import({ KafkaAnnotationDrivenConfiguration.class,
KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {
private final KafkaProperties properties;
@@ -147,57 +139,4 @@ public class KafkaAutoConfiguration {
return kafkaAdmin;
}
@Configuration
@ConditionalOnClass(StreamsBuilder.class)
public static class KafkaStreamsAutoConfiguration {
@Bean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration defaultKafkaStreamsConfig(
KafkaProperties properties, Environment environment) {
Map<String, Object> streamsProperties = properties.buildStreamsProperties();
if (properties.getStreams().getApplicationId() == null) {
if (environment.getProperty("spring.application.id") != null) {
streamsProperties.put("application.id",
environment.getProperty("spring.application.name"));
}
}
return new KafkaStreamsConfiguration(streamsProperties);
}
@Bean
public KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer(
StreamsBuilderFactoryBean factoryBean, KafkaProperties properties) {
return new KafkaStreamsFactoryBeanConfigurer(factoryBean, properties);
}
@Configuration
@EnableKafkaStreams
public static class EnableKafkaStreamsAutoConfiguration {
}
static class KafkaStreamsFactoryBeanConfigurer implements InitializingBean {
private final StreamsBuilderFactoryBean factoryBean;
private final KafkaProperties properties;
KafkaStreamsFactoryBeanConfigurer(StreamsBuilderFactoryBean factoryBean,
KafkaProperties properties) {
this.factoryBean = factoryBean;
this.properties = properties;
}
@Override
public void afterPropertiesSet() throws Exception {
this.factoryBean
.setAutoStartup(this.properties.getStreams().isAutoStartup());
}
}
}
}

@@ -665,7 +665,7 @@ public class KafkaProperties {
/**
* Whether or not to auto-start the streams factory bean.
*/
private boolean autoStartup;
private boolean autoStartup = true;
/**
* Comma-delimited list of host:port pairs to use for establishing the initial

@@ -0,0 +1,98 @@
/*
* Copyright 2012-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.kafka;
import java.util.Map;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.source.InvalidConfigurationPropertyValueException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.core.StreamsBuilderFactoryBean;
/**
* Configuration for Kafka Streams annotation-driven support.
*
* @author Gary Russell
* @author Stephane Nicoll
*/
@Configuration
@ConditionalOnClass(StreamsBuilder.class)
@ConditionalOnBean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
class KafkaStreamsAnnotationDrivenConfiguration {
private final KafkaProperties properties;
KafkaStreamsAnnotationDrivenConfiguration(KafkaProperties properties) {
this.properties = properties;
}
@ConditionalOnMissingBean
@Bean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration defaultKafkaStreamsConfig(Environment environment) {
Map<String, Object> streamsProperties = this.properties.buildStreamsProperties();
if (this.properties.getStreams().getApplicationId() == null) {
String applicationName = environment.getProperty("spring.application.name");
if (applicationName != null) {
streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG,
applicationName);
}
else {
throw new InvalidConfigurationPropertyValueException(
"spring.kafka.streams.application-id", null,
"This property is mandatory and fallback 'spring.application.name' is not set either.");
}
}
return new KafkaStreamsConfiguration(streamsProperties);
}
@Bean
public KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer(
StreamsBuilderFactoryBean factoryBean) {
return new KafkaStreamsFactoryBeanConfigurer(this.properties, factoryBean);
}
// Separate class required to avoid BeanCurrentlyInCreationException
static class KafkaStreamsFactoryBeanConfigurer implements InitializingBean {
private final KafkaProperties properties;
private final StreamsBuilderFactoryBean factoryBean;
KafkaStreamsFactoryBeanConfigurer(KafkaProperties properties,
StreamsBuilderFactoryBean factoryBean) {
this.properties = properties;
this.factoryBean = factoryBean;
}
@Override
public void afterPropertiesSet() {
this.factoryBean.setAutoStartup(this.properties.getStreams().isAutoStartup());
}
}
}

@@ -28,9 +28,12 @@ import org.junit.Test;
import org.springframework.boot.test.util.TestPropertyValues;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.StreamsBuilderFactoryBean;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.messaging.handler.annotation.Header;
@@ -41,6 +44,7 @@ import static org.assertj.core.api.Assertions.assertThat;
* Integration tests for {@link KafkaAutoConfiguration}.
*
* @author Gary Russell
* @author Stephane Nicoll
*/
public class KafkaAutoConfigurationIntegrationTests {
@@ -83,6 +87,14 @@ public class KafkaAutoConfigurationIntegrationTests {
producer.close();
}
@Test
public void testStreams() {
load(KafkaStreamsConfig.class, "spring.application.name:my-app",
"spring.kafka.bootstrap-servers:" + getEmbeddedKafkaBrokersAsString());
assertThat(this.context.getBean(StreamsBuilderFactoryBean.class).isAutoStartup())
.isTrue();
}
private void load(Class<?> config, String... environment) {
this.context = doLoad(new Class<?>[] { config }, environment);
}
@@ -101,7 +113,8 @@ public class KafkaAutoConfigurationIntegrationTests {
return embeddedKafka.getEmbeddedKafka().getBrokersAsString();
}
public static class KafkaConfig {
@Configuration
static class KafkaConfig {
@Bean
public Listener listener() {
@@ -115,6 +128,12 @@
}
@Configuration
@EnableKafkaStreams
static class KafkaStreamsConfig {
}
public static class Listener {
private final CountDownLatch latch = new CountDownLatch(1);

@@ -18,6 +18,7 @@ package org.springframework.boot.autoconfigure.kafka;
import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -31,6 +32,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.junit.Test;
@@ -39,6 +41,7 @@ import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
@@ -279,23 +282,26 @@ public class KafkaAutoConfigurationTests {
@Test
public void streamsProperties() {
this.contextRunner.withPropertyValues("spring.kafka.clientId=cid",
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093",
"spring.application.name=appName",
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
"spring.kafka.streams.cache-max-bytes-buffering=42",
"spring.kafka.streams.client-id=override",
"spring.kafka.streams.properties.fiz.buz=fix.fox",
"spring.kafka.streams.replication-factor=2",
"spring.kafka.streams.state-dir=/tmp/state",
"spring.kafka.streams.ssl.key-password=p7",
"spring.kafka.streams.ssl.key-store-location=classpath:ksLocP",
"spring.kafka.streams.ssl.key-store-password=p8",
"spring.kafka.streams.ssl.key-store-type=PKCS12",
"spring.kafka.streams.ssl.trust-store-location=classpath:tsLocP",
"spring.kafka.streams.ssl.trust-store-password=p9",
"spring.kafka.streams.ssl.trust-store-type=PKCS12",
"spring.kafka.streams.ssl.protocol=TLSv1.2").run((context) -> {
this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class)
.withPropertyValues("spring.kafka.client-id=cid",
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093",
"spring.application.name=appName",
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
"spring.kafka.streams.auto-startup=false",
"spring.kafka.streams.cache-max-bytes-buffering=42",
"spring.kafka.streams.client-id=override",
"spring.kafka.streams.properties.fiz.buz=fix.fox",
"spring.kafka.streams.replication-factor=2",
"spring.kafka.streams.state-dir=/tmp/state",
"spring.kafka.streams.ssl.key-password=p7",
"spring.kafka.streams.ssl.key-store-location=classpath:ksLocP",
"spring.kafka.streams.ssl.key-store-password=p8",
"spring.kafka.streams.ssl.key-store-type=PKCS12",
"spring.kafka.streams.ssl.trust-store-location=classpath:tsLocP",
"spring.kafka.streams.ssl.trust-store-password=p9",
"spring.kafka.streams.ssl.trust-store-type=PKCS12",
"spring.kafka.streams.ssl.protocol=TLSv1.2")
.run((context) -> {
Properties configs = context.getBean(
KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME,
KafkaStreamsConfiguration.class).asProperties();
@@ -339,6 +345,63 @@
});
}
@Test
public void streamsApplicationIdUsesMainApplicationNameByDefault() {
this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class)
.withPropertyValues("spring.application.name=my-test-app",
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093",
"spring.kafka.streams.auto-startup=false")
.run((context) -> {
Properties configs = context.getBean(
KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME,
KafkaStreamsConfiguration.class).asProperties();
assertThat(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))
.isEqualTo("localhost:9092, localhost:9093");
assertThat(configs.get(StreamsConfig.APPLICATION_ID_CONFIG))
.isEqualTo("my-test-app");
});
}
@Test
public void streamsWithCustomKafkaConfiguration() {
this.contextRunner
.withUserConfiguration(EnableKafkaStreamsConfiguration.class,
TestKafkaStreamsConfiguration.class)
.withPropertyValues("spring.application.name=my-test-app",
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093",
"spring.kafka.streams.auto-startup=false")
.run((context) -> {
Properties configs = context.getBean(
KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME,
KafkaStreamsConfiguration.class).asProperties();
assertThat(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))
.isEqualTo("localhost:9094, localhost:9095");
assertThat(configs.get(StreamsConfig.APPLICATION_ID_CONFIG))
.isEqualTo("test-id");
});
}
@Test
public void streamsApplicationIdIsMandatory() {
this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class)
.run((context) -> {
assertThat(context).hasFailed();
assertThat(context).getFailure()
.hasMessageContaining("spring.kafka.streams.application-id")
.hasMessageContaining(
"This property is mandatory and fallback 'spring.application.name' is not set either.");
});
}
@Test
public void streamsApplicationIdIsNotMandatoryIfEnableKafkaStreamsIsNotSet() {
this.contextRunner.run((context) -> {
assertThat(context).hasNotFailed();
assertThat(context).doesNotHaveBean(StreamsBuilder.class);
});
}
@SuppressWarnings("unchecked")
@Test
public void listenerProperties() {
@@ -470,4 +533,25 @@
}
@Configuration
@EnableKafkaStreams
protected static class EnableKafkaStreamsConfiguration {
}
@Configuration
protected static class TestKafkaStreamsConfiguration {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
Map<String, Object> streamsProperties = new HashMap<>();
streamsProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9094, localhost:9095");
streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-id");
return new KafkaStreamsConfiguration(streamsProperties);
}
}
}

@@ -397,6 +397,11 @@
<artifactId>commons-dbcp2</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>

@@ -1105,7 +1105,8 @@ content into your application. Rather, pick only the properties that you need.
spring.kafka.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.ssl.trust-store-type= # Type of the trust store.
spring.kafka.streams.auto-startup= # Whether or not to auto-start the streams factory bean.
spring.kafka.streams.application-id= # Kafka streams application.id property; default spring.application.name.
spring.kafka.streams.auto-startup=true # Whether or not to auto-start the streams factory bean.
spring.kafka.streams.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster. Overrides the global property, for streams.
spring.kafka.streams.cache-max-bytes-buffering= # Maximum number of memory bytes to be used for buffering across all threads.
spring.kafka.streams.client-id= # ID to pass to the server when making requests. Used for server-side logging.

@@ -5634,44 +5634,34 @@ The following component creates a listener endpoint on the `someTopic` topic:
}
----
[[boot-deatures-kafka-streams]]
[[boot-features-kafka-streams]]
==== Kafka Streams
Spring for Apache Kafka provides a factory bean to create a `StreamsBuilder` object and
manage the lifecycle of its streams; the factory bean is created when
`@EnableKafkaStreams` is present on a `@Configuration` class.
The factory bean requires a `KafkaStreamsConfiguration` object for streams configuration.
manage the lifecycle of its streams. Spring Boot auto-configures the required
`KafkaStreamsConfiguration` bean as long as `kafka-streams` is on the classpath and Kafka
Streams is enabled via the `@EnableKafkaStreams` annotation.
If Spring Boot detects the `kafka-streams` jar on the classpath, it will auto-configure
the `KafkaStreamsConfiguration` bean from the `KafkaProperties` object as well as enabling
the creation of the factory bean by spring-kafka.
Enabling Kafka Streams means that the application id and bootstrap servers must be set.
The former can be configured using `spring.kafka.streams.application-id`, defaulting to
`spring.application.name` if not set. The latter can be set globally or
specifically overridden just for streams.
There are two required Kafka properties for streams (`bootstrap.servers` and
`application.id`); by default, the `application.id` is set to the `spring.application.name`
property, if present.
The `bootstrap.servers` can be set globally or specifically overridden just for streams.
Several other properties are specifically available as boot properties; other arbitrary
Kafka properties can be set using the `spring.kafka.streams.properties` property.
See <<boot-features-kafka-extra-props>> for more information.
Several additional properties are available using dedicated properties; other arbitrary
Kafka properties can be set using the `spring.kafka.streams.properties` namespace. See
also <<boot-features-kafka-extra-props>> for more information.
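As a quick illustration of the defaulting described above (an editorial sketch, not part of this commit), the `ApplicationContextRunner` style used by the tests in this change can verify that `spring.application.name` is picked up as the streams `application.id`; `StreamsEnabledConfig` is a hypothetical user `@Configuration` annotated with `@EnableKafkaStreams`:
[source,java]
----
// Sketch only: assumes kafka-streams is on the classpath and mirrors the
// tests added in this commit (imports and test scaffolding omitted).
new ApplicationContextRunner()
		.withConfiguration(AutoConfigurations.of(KafkaAutoConfiguration.class))
		.withUserConfiguration(StreamsEnabledConfig.class) // hypothetical @EnableKafkaStreams config
		.withPropertyValues("spring.application.name=my-app",
				"spring.kafka.streams.auto-startup=false")
		.run((context) -> {
			Properties configs = context.getBean(
					KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME,
					KafkaStreamsConfiguration.class).asProperties();
			// application.id falls back to spring.application.name
			assertThat(configs.get(StreamsConfig.APPLICATION_ID_CONFIG))
					.isEqualTo("my-app");
		});
----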
To use the factory bean, simply wire its `StreamsBuilder` into your `@Bean` s.
To use the factory bean, simply wire `StreamsBuilder` into your `@Bean` as shown in the
following example:
====
[source, java]
----
@Bean
public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
	KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
	stream.map((k, v) -> new KeyValue(k, v.toUpperCase()))
			.to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
	return stream;
}
----
====
[source,java,indent=0]
----
include::{code-examples}/kafka/KafkaStreamsBeanExample.java[tag=configuration]
----
By default, the streams managed by the `StreamsBuilder` object it creates are started
automatically. You can customize this behaviour using the
`spring.kafka.streams.auto-startup` property.
By default, the factory bean `autoStartup` property is false; to automatically start the
streams managed by the `StreamsBuilder` object it creates, set property
`spring.kafka.streams.auto-startup=true`.
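When auto-startup is disabled, the streams can still be started on demand by driving the factory bean's lifecycle directly. The following is a minimal editorial sketch (not part of this commit); it relies only on `StreamsBuilderFactoryBean` implementing Spring's `SmartLifecycle`, and the `streamsStarter` bean is purely illustrative:
[source,java]
----
// Sketch only: start the Kafka Streams topology manually when
// spring.kafka.streams.auto-startup=false.
@Bean
public ApplicationRunner streamsStarter(StreamsBuilderFactoryBean factoryBean) {
	return (args) -> {
		if (!factoryBean.isRunning()) {
			factoryBean.start();
		}
	};
}
----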
[[boot-features-kafka-extra-props]]

@@ -0,0 +1,53 @@
/*
* Copyright 2012-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.docs.kafka;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.support.serializer.JsonSerde;
/**
* Example to show usage of {@link StreamsBuilder}.
*
* @author Stephane Nicoll
*/
public class KafkaStreamsBeanExample {
// tag::configuration[]
@Configuration
@EnableKafkaStreams
static class KafkaStreamsExampleConfiguration {
@Bean
public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
stream.map((k, v) -> new KeyValue(k, v.toUpperCase())).to("ks1Out",
Produced.with(Serdes.Integer(), new JsonSerde<>()));
return stream;
}
}
// end::configuration[]
}