Polish "Apache Kafka support" contribution

Closes gh-6961
Author: Stephane Nicoll
parent c4188c8e4a
commit 642af52d01

@@ -22,11 +22,10 @@ import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.config.ContainerProperties;
/**
- * Default configurer for Kafka listener container factories.
+ * Configure {@link ConcurrentKafkaListenerContainerFactory} with sensible defaults.
*
* @author Gary Russell
- * @since 1.5
- *
+ * @since 1.5.0
*/
public class ConcurrentKafkaListenerContainerFactoryConfigurer {
@@ -43,8 +42,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
/**
* Configure the specified Kafka listener container factory. The factory can be
* further tuned and default settings can be overridden.
- * @param listenerContainerFactory the {@link SimpleKafkaListenerContainerFactory} instance to
- * configure
+ * @param listenerContainerFactory the {@link ConcurrentKafkaListenerContainerFactory}
+ * instance to configure
* @param consumerFactory the {@link ConsumerFactory} to use
*/
public void configure(ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory,

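For context, this configurer is designed to be reused from user configuration: you can declare your own `ConcurrentKafkaListenerContainerFactory` bean, let the configurer apply the defaults, and then tune the result. A minimal sketch (illustrative only, not part of this commit; it assumes the auto-configured `ConsumerFactory` bean):

[source,java,indent=0]
----
	@Bean
	public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
			ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
			ConsumerFactory<Object, Object> kafkaConsumerFactory) {
		ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
				new ConcurrentKafkaListenerContainerFactory<Object, Object>();
		// apply the defaults from spring.kafka.listener.* first ...
		configurer.configure(factory, kafkaConsumerFactory);
		// ... then override whatever you need (illustrative)
		factory.setConcurrency(3);
		return factory;
	}
----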
@@ -26,11 +26,10 @@ import org.springframework.kafka.config.KafkaListenerConfigUtils;
import org.springframework.kafka.core.ConsumerFactory;
/**
- * Adds {@link EnableKafka} if present on the classpath.
+ * Configuration for Kafka annotation-driven support.
*
* @author Gary Russell
- * @since 1.5
- *
+ * @since 1.5.0
*/
@Configuration
@ConditionalOnClass(EnableKafka.class)

@@ -16,6 +16,7 @@
package org.springframework.boot.autoconfigure.kafka;
+ import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -31,11 +32,10 @@ import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
/**
- * Auto-configuration for Spring for Apache Kafka.
+ * {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka.
*
* @author Gary Russell
- * @since 1.5
- *
+ * @since 1.5.0
*/
@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@@ -51,9 +51,11 @@ public class KafkaAutoConfiguration {
@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
- public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
+ public KafkaTemplate<?, ?> kafkaTemplate(
+ ProducerFactory<Object, Object> kafkaProducerFactory,
ProducerListener<Object, Object> kafkaProducerListener) {
- KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<Object, Object>(kafkaProducerFactory);
+ KafkaTemplate<Object, Object> kafkaTemplate =
+ new KafkaTemplate<Object, Object>(kafkaProducerFactory);
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
return kafkaTemplate;
@@ -65,21 +67,18 @@ public class KafkaAutoConfiguration {
return new LoggingProducerListener<Object, Object>();
}
- @Configuration
- protected static class ConnectionConfig {
- @Bean
- @ConditionalOnMissingBean(ConsumerFactory.class)
- public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
- return new DefaultKafkaConsumerFactory<Object, Object>(properties.buildConsumerProperties());
- }
- @Bean
- @ConditionalOnMissingBean(ProducerFactory.class)
- public ProducerFactory<?, ?> kafkaProducerFactory(KafkaProperties properties) {
- return new DefaultKafkaProducerFactory<Object, Object>(properties.buildProducerProperties());
- }
+ @Bean
+ @ConditionalOnMissingBean(ConsumerFactory.class)
+ public ConsumerFactory<?, ?> kafkaConsumerFactory() {
+ return new DefaultKafkaConsumerFactory<Object, Object>(
+ this.properties.buildConsumerProperties());
+ }
+ @Bean
+ @ConditionalOnMissingBean(ProducerFactory.class)
+ public ProducerFactory<?, ?> kafkaProducerFactory() {
+ return new DefaultKafkaProducerFactory<Object, Object>(
+ this.properties.buildProducerProperties());
+ }
}
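Since each of these beans is guarded by `@ConditionalOnMissingBean`, the auto-configuration backs off as soon as you declare your own. As a hedged sketch (not part of this commit), replacing the auto-configured template while keeping the auto-configured `ProducerFactory` could look like:

[source,java,indent=0]
----
	@Bean
	public KafkaTemplate<Object, Object> kafkaTemplate(
			ProducerFactory<Object, Object> kafkaProducerFactory) {
		// this user-defined bean makes the auto-configured KafkaTemplate back off
		KafkaTemplate<Object, Object> template =
				new KafkaTemplate<Object, Object>(kafkaProducerFactory);
		template.setDefaultTopic("myTopic"); // instead of spring.kafka.template.default-topic
		return template;
	}
----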

@@ -35,13 +35,14 @@ import org.springframework.core.io.Resource;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
/**
- * Spring for Apache Kafka Auto-configuration properties.
- *
+ * Configuration properties for Spring for Apache Kafka.
+ * <p/>
* Users should refer to kafka documentation for complete descriptions of these
* properties.
*
* @author Gary Russell
- * @since 1.5
+ * @author Stephane Nicoll
+ * @since 1.5.0
*/
@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaProperties {
@@ -62,7 +63,8 @@ public class KafkaProperties {
* Comma-delimited list of host:port pairs to use for establishing the initial
* connection to the Kafka cluster.
*/
- private List<String> bootstrapServers = new ArrayList<String>(Collections.singletonList("localhost:9092"));
+ private List<String> bootstrapServers = new ArrayList<String>(Collections.singletonList(
+ "localhost:9092"));
/**
* Id to pass to the server when making requests; used for server-side logging.
@@ -117,25 +119,30 @@
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.ssl.getKeyPassword());
}
if (this.ssl.getKeystoreLocation() != null) {
- properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, resourceToPath(this.ssl.getKeystoreLocation()));
+ properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
+ resourceToPath(this.ssl.getKeystoreLocation()));
}
if (this.ssl.getKeystorePassword() != null) {
- properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, this.ssl.getKeystorePassword());
+ properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
+ this.ssl.getKeystorePassword());
}
if (this.ssl.getTruststoreLocation() != null) {
- properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, resourceToPath(this.ssl.getTruststoreLocation()));
+ properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
+ resourceToPath(this.ssl.getTruststoreLocation()));
}
if (this.ssl.getTruststorePassword() != null) {
- properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.ssl.getTruststorePassword());
+ properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
+ this.ssl.getTruststorePassword());
}
return properties;
}
/**
- * Use this method to create an initial map of consumer properties from the
- * boot properties. This allows you to add additional properties, if necessary,
- * and override the default kafkaConsumerFactory bean.
- * @return the properties.
+ * Create an initial map of consumer properties from the state of this instance.
+ * <p>This allows you to add additional properties, if necessary, and override the
+ * default kafkaConsumerFactory bean.
+ * @return the consumer properties initialized with the customizations defined on
+ * this instance
*/
public Map<String, Object> buildConsumerProperties() {
Map<String, Object> props = buildCommonProperties();
@@ -144,10 +151,11 @@ public class KafkaProperties {
}
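For instance (an illustrative sketch, not part of this commit), a custom consumer factory can start from these properties and add raw Kafka settings that `KafkaProperties` does not expose; the `max.poll.records` key below is such an assumed pass-through property:

[source,java,indent=0]
----
	@Bean
	public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
		Map<String, Object> consumerProperties = properties.buildConsumerProperties();
		// any property not managed by KafkaProperties can be added here
		consumerProperties.put("max.poll.records", 100);
		return new DefaultKafkaConsumerFactory<Object, Object>(consumerProperties);
	}
----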
/**
- * Use this method to create an initial map of producer properties from the
- * boot properties. This allows you to add additional properties, if necessary,
- * and override the default kafkaProducerFactory bean.
- * @return the properties.
+ * Create an initial map of producer properties from the state of this instance.
+ * <p>This allows you to add additional properties, if necessary, and override the
+ * default kafkaProducerFactory bean.
+ * @return the producer properties initialized with the customizations defined on
+ * this instance
*/
public Map<String, Object> buildProducerProperties() {
Map<String, Object> props = buildCommonProperties();
@@ -155,12 +163,13 @@ public class KafkaProperties {
return props;
}
- public static String resourceToPath(Resource resource) {
+ private static String resourceToPath(Resource resource) {
try {
return resource.getFile().getAbsolutePath();
}
- catch (IOException e) {
- throw new IllegalStateException("Resource must be on a file system", e);
+ catch (IOException ex) {
+ throw new IllegalStateException(String.format(
+ "Resource '%s' must be on a file system", resource), ex);
}
}
@@ -172,7 +181,7 @@
* Frequency in milliseconds that the consumer offsets are auto-committed to
* Kafka if 'enable.auto.commit' true.
*/
- private Long autoCommitIntervalMs;
+ private Long autoCommitInterval;
/**
* What to do when there is no initial offset in Kafka or if the current offset
@@ -197,16 +206,16 @@
private Boolean enableAutoCommit;
/**
- * Maximum amount of time the server will block before answering the fetch
- * request if there isn't sufficient data to immediately satisfy the requirement
- * given by fetch.min.bytes.
+ * Maximum amount of time in milliseconds the server will block before answering
+ * the fetch request if there isn't sufficient data to immediately satisfy the
+ * requirement given by "fetch.min.bytes".
*/
- private Integer fetchMaxWaitMs;
+ private Integer fetchMaxWait;
/**
- * Minimum amount of data the server should return for a fetch request.
+ * Minimum amount of data the server should return for a fetch request in bytes.
*/
- private Integer fetchMinBytes;
+ private Integer fetchMinSize;
/**
* Unique string that identifies the consumer group this consumer belongs to.
@@ -214,19 +223,17 @@
private String groupId;
/**
- * Expected time between heartbeats to the consumer coordinator.
+ * Expected time in milliseconds between heartbeats to the consumer coordinator.
*/
- private Integer heartbeatIntervalMs;
+ private Integer heartbeatInterval;
/**
- * Deserializer class for key that implements the
- * org.apache.kafka.common.serialization.Deserializer interface.
+ * Deserializer class for keys.
*/
private Class<?> keyDeserializer = StringDeserializer.class;
/**
- * Deserializer class for value that implements the
- * org.apache.kafka.common.serialization.Deserializer interface.
+ * Deserializer class for values.
*/
private Class<?> valueDeserializer = StringDeserializer.class;
@@ -234,12 +241,12 @@
return this.ssl;
}
- public Long getAutoCommitIntervalMs() {
- return this.autoCommitIntervalMs;
+ public Long getAutoCommitInterval() {
+ return this.autoCommitInterval;
}
- public void setAutoCommitIntervalMs(Long autoCommitIntervalMs) {
- this.autoCommitIntervalMs = autoCommitIntervalMs;
+ public void setAutoCommitInterval(Long autoCommitInterval) {
+ this.autoCommitInterval = autoCommitInterval;
}
public String getAutoOffsetReset() {
@@ -274,20 +281,20 @@
this.enableAutoCommit = enableAutoCommit;
}
- public Integer getFetchMaxWaitMs() {
- return this.fetchMaxWaitMs;
+ public Integer getFetchMaxWait() {
+ return this.fetchMaxWait;
}
- public void setFetchMaxWaitMs(Integer fetchMaxWaitMs) {
- this.fetchMaxWaitMs = fetchMaxWaitMs;
+ public void setFetchMaxWait(Integer fetchMaxWait) {
+ this.fetchMaxWait = fetchMaxWait;
}
- public Integer getFetchMinBytes() {
- return this.fetchMinBytes;
+ public Integer getFetchMinSize() {
+ return this.fetchMinSize;
}
- public void setFetchMinBytes(Integer fetchMinBytes) {
- this.fetchMinBytes = fetchMinBytes;
+ public void setFetchMinSize(Integer fetchMinSize) {
+ this.fetchMinSize = fetchMinSize;
}
public String getGroupId() {
@@ -298,12 +305,12 @@
this.groupId = groupId;
}
- public Integer getHeartbeatIntervalMs() {
- return this.heartbeatIntervalMs;
+ public Integer getHeartbeatInterval() {
+ return this.heartbeatInterval;
}
- public void setHeartbeatIntervalMs(Integer heartbeatIntervalMs) {
- this.heartbeatIntervalMs = heartbeatIntervalMs;
+ public void setHeartbeatInterval(Integer heartbeatInterval) {
+ this.heartbeatInterval = heartbeatInterval;
}
public Class<?> getKeyDeserializer() {
@@ -324,8 +331,8 @@
public Map<String, Object> buildProperties() {
Map<String, Object> properties = new HashMap<String, Object>();
- if (this.autoCommitIntervalMs != null) {
- properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, this.autoCommitIntervalMs);
+ if (this.autoCommitInterval != null) {
+ properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, this.autoCommitInterval);
}
if (this.autoOffsetReset != null) {
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.autoOffsetReset);
@@ -339,17 +346,17 @@
if (this.enableAutoCommit != null) {
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, this.enableAutoCommit);
}
- if (this.fetchMaxWaitMs != null) {
- properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, this.fetchMaxWaitMs);
+ if (this.fetchMaxWait != null) {
+ properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, this.fetchMaxWait);
}
- if (this.fetchMinBytes != null) {
- properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, this.fetchMinBytes);
+ if (this.fetchMinSize != null) {
+ properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, this.fetchMinSize);
}
if (this.groupId != null) {
properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
}
- if (this.heartbeatIntervalMs != null) {
- properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, this.heartbeatIntervalMs);
+ if (this.heartbeatInterval != null) {
+ properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, this.heartbeatInterval);
}
if (this.keyDeserializer != null) {
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, this.keyDeserializer);
@@ -383,8 +390,8 @@
private final Ssl ssl = new Ssl();
/**
- * Number of acknowledgments the producer requires the leader to have
- * received before considering a request complete.
+ * Number of acknowledgments the producer requires the leader to have received
+ * before considering a request complete.
*/
private String acks;
@@ -416,21 +423,19 @@
private String compressionType;
/**
- * Serializer class for key that implements the
- * org.apache.kafka.common.serialization.Serializer interface.
+ * Serializer class for keys.
*/
private Class<?> keySerializer = StringSerializer.class;
/**
- * When greater than zero, enables retrying of failed sends.
+ * Serializer class for values.
*/
- private Integer retries;
+ private Class<?> valueSerializer = StringSerializer.class;
/**
- * Serializer class for value that implements the
- * org.apache.kafka.common.serialization.Serializer interface.
+ * When greater than zero, enables retrying of failed sends.
*/
- private Class<?> valueSerializer = StringSerializer.class;
+ private Integer retries;
public Ssl getSsl() {
return this.ssl;
@@ -492,14 +497,6 @@
this.keySerializer = keySerializer;
}
- public Integer getRetries() {
- return this.retries;
- }
- public void setRetries(Integer retries) {
- this.retries = retries;
- }
public Class<?> getValueSerializer() {
return this.valueSerializer;
}
@@ -508,6 +505,14 @@
this.valueSerializer = valueSerializer;
}
+ public Integer getRetries() {
+ return this.retries;
+ }
+ public void setRetries(Integer retries) {
+ this.retries = retries;
+ }
public Map<String, Object> buildProperties() {
Map<String, Object> properties = new HashMap<String, Object>();
if (this.acks != null) {
@@ -593,12 +598,14 @@
private Long pollTimeout;
/**
- * Number of records between offset commits when ackMode is COUNT or COUNT_TIME.
+ * Number of records between offset commits when ackMode is "COUNT" or
+ * "COUNT_TIME".
*/
private Integer ackCount;
/**
- * Time in milliseconds between offset commits when ackMode is TIME or COUNT_TIME.
+ * Time in milliseconds between offset commits when ackMode is "TIME" or
+ * "COUNT_TIME".
*/
private Long ackTime;

@@ -16,6 +16,5 @@
/**
* Auto-configuration for Apache Kafka.
- *
*/
package org.springframework.boot.autoconfigure.kafka;

@@ -409,6 +409,76 @@
}
]
},
+ {
+ "name": "spring.kafka.consumer.auto-offset-reset",
+ "values": [
+ {
+ "value": "earliest",
+ "description": "Automatically reset the offset to the earliest offset."
+ },
+ {
+ "value": "latest",
+ "description": "Automatically reset the offset to the latest offset."
+ },
+ {
+ "value": "none",
+ "description": "Throw exception to the consumer if no previous offset is found for the consumer's group."
+ },
+ {
+ "value": "exception",
+ "description": "Throw exception to the consumer."
+ }
+ ],
+ "providers": [
+ {
+ "name": "any"
+ }
+ ]
+ },
+ {
+ "name": "spring.kafka.consumer.key-deserializer",
+ "providers": [
+ {
+ "name": "handle-as",
+ "parameters": {
+ "target": "org.apache.kafka.common.serialization.Deserializer"
+ }
+ }
+ ]
+ },
+ {
+ "name": "spring.kafka.consumer.value-deserializer",
+ "providers": [
+ {
+ "name": "handle-as",
+ "parameters": {
+ "target": "org.apache.kafka.common.serialization.Deserializer"
+ }
+ }
+ ]
+ },
+ {
+ "name": "spring.kafka.producer.key-serializer",
+ "providers": [
+ {
+ "name": "handle-as",
+ "parameters": {
+ "target": "org.apache.kafka.common.serialization.Serializer"
+ }
+ }
+ ]
+ },
+ {
+ "name": "spring.kafka.producer.value-serializer",
+ "providers": [
+ {
+ "name": "handle-as",
+ "parameters": {
+ "target": "org.apache.kafka.common.serialization.Serializer"
+ }
+ }
+ ]
+ },
{
"name": "spring.http.converters.preferred-json-mapper",
"values": [

@@ -35,18 +35,17 @@ import org.springframework.messaging.handler.annotation.Header;
import static org.assertj.core.api.Assertions.assertThat;
/**
- * Tests for Kafka Auto-configuration.
+ * Integration tests for {@link KafkaAutoConfiguration}.
*
* @author Gary Russell
- * @since 1.5
- *
*/
public class KafkaAutoConfigurationIntegrationTests {
private static final String TEST_TOPIC = "testTopic";
@ClassRule
- public static final KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true, TEST_TOPIC);
+ public static final KafkaEmbedded kafkaEmbedded =
+ new KafkaEmbedded(1, true, TEST_TOPIC);
private AnnotationConfigApplicationContext context;
@@ -59,7 +58,8 @@ public class KafkaAutoConfigurationIntegrationTests {
@Test
public void testEndToEnd() throws Exception {
- load(KafkaConfig.class, "spring.kafka.bootstrap-servers:" + kafkaEmbedded.getBrokersAsString(),
+ load(KafkaConfig.class,
+ "spring.kafka.bootstrap-servers:" + kafkaEmbedded.getBrokersAsString(),
"spring.kafka.consumer.group-id=testGroup",
"spring.kafka.consumer.auto-offset-reset=earliest");
@SuppressWarnings("unchecked")
@@ -103,7 +103,8 @@ public class KafkaAutoConfigurationIntegrationTests {
private volatile String key;
@KafkaListener(topics = TEST_TOPIC)
- public void listen(String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
+ public void listen(String foo,
+ @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
this.received = foo;
this.key = key;
this.latch.countDown();

@@ -17,7 +17,7 @@
package org.springframework.boot.autoconfigure.kafka;
import java.io.File;
- import java.util.Arrays;
+ import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -27,7 +27,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
- import org.junit.Before;
+ import org.junit.After;
import org.junit.Test;
import org.springframework.beans.DirectFieldAccessor;
@@ -43,77 +43,54 @@ import static org.assertj.core.api.Assertions.assertThat;
/**
* Tests for {@link KafkaAutoConfiguration}.
*
* @author Gary Russell
+ * @author Stephane Nicoll
- * @since 1.5
*/
- public class KafkaPropertiesTests {
+ public class KafkaAutoConfigurationTests {
private AnnotationConfigApplicationContext context;
- @Before
- public void load() {
- this.context = new AnnotationConfigApplicationContext();
- this.context.register(KafkaAutoConfiguration.class);
- EnvironmentTestUtils.addEnvironment(context,
- "spring.kafka.bootstrap-servers=foo:1234",
- "spring.kafka.clientId=cid",
+ @After
+ public void closeContext() {
+ if (this.context != null) {
+ this.context.close();
+ }
+ }
+ @Test
+ public void consumerProperties() {
+ load("spring.kafka.bootstrap-servers=foo:1234",
"spring.kafka.ssl.key-password=p1",
"spring.kafka.ssl.keystore-location=classpath:ksLoc",
"spring.kafka.ssl.keystore-password=p2",
"spring.kafka.ssl.truststore-location=classpath:tsLoc",
"spring.kafka.ssl.truststore-password=p3",
- "spring.kafka.consumer.auto-commit-interval-ms=123",
+ "spring.kafka.consumer.auto-commit-interval=123",
"spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.consumer.client-id=ccid", // test override common
"spring.kafka.consumer.enable-auto-commit=false",
- "spring.kafka.consumer.fetch-max-wait-ms=456",
- "spring.kafka.consumer.fetch-min-bytes=789",
+ "spring.kafka.consumer.fetch-max-wait=456",
+ "spring.kafka.consumer.fetch-min-size=789",
"spring.kafka.consumer.group-id=bar",
- "spring.kafka.consumer.heartbeat-interval-ms=234",
+ "spring.kafka.consumer.heartbeat-interval=234",
"spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer",
- "spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer",
- "spring.kafka.producer.acks=all",
- "spring.kafka.producer.batch-size=20",
- "spring.kafka.producer.bootstrap-servers=bar:1234", // test override common
- "spring.kafka.producer.buffer-memory=12345",
- "spring.kafka.producer.compression-type=gzip",
- "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer",
- "spring.kafka.producer.retries=2",
- "spring.kafka.producer.ssl.key-password=p4",
- "spring.kafka.producer.ssl.keystore-location=classpath:ksLocP",
- "spring.kafka.producer.ssl.keystore-password=p5",
- "spring.kafka.producer.ssl.truststore-location=classpath:tsLocP",
- "spring.kafka.producer.ssl.truststore-password=p6",
- "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.IntegerSerializer",
- "spring.kafka.template.default-topic=testTopic",
- "spring.kafka.listener.ack-mode=MANUAL",
- "spring.kafka.listener.ack-count=123",
- "spring.kafka.listener.ack-time=456",
- "spring.kafka.listener.concurrency=3",
- "spring.kafka.listener.poll-timeout=2000"
- );
- this.context.refresh();
- }
- @Test
- public void testConsumerProps() {
+ "spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer");
DefaultKafkaConsumerFactory<?, ?> consumerFactory = this.context.getBean(DefaultKafkaConsumerFactory.class);
@SuppressWarnings("unchecked")
Map<String, Object> consumerProps = (Map<String, Object>) new DirectFieldAccessor(consumerFactory)
.getPropertyValue("configs");
// common
assertThat(consumerProps.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))
- .isEqualTo(Arrays.asList(new String[] { "foo:1234" }));
+ .isEqualTo(Collections.singletonList("foo:1234"));
assertThat(consumerProps.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p1");
assertThat((String) consumerProps.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
.endsWith(File.separator + "ksLoc");
assertThat(consumerProps.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p2");
assertThat((String) consumerProps.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
.endsWith(File.separator + "tsLoc");
assertThat(consumerProps.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).isEqualTo("p3");
// consumer
assertThat(consumerProps.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("ccid"); // override
@@ -126,11 +103,25 @@ public class KafkaPropertiesTests {
assertThat(consumerProps.get(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)).isEqualTo(234);
assertThat(consumerProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)).isEqualTo(LongDeserializer.class);
assertThat(consumerProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
.isEqualTo(IntegerDeserializer.class);
}
@Test
- public void testProducerProps() {
+ public void producerProperties() {
+ load("spring.kafka.clientId=cid",
+ "spring.kafka.producer.acks=all",
+ "spring.kafka.producer.batch-size=20",
+ "spring.kafka.producer.bootstrap-servers=bar:1234", // test override common
+ "spring.kafka.producer.buffer-memory=12345",
+ "spring.kafka.producer.compression-type=gzip",
+ "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer",
+ "spring.kafka.producer.retries=2",
+ "spring.kafka.producer.ssl.key-password=p4",
+ "spring.kafka.producer.ssl.keystore-location=classpath:ksLocP",
+ "spring.kafka.producer.ssl.keystore-password=p5",
+ "spring.kafka.producer.ssl.truststore-location=classpath:tsLocP",
+ "spring.kafka.producer.ssl.truststore-password=p6",
+ "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.IntegerSerializer");
DefaultKafkaProducerFactory<?, ?> producerFactory = this.context.getBean(DefaultKafkaProducerFactory.class);
@SuppressWarnings("unchecked")
Map<String, Object> producerProps = (Map<String, Object>) new DirectFieldAccessor(producerFactory)
@@ -141,38 +132,53 @@ public class KafkaPropertiesTests {
assertThat(producerProps.get(ProducerConfig.ACKS_CONFIG)).isEqualTo("all");
assertThat(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG)).isEqualTo(20);
assertThat(producerProps.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
- .isEqualTo(Arrays.asList(new String[] { "bar:1234" })); // override
+ .isEqualTo(Collections.singletonList("bar:1234")); // override
assertThat(producerProps.get(ProducerConfig.BUFFER_MEMORY_CONFIG)).isEqualTo(12345L);
assertThat(producerProps.get(ProducerConfig.COMPRESSION_TYPE_CONFIG)).isEqualTo("gzip");
assertThat(producerProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)).isEqualTo(LongSerializer.class);
assertThat(producerProps.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4");
assertThat((String) producerProps.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
.endsWith(File.separator + "ksLocP");
assertThat(producerProps.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p5");
assertThat((String) producerProps.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
.endsWith(File.separator + "tsLocP");
assertThat(producerProps.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).isEqualTo("p6");
assertThat(producerProps.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2);
assertThat(producerProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)).isEqualTo(IntegerSerializer.class);
}
@Test
- public void testInjected() {
+ public void listenerProperties() {
+ load("spring.kafka.template.default-topic=testTopic",
+ "spring.kafka.listener.ack-mode=MANUAL",
+ "spring.kafka.listener.ack-count=123",
+ "spring.kafka.listener.ack-time=456",
+ "spring.kafka.listener.concurrency=3",
+ "spring.kafka.listener.poll-timeout=2000");
DefaultKafkaProducerFactory<?, ?> producerFactory = this.context.getBean(DefaultKafkaProducerFactory.class);
DefaultKafkaConsumerFactory<?, ?> consumerFactory = this.context.getBean(DefaultKafkaConsumerFactory.class);
KafkaTemplate<?, ?> kafkaTemplate = this.context.getBean(KafkaTemplate.class);
KafkaListenerContainerFactory<?> kafkaListenerContainerFactory = this.context
.getBean(KafkaListenerContainerFactory.class);
assertThat(new DirectFieldAccessor(kafkaTemplate).getPropertyValue("producerFactory"))
.isEqualTo(producerFactory);
assertThat(kafkaTemplate.getDefaultTopic()).isEqualTo("testTopic");
- DirectFieldAccessor factoryAccessor = new DirectFieldAccessor(kafkaListenerContainerFactory);
- assertThat(factoryAccessor.getPropertyValue("consumerFactory")).isEqualTo(consumerFactory);
- assertThat(factoryAccessor.getPropertyValue("containerProperties.ackMode")).isEqualTo(AckMode.MANUAL);
- assertThat(factoryAccessor.getPropertyValue("containerProperties.ackCount")).isEqualTo(123);
- assertThat(factoryAccessor.getPropertyValue("containerProperties.ackTime")).isEqualTo(456L);
- assertThat(factoryAccessor.getPropertyValue("concurrency")).isEqualTo(3);
- assertThat(factoryAccessor.getPropertyValue("containerProperties.pollTimeout")).isEqualTo(2000L);
+ DirectFieldAccessor dfa = new DirectFieldAccessor(kafkaListenerContainerFactory);
+ assertThat(dfa.getPropertyValue("consumerFactory")).isEqualTo(consumerFactory);
+ assertThat(dfa.getPropertyValue("containerProperties.ackMode")).isEqualTo(AckMode.MANUAL);
+ assertThat(dfa.getPropertyValue("containerProperties.ackCount")).isEqualTo(123);
+ assertThat(dfa.getPropertyValue("containerProperties.ackTime")).isEqualTo(456L);
+ assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3);
+ assertThat(dfa.getPropertyValue("containerProperties.pollTimeout")).isEqualTo(2000L);
}
+ private void load(String... environment) {
+ AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
+ ctx.register(KafkaAutoConfiguration.class);
+ EnvironmentTestUtils.addEnvironment(ctx, environment);
+ ctx.refresh();
+ this.context = ctx;
+ }
}

@@ -857,53 +857,39 @@ content into your application; rather pick only the properties that you need.
spring.jms.template.time-to-live= # Time-to-live of a message when sending in milliseconds. Enable QoS when set.
# APACHE KAFKA ({sc-spring-boot-autoconfigure}/kafka/KafkaProperties.{sc-ext}[KafkaProperties])
- spring.kafka.bootstrap-servers=localhost:9092 # Comma-delimited list of host:port pairs.
+ spring.kafka.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster.
spring.kafka.client-id= # Id to pass to the server when making requests; used for server-side logging.
- spring.kafka.ssl.key-password= # Password of the private key in the key store file.
- spring.kafka.ssl.keystore-location= # Location (resource) of the key store file (e.g. file:my.ks).
- spring.kafka.ssl.keystore-password= # Store password for the key store file.
- spring.kafka.ssl.truststore-location= # Location (resource) of the trust store file (e.g. file:my.ts).
- spring.kafka.ssl.truststore-password= # Store password for the trust store file.
- # Consumer-specific properties:
- spring.kafka.consumer.auto-commit-interval-ms= # Frequency in milliseconds that the consumer offsets are auto-committed.
+ spring.kafka.consumer.auto-commit-interval= # Frequency in milliseconds that the consumer offsets are auto-committed to Kafka if 'enable.auto.commit' true.
spring.kafka.consumer.auto-offset-reset= # What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server.
- spring.kafka.consumer.bootstrap-servers= # Comma-delimited list of host:port pairs.
+ spring.kafka.consumer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster.
spring.kafka.consumer.client-id= # Id to pass to the server when making requests; used for server-side logging.
spring.kafka.consumer.enable-auto-commit= # If true the consumer's offset will be periodically committed in the background.
- spring.kafka.consumer.fetch-max-wait-ms= # Maximum amount of time the server will block before answering the fetch request.
- spring.kafka.consumer.fetch-min-bytes= # Minimum amount of data the server should return for a fetch request.
+ spring.kafka.consumer.fetch-max-wait= # Maximum amount of time in milliseconds the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by "fetch.min.bytes".
+ spring.kafka.consumer.fetch-min-size= # Minimum amount of data the server should return for a fetch request in bytes.
spring.kafka.consumer.group-id= # Unique string that identifies the consumer group this consumer belongs to.
- spring.kafka.consumer.heartbeat-interval-ms= # Expected time between heartbeats to the consumer coordinator.
- spring.kafka.consumer.key-deserializer=StringDeserializer # Deserializer class for keys.
- spring.kafka.consumer.ssl.key-password= # Password of the private key in the key store file.
- spring.kafka.consumer.ssl.keystore-location= # Location (resource) of the key store file (e.g. file:my.ks).
- spring.kafka.consumer.ssl.keystore-password= # Store password for the key store file.
- spring.kafka.consumer.ssl.truststore-location= # Location (resource) of the trust store file (e.g. file:my.ts).
- spring.kafka.consumer.ssl.truststore-password= # Store password for the trust store file.
- spring.kafka.consumer.value-deserializer=StringDeserializer # Deserializer class for values.
- # Listener properties - Refer to the Spring for Apache Kafka documentation
- spring.kafka.listener.ack-mode=BATCH # AckMode - see the spring-kafka documentation.
- spring.kafka.listener.ack-count= # Number of records between offset commits when ack-mode is COUNT or COUNT_TIME.
- spring.kafka.listener.ack-time= # Time in milliseconds between offset commits when ack-mode is TIME or COUNT_TIME.
- spring.kafka.listener.concurrency=1 # Number of threads to run in the listener container(s).
- spring.kafka.listener.pollTimeout=1000 # Timeout in milliseconds to use when polling the consumer.
- # Producer-specific properties:
- spring.kafka.producer.acks= # Number of acknowledgments the producer requires the leader to have received.
+ spring.kafka.consumer.heartbeat-interval= # Expected time in milliseconds between heartbeats to the consumer coordinator.
+ spring.kafka.consumer.key-deserializer= # Deserializer class for keys.
+ spring.kafka.consumer.value-deserializer= # Deserializer class for values.
+ spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".
+ spring.kafka.listener.ack-mode= # Listener AckMode; see the spring-kafka documentation.
+ spring.kafka.listener.ack-time= # Time in milliseconds between offset commits when ackMode is "TIME" or "COUNT_TIME".
+ spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.
+ spring.kafka.listener.poll-timeout= # Timeout in milliseconds to use when polling the consumer.
+ spring.kafka.producer.acks= # Number of acknowledgments the producer requires the leader to have received before considering a request complete.
spring.kafka.producer.batch-size= # Number of records to batch before sending.
- spring.kafka.producer.bootstrap-servers= # Comma-delimited list of host:port pairs.
+ spring.kafka.producer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster.
spring.kafka.producer.buffer-memory= # Total bytes of memory the producer can use to buffer records waiting to be sent to the server.
spring.kafka.producer.client-id= # Id to pass to the server when making requests; used for server-side logging.
spring.kafka.producer.compression-type= # Compression type for all data generated by the producer.
- spring.kafka.producer.key-serializer=StringSerializer # Serializer class for keys.
+ spring.kafka.producer.key-serializer= # Serializer class for keys.
spring.kafka.producer.retries= # When greater than zero, enables retrying of failed sends.
- spring.kafka.producer.ssl.key-password= # Password of the private key in the key store file.
- spring.kafka.producer.ssl.keystore-location= # Location (resource) of the key store file (e.g. file:my.ks).
- spring.kafka.producer.ssl.keystore-password= # Store password for the key store file.
- spring.kafka.producer.ssl.truststore-location= # Location (resource) of the trust store file (e.g. file:my.ts).
- spring.kafka.producer.ssl.truststore-password= # Store password for the trust store file.
- spring.kafka.producer.value-serializer=StringSerializer # Serializer class for values.
- # template properties
- spring.kafka.template.default-topic= # Default topic to which messages are sent
+ spring.kafka.producer.value-serializer= # Serializer class for values.
+ spring.kafka.ssl.key-password= # Password of the private key in the key store file.
+ spring.kafka.ssl.keystore-location= # Location of the key store file.
+ spring.kafka.ssl.keystore-password= # Store password for the key store file.
+ spring.kafka.ssl.truststore-location= # Location of the trust store file.
+ spring.kafka.ssl.truststore-password= # Store password for the trust store file.
+ spring.kafka.template.default-topic= # Default topic to which messages will be sent.
# RABBIT ({sc-spring-boot-autoconfigure}/amqp/RabbitProperties.{sc-ext}[RabbitProperties])
spring.rabbitmq.addresses= # Comma-separated list of addresses to which the client should connect.

@@ -4452,27 +4452,34 @@ throw an `AmqpRejectAndDontRequeueException` to signal the message should be rej
This is the mechanism used when retries are enabled and the maximum delivery attempts are
reached.
[[boot-features-kafka]]
=== Apache Kafka Support
- http://kafka.apache.org/[Apache Kafa] is supported by providing auto-configuration of the `spring-kafka` project.
+ http://kafka.apache.org/[Apache Kafka] is supported by providing auto-configuration of the
+ `spring-kafka` project.
- Kafka configuration is controlled by external configuration properties in `spring.kafka.*`. For example, you might
- declare the following section in `application.properties`:
+ Kafka configuration is controlled by external configuration properties in
+ `spring.kafka.*`. For example, you might declare the following section in
+ `application.properties`:
[source,properties,indent=0]
----
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
----
See {sc-spring-boot-autoconfigure}/kafka/KafkaProperties.{sc-ext}[`KafkaProperties`]
for more of the supported options.
=== Sending a Message
- Spring's `KafkaTemplate` is auto-configured and you can autowire them directly in your own beans:
+ Spring's `KafkaTemplate` is auto-configured and you can autowire it directly in your own
+ beans:
[source,java,indent=0]
----
@@ -4491,8 +4498,18 @@ public class MyBean {
}
----
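The body of the example above is elided by this diff; a hypothetical bean of that shape (a sketch, not the commit's exact listing) would simply inject the auto-configured template and use it to send messages:

[source,java,indent=0]
----
	@Component
	public class MyBean {

		private final KafkaTemplate<Object, Object> kafkaTemplate;

		@Autowired
		public MyBean(KafkaTemplate<Object, Object> kafkaTemplate) {
			this.kafkaTemplate = kafkaTemplate;
		}

		public void someMethod() {
			this.kafkaTemplate.send("someTopic", "Hello");
		}

	}
----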
=== Receiving a Message
+ When the Apache Kafka infrastructure is present, any bean can be annotated with
+ `@KafkaListener` to create a listener endpoint. If no `KafkaListenerContainerFactory`
+ has been defined, a default one is configured automatically with keys defined in
+ `spring.kafka.listener.*`.
+ The following component creates a listener endpoint on the `someTopic` topic:
[source,java,indent=0]
----
@Component
@@ -4506,31 +4523,39 @@ public class MyBean {
}
----
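The component body is also elided above; a reconstruction of its likely shape, based only on the surrounding prose (hypothetical method and parameter names), is:

[source,java,indent=0]
----
	@Component
	public class MyBean {

		@KafkaListener(topics = "someTopic")
		public void processMessage(String content) {
			// invoked for each record received on the "someTopic" topic
		}

	}
----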
[[kafka-extra-props]]
=== Additional Kafka Properties
- The properties supported by auto configuration are shown in <<common-application-properties>>.
- Note that these properties (hyphenated or camelCase) map directly to the Apache Kafka dotted properties, refer
- to the Apache Kafka documentation for details.
- The first few of these properties apply to both producers and consumers, but can be specified at the producer or
- consumer level if you wish to use different values for each.
- Apache Kafka designates properties with an importance: HIGH, MEDIUM and LOW.
- Spring Boot auto configuration supports all HIGH importance properties, some selected MEDIUM and LOW,
+ The properties supported by auto configuration are shown in
+ <<common-application-properties>>. Note that these properties (hyphenated or camelCase)
+ map directly to the Apache Kafka dotted properties for the most part, refer to the Apache
+ Kafka documentation for details.
+ The first few of these properties apply to both producers and consumers, but can be
+ specified at the producer or consumer level if you wish to use different values for each.
+ Apache Kafka designates properties with an importance: HIGH, MEDIUM and LOW. Spring Boot
+ auto configuration supports all HIGH importance properties, some selected MEDIUM and LOW,
and any that do not have a default value.
- Only a subset of the properties supported by Kafka are available via the `KafkaProperties` class.
- If you wish to configure the producer or consumer with additional properties, you can override the producer factory
- and/or consumer factory bean, adding additional properties, for example:
+ Only a subset of the properties supported by Kafka are available via the `KafkaProperties`
+ class. If you wish to configure the producer or consumer with additional properties, you
+ can override the producer factory and/or consumer factory bean, adding additional
+ properties, for example:
[source,java,indent=0]
----
@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(KafkaProperties properties) {
Map<String, Object> producerProperties = properties.buildProducerProperties();
producerProperties.put("some.property", "some.value");
return new DefaultKafkaProducerFactory<Object, Object>(producerProperties);
}
----
[[boot-features-restclient]]
== Calling REST services
If you need to call remote REST services from your application, you can use Spring
