Auto-configure Kafka MessageConverter

See gh-10380
pull/10494/head
Eddú Meléndez 7 years ago committed by Stephane Nicoll
parent 41424e4529
commit d7bc93f278

@ -20,17 +20,21 @@ import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.config.ContainerProperties; import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.converter.MessageConverter;
/** /**
* Configure {@link ConcurrentKafkaListenerContainerFactory} with sensible defaults. * Configure {@link ConcurrentKafkaListenerContainerFactory} with sensible defaults.
* *
* @author Gary Russell * @author Gary Russell
* @author Eddú Meléndez
* @since 1.5.0 * @since 1.5.0
*/ */
public class ConcurrentKafkaListenerContainerFactoryConfigurer { public class ConcurrentKafkaListenerContainerFactoryConfigurer {
private KafkaProperties properties; private KafkaProperties properties;
private MessageConverter messageConverter;
/** /**
* Set the {@link KafkaProperties} to use. * Set the {@link KafkaProperties} to use.
* @param properties the properties * @param properties the properties
@ -39,6 +43,14 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
this.properties = properties; this.properties = properties;
} }
/**
* Set the {@link MessageConverter} to use.
* @param messageConverter the message converter
*/
public void setMessageConverter(MessageConverter messageConverter) {
this.messageConverter = messageConverter;
}
/** /**
* Configure the specified Kafka listener container factory. The factory can be * Configure the specified Kafka listener container factory. The factory can be
* further tuned and default settings can be overridden. * further tuned and default settings can be overridden.
@ -49,6 +61,9 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
public void configure( public void configure(
ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory, ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory,
ConsumerFactory<Object, Object> consumerFactory) { ConsumerFactory<Object, Object> consumerFactory) {
if (this.messageConverter != null) {
listenerContainerFactory.setMessageConverter(this.messageConverter);
}
listenerContainerFactory.setConsumerFactory(consumerFactory); listenerContainerFactory.setConsumerFactory(consumerFactory);
Listener container = this.properties.getListener(); Listener container = this.properties.getListener();
ContainerProperties containerProperties = listenerContainerFactory ContainerProperties containerProperties = listenerContainerFactory

@ -16,6 +16,7 @@
package org.springframework.boot.autoconfigure.kafka; package org.springframework.boot.autoconfigure.kafka;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
@ -24,11 +25,13 @@ import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerConfigUtils; import org.springframework.kafka.config.KafkaListenerConfigUtils;
import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.support.converter.MessageConverter;
/** /**
* Configuration for Kafka annotation-driven support. * Configuration for Kafka annotation-driven support.
* *
* @author Gary Russell * @author Gary Russell
* @author Eddú Meléndez
* @since 1.5.0 * @since 1.5.0
*/ */
@Configuration @Configuration
@ -37,8 +40,12 @@ class KafkaAnnotationDrivenConfiguration {
private final KafkaProperties properties; private final KafkaProperties properties;
KafkaAnnotationDrivenConfiguration(KafkaProperties properties) { private final MessageConverter messageConverter;
KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
ObjectProvider<MessageConverter> messageConverter) {
this.properties = properties; this.properties = properties;
this.messageConverter = messageConverter.getIfAvailable();
} }
@Bean @Bean
@ -46,6 +53,7 @@ class KafkaAnnotationDrivenConfiguration {
public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() { public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer(); ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
configurer.setKafkaProperties(this.properties); configurer.setKafkaProperties(this.properties);
configurer.setMessageConverter(this.messageConverter);
return configurer; return configurer;
} }

@ -18,6 +18,7 @@ package org.springframework.boot.autoconfigure.kafka;
import java.io.IOException; import java.io.IOException;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@ -36,12 +37,14 @@ import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener; import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener; import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
/** /**
* {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka. * {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka.
* *
* @author Gary Russell * @author Gary Russell
* @author Stephane Nicoll * @author Stephane Nicoll
* @author Eddú Meléndez
* @since 1.5.0 * @since 1.5.0
*/ */
@Configuration @Configuration
@ -52,8 +55,12 @@ public class KafkaAutoConfiguration {
private final KafkaProperties properties; private final KafkaProperties properties;
public KafkaAutoConfiguration(KafkaProperties properties) { private final RecordMessageConverter messageConverter;
public KafkaAutoConfiguration(KafkaProperties properties,
ObjectProvider<RecordMessageConverter> messageConverter) {
this.properties = properties; this.properties = properties;
this.messageConverter = messageConverter.getIfAvailable();
} }
@Bean @Bean
@ -63,6 +70,9 @@ public class KafkaAutoConfiguration {
ProducerListener<Object, Object> kafkaProducerListener) { ProducerListener<Object, Object> kafkaProducerListener) {
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>( KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(
kafkaProducerFactory); kafkaProducerFactory);
if (this.messageConverter != null) {
kafkaTemplate.setMessageConverter(this.messageConverter);
}
kafkaTemplate.setProducerListener(kafkaProducerListener); kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic()); kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
return kafkaTemplate; return kafkaTemplate;

@ -30,12 +30,14 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.LongSerializer;
import org.junit.After;
import org.junit.Test; import org.junit.Test;
import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.DirectFieldAccessor;
import org.springframework.boot.test.util.TestPropertyValues; import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@ -43,139 +45,149 @@ import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.kafka.test.utils.KafkaTestUtils;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry; import static org.assertj.core.api.Assertions.entry;
import static org.mockito.Mockito.mock;
/** /**
* Tests for {@link KafkaAutoConfiguration}. * Tests for {@link KafkaAutoConfiguration}.
* *
* @author Gary Russell * @author Gary Russell
* @author Stephane Nicoll * @author Stephane Nicoll
* @author Eddú Meléndez
*/ */
public class KafkaAutoConfigurationTests { public class KafkaAutoConfigurationTests {
private AnnotationConfigApplicationContext context; private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(KafkaAutoConfiguration.class));
@After
public void closeContext() {
if (this.context != null) {
this.context.close();
}
}
@Test @Test
public void consumerProperties() { public void consumerProperties() {
load("spring.kafka.bootstrap-servers=foo:1234", "spring.kafka.properties.foo=bar", this.contextRunner.withUserConfiguration(TestConfiguration.class)
"spring.kafka.properties.baz=qux", .withPropertyValues(
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz", "spring.kafka.bootstrap-servers=foo:1234",
"spring.kafka.ssl.key-password=p1", "spring.kafka.properties.foo=bar",
"spring.kafka.ssl.keystore-location=classpath:ksLoc", "spring.kafka.properties.baz=qux",
"spring.kafka.ssl.keystore-password=p2", "spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
"spring.kafka.ssl.truststore-location=classpath:tsLoc", "spring.kafka.ssl.key-password=p1",
"spring.kafka.ssl.truststore-password=p3", "spring.kafka.ssl.keystore-location=classpath:ksLoc",
"spring.kafka.consumer.auto-commit-interval=123", "spring.kafka.ssl.keystore-password=p2",
"spring.kafka.consumer.max-poll-records=42", "spring.kafka.ssl.truststore-location=classpath:tsLoc",
"spring.kafka.consumer.auto-offset-reset=earliest", "spring.kafka.ssl.truststore-password=p3",
"spring.kafka.consumer.client-id=ccid", // test override common "spring.kafka.consumer.auto-commit-interval=123",
"spring.kafka.consumer.enable-auto-commit=false", "spring.kafka.consumer.max-poll-records=42",
"spring.kafka.consumer.fetch-max-wait=456", "spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.consumer.properties.fiz.buz=fix.fox", "spring.kafka.consumer.client-id=ccid", // test override common
"spring.kafka.consumer.fetch-min-size=789", "spring.kafka.consumer.enable-auto-commit=false",
"spring.kafka.consumer.group-id=bar", "spring.kafka.consumer.fetch-max-wait=456",
"spring.kafka.consumer.heartbeat-interval=234", "spring.kafka.consumer.properties.fiz.buz=fix.fox",
"spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer", "spring.kafka.consumer.fetch-min-size=789",
"spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer"); "spring.kafka.consumer.group-id=bar",
DefaultKafkaConsumerFactory<?, ?> consumerFactory = this.context "spring.kafka.consumer.heartbeat-interval=234",
.getBean(DefaultKafkaConsumerFactory.class); "spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer",
Map<String, Object> configs = consumerFactory.getConfigurationProperties(); "spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer")
// common .run((context) -> {
assertThat(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) DefaultKafkaConsumerFactory<?, ?> consumerFactory = context
.isEqualTo(Collections.singletonList("foo:1234")); .getBean(DefaultKafkaConsumerFactory.class);
assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p1"); Map<String, Object> configs = consumerFactory.getConfigurationProperties();
assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) // common
.endsWith(File.separator + "ksLoc"); assertThat(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p2"); .isEqualTo(Collections.singletonList("foo:1234"));
assertThat((String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)) assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p1");
.endsWith(File.separator + "tsLoc"); assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)) .endsWith(File.separator + "ksLoc");
.isEqualTo("p3"); assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p2");
// consumer assertThat((String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
assertThat(configs.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("ccid"); // override .endsWith(File.separator + "tsLoc");
assertThat(configs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG))
.isEqualTo(Boolean.FALSE); .isEqualTo("p3");
assertThat(configs.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)) // consumer
.isEqualTo(123); assertThat(configs.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("ccid"); // override
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) assertThat(configs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))
.isEqualTo("earliest"); .isEqualTo(Boolean.FALSE);
assertThat(configs.get(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG)).isEqualTo(456); assertThat(configs.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG))
assertThat(configs.get(ConsumerConfig.FETCH_MIN_BYTES_CONFIG)).isEqualTo(789); .isEqualTo(123);
assertThat(configs.get(ConsumerConfig.GROUP_ID_CONFIG)).isEqualTo("bar"); assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
assertThat(configs.get(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)) .isEqualTo("earliest");
.isEqualTo(234); assertThat(configs.get(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG)).isEqualTo(456);
assertThat(configs.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) assertThat(configs.get(ConsumerConfig.FETCH_MIN_BYTES_CONFIG)).isEqualTo(789);
.isEqualTo(LongDeserializer.class); assertThat(configs.get(ConsumerConfig.GROUP_ID_CONFIG)).isEqualTo("bar");
assertThat(configs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) assertThat(configs.get(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG))
.isEqualTo(IntegerDeserializer.class); .isEqualTo(234);
assertThat(configs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)).isEqualTo(42); assertThat(configs.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
assertThat(configs.get("foo")).isEqualTo("bar"); .isEqualTo(LongDeserializer.class);
assertThat(configs.get("baz")).isEqualTo("qux"); assertThat(configs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz"); .isEqualTo(IntegerDeserializer.class);
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox"); assertThat(configs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)).isEqualTo(42);
assertThat(configs.get("foo")).isEqualTo("bar");
assertThat(configs.get("baz")).isEqualTo("qux");
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
});
} }
@Test @Test
public void producerProperties() { public void producerProperties() {
load("spring.kafka.clientId=cid", this.contextRunner.withUserConfiguration(TestConfiguration.class)
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz", .withPropertyValues(
"spring.kafka.producer.acks=all", "spring.kafka.producer.batch-size=20", "spring.kafka.clientId=cid",
"spring.kafka.producer.bootstrap-servers=bar:1234", // test override "spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
"spring.kafka.producer.buffer-memory=12345", "spring.kafka.producer.acks=all",
"spring.kafka.producer.compression-type=gzip", "spring.kafka.producer.batch-size=20",
"spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer", "spring.kafka.producer.bootstrap-servers=bar:1234", // test
"spring.kafka.producer.retries=2", // override
"spring.kafka.producer.properties.fiz.buz=fix.fox", "spring.kafka.producer.buffer-memory=12345",
"spring.kafka.producer.ssl.key-password=p4", "spring.kafka.producer.compression-type=gzip",
"spring.kafka.producer.ssl.keystore-location=classpath:ksLocP", "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer",
"spring.kafka.producer.ssl.keystore-password=p5", "spring.kafka.producer.retries=2",
"spring.kafka.producer.ssl.truststore-location=classpath:tsLocP", "spring.kafka.producer.properties.fiz.buz=fix.fox",
"spring.kafka.producer.ssl.truststore-password=p6", "spring.kafka.producer.ssl.key-password=p4",
"spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.IntegerSerializer"); "spring.kafka.producer.ssl.keystore-location=classpath:ksLocP",
DefaultKafkaProducerFactory<?, ?> producerFactory = this.context "spring.kafka.producer.ssl.keystore-password=p5",
.getBean(DefaultKafkaProducerFactory.class); "spring.kafka.producer.ssl.truststore-location=classpath:tsLocP",
Map<String, Object> configs = producerFactory.getConfigurationProperties(); "spring.kafka.producer.ssl.truststore-password=p6",
// common "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.IntegerSerializer")
assertThat(configs.get(ProducerConfig.CLIENT_ID_CONFIG)).isEqualTo("cid"); .run((context) -> {
// producer DefaultKafkaProducerFactory<?, ?> producerFactory = context
assertThat(configs.get(ProducerConfig.ACKS_CONFIG)).isEqualTo("all"); .getBean(DefaultKafkaProducerFactory.class);
assertThat(configs.get(ProducerConfig.BATCH_SIZE_CONFIG)).isEqualTo(20); Map<String, Object> configs = producerFactory.getConfigurationProperties();
assertThat(configs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) // common
.isEqualTo(Collections.singletonList("bar:1234")); // override assertThat(configs.get(ProducerConfig.CLIENT_ID_CONFIG)).isEqualTo("cid");
assertThat(configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG)).isEqualTo(12345L); // producer
assertThat(configs.get(ProducerConfig.COMPRESSION_TYPE_CONFIG)).isEqualTo("gzip"); assertThat(configs.get(ProducerConfig.ACKS_CONFIG)).isEqualTo("all");
assertThat(configs.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) assertThat(configs.get(ProducerConfig.BATCH_SIZE_CONFIG)).isEqualTo(20);
.isEqualTo(LongSerializer.class); assertThat(configs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4"); .isEqualTo(Collections.singletonList("bar:1234")); // override
assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) assertThat(configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG)).isEqualTo(12345L);
.endsWith(File.separator + "ksLocP"); assertThat(configs.get(ProducerConfig.COMPRESSION_TYPE_CONFIG)).isEqualTo("gzip");
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p5"); assertThat(configs.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
assertThat((String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)) .isEqualTo(LongSerializer.class);
.endsWith(File.separator + "tsLocP"); assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4");
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)) assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
.isEqualTo("p6"); .endsWith(File.separator + "ksLocP");
assertThat(configs.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2); assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p5");
assertThat(configs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) assertThat((String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
.isEqualTo(IntegerSerializer.class); .endsWith(File.separator + "tsLocP");
assertThat(this.context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)) assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG))
.isEmpty(); .isEqualTo("p6");
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz"); assertThat(configs.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2);
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox"); assertThat(configs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
.isEqualTo(IntegerSerializer.class);
assertThat(context.getBeansOfType(KafkaJaasLoginModuleInitializer.class))
.isEmpty();
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
});
} }
@Test @Test
public void adminProperties() { public void adminProperties() {
load("spring.kafka.clientId=cid", this.contextRunner.withPropertyValues("spring.kafka.clientId=cid",
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz", "spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
"spring.kafka.admin.fail-fast=true", "spring.kafka.admin.fail-fast=true",
"spring.kafka.admin.properties.fiz.buz=fix.fox", "spring.kafka.admin.properties.fiz.buz=fix.fox",
@ -183,79 +195,126 @@ public class KafkaAutoConfigurationTests {
"spring.kafka.admin.ssl.keystore-location=classpath:ksLocP", "spring.kafka.admin.ssl.keystore-location=classpath:ksLocP",
"spring.kafka.admin.ssl.keystore-password=p5", "spring.kafka.admin.ssl.keystore-password=p5",
"spring.kafka.admin.ssl.truststore-location=classpath:tsLocP", "spring.kafka.admin.ssl.truststore-location=classpath:tsLocP",
"spring.kafka.admin.ssl.truststore-password=p6"); "spring.kafka.admin.ssl.truststore-password=p6").run((context) -> {
KafkaAdmin admin = this.context.getBean(KafkaAdmin.class); KafkaAdmin admin = context.getBean(KafkaAdmin.class);
Map<String, Object> configs = admin.getConfig(); Map<String, Object> configs = admin.getConfig();
// common // common
assertThat(configs.get(AdminClientConfig.CLIENT_ID_CONFIG)).isEqualTo("cid"); assertThat(configs.get(AdminClientConfig.CLIENT_ID_CONFIG)).isEqualTo("cid");
// admin // admin
assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4"); assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4");
assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
.endsWith(File.separator + "ksLocP"); .endsWith(File.separator + "ksLocP");
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p5"); assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p5");
assertThat((String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)) assertThat((String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
.endsWith(File.separator + "tsLocP"); .endsWith(File.separator + "tsLocP");
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)) assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG))
.isEqualTo("p6"); .isEqualTo("p6");
assertThat(this.context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)) assertThat(context.getBeansOfType(KafkaJaasLoginModuleInitializer.class))
.isEmpty(); .isEmpty();
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz"); assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox"); assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
assertThat(KafkaTestUtils.getPropertyValue(admin, "fatalIfBrokerNotAvailable", assertThat(KafkaTestUtils.getPropertyValue(admin, "fatalIfBrokerNotAvailable",
Boolean.class)).isTrue(); Boolean.class)).isTrue();
});
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Test @Test
public void listenerProperties() { public void listenerProperties() {
load("spring.kafka.template.default-topic=testTopic", this.contextRunner.withUserConfiguration(TestConfiguration.class)
"spring.kafka.listener.ack-mode=MANUAL", .withPropertyValues("spring.kafka.template.default-topic=testTopic",
"spring.kafka.listener.ack-count=123", "spring.kafka.listener.ack-mode=MANUAL",
"spring.kafka.listener.ack-time=456", "spring.kafka.listener.ack-count=123",
"spring.kafka.listener.concurrency=3", "spring.kafka.listener.ack-time=456",
"spring.kafka.listener.poll-timeout=2000", "spring.kafka.listener.concurrency=3",
"spring.kafka.listener.type=batch", "spring.kafka.jaas.enabled=true", "spring.kafka.listener.poll-timeout=2000",
"spring.kafka.jaas.login-module=foo", "spring.kafka.listener.type=batch",
"spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.enabled=true",
"spring.kafka.jaas.options.useKeyTab=true"); "spring.kafka.jaas.login-module=foo",
DefaultKafkaProducerFactory<?, ?> producerFactory = this.context "spring.kafka.jaas.control-flag=REQUISITE",
.getBean(DefaultKafkaProducerFactory.class); "spring.kafka.jaas.options.useKeyTab=true")
DefaultKafkaConsumerFactory<?, ?> consumerFactory = this.context .run((context) -> {
.getBean(DefaultKafkaConsumerFactory.class); DefaultKafkaProducerFactory<?, ?> producerFactory = context
KafkaTemplate<?, ?> kafkaTemplate = this.context.getBean(KafkaTemplate.class); .getBean(DefaultKafkaProducerFactory.class);
KafkaListenerContainerFactory<?> kafkaListenerContainerFactory = this.context DefaultKafkaConsumerFactory<?, ?> consumerFactory = context
.getBean(KafkaListenerContainerFactory.class); .getBean(DefaultKafkaConsumerFactory.class);
assertThat(new DirectFieldAccessor(kafkaTemplate) KafkaTemplate<?, ?> kafkaTemplate = context.getBean(KafkaTemplate.class);
.getPropertyValue("producerFactory")).isEqualTo(producerFactory); KafkaListenerContainerFactory<?> kafkaListenerContainerFactory = context
assertThat(kafkaTemplate.getDefaultTopic()).isEqualTo("testTopic"); .getBean(KafkaListenerContainerFactory.class);
DirectFieldAccessor dfa = new DirectFieldAccessor(kafkaListenerContainerFactory); assertThat(kafkaTemplate.getMessageConverter()).isInstanceOf(
assertThat(dfa.getPropertyValue("consumerFactory")).isEqualTo(consumerFactory); MessagingMessageConverter.class);
assertThat(dfa.getPropertyValue("containerProperties.ackMode")) assertThat(new DirectFieldAccessor(kafkaTemplate)
.isEqualTo(AckMode.MANUAL); .getPropertyValue("producerFactory")).isEqualTo(producerFactory);
assertThat(dfa.getPropertyValue("containerProperties.ackCount")).isEqualTo(123); assertThat(kafkaTemplate.getDefaultTopic()).isEqualTo("testTopic");
assertThat(dfa.getPropertyValue("containerProperties.ackTime")).isEqualTo(456L); DirectFieldAccessor dfa = new DirectFieldAccessor(kafkaListenerContainerFactory);
assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3); assertThat(dfa.getPropertyValue("consumerFactory")).isEqualTo(consumerFactory);
assertThat(dfa.getPropertyValue("containerProperties.pollTimeout")) assertThat(dfa.getPropertyValue("containerProperties.ackMode"))
.isEqualTo(2000L); .isEqualTo(AckMode.MANUAL);
assertThat(dfa.getPropertyValue("batchListener")).isEqualTo(true); assertThat(dfa.getPropertyValue("containerProperties.ackCount")).isEqualTo(123);
assertThat(this.context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)) assertThat(dfa.getPropertyValue("containerProperties.ackTime")).isEqualTo(456L);
.hasSize(1); assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3);
KafkaJaasLoginModuleInitializer jaas = this.context assertThat(dfa.getPropertyValue("containerProperties.pollTimeout"))
.getBean(KafkaJaasLoginModuleInitializer.class); .isEqualTo(2000L);
dfa = new DirectFieldAccessor(jaas); assertThat(dfa.getPropertyValue("batchListener")).isEqualTo(true);
assertThat(dfa.getPropertyValue("loginModule")).isEqualTo("foo"); assertThat(context.getBeansOfType(KafkaJaasLoginModuleInitializer.class))
assertThat(dfa.getPropertyValue("controlFlag")) .hasSize(1);
.isEqualTo(AppConfigurationEntry.LoginModuleControlFlag.REQUISITE); KafkaJaasLoginModuleInitializer jaas = context
assertThat(((Map<String, String>) dfa.getPropertyValue("options"))) .getBean(KafkaJaasLoginModuleInitializer.class);
.containsExactly(entry("useKeyTab", "true")); dfa = new DirectFieldAccessor(jaas);
assertThat(dfa.getPropertyValue("loginModule")).isEqualTo("foo");
assertThat(dfa.getPropertyValue("controlFlag"))
.isEqualTo(AppConfigurationEntry.LoginModuleControlFlag.REQUISITE);
assertThat(((Map<String, String>) dfa.getPropertyValue("options")))
.containsExactly(entry("useKeyTab", "true"));
});
}
@Test
public void testKafkaTemplateRecordMessageConverters() {
this.contextRunner.withUserConfiguration(RecordMessageConvertersConfiguration.class)
.run((context) -> {
KafkaTemplate kafkaTemplate = context.getBean(KafkaTemplate.class);
assertThat(kafkaTemplate.getMessageConverter())
.isSameAs(context.getBean("myRecordMessageConverter"));
});
}
@Test
public void testConcurrentKafkaListenerContainerFactoryWithCustomMessageConverters() {
this.contextRunner.withUserConfiguration(MessageConvertersConfiguration.class)
.run((context) -> {
ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
DirectFieldAccessor dfa = new DirectFieldAccessor(
kafkaListenerContainerFactory);
assertThat(dfa.getPropertyValue("messageConverter"))
.isSameAs(context.getBean("myMessageConverter"));
});
} }
private void load(String... environment) { @Configuration
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(); protected static class TestConfiguration {
ctx.register(KafkaAutoConfiguration.class);
TestPropertyValues.of(environment).applyTo(ctx); }
ctx.refresh();
this.context = ctx; @Configuration
protected static class RecordMessageConvertersConfiguration {
@Bean
public RecordMessageConverter myRecordMessageConverter() {
return mock(RecordMessageConverter.class);
}
}
@Configuration
protected static class MessageConvertersConfiguration {
@Bean
public MessageConverter myMessageConverter() {
return mock(MessageConverter.class);
}
} }
} }

Loading…
Cancel
Save