From d7bc93f278a329b7fe50a51af18ecc88fdafeb0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edd=C3=BA=20Mel=C3=A9ndez?= Date: Fri, 22 Sep 2017 01:38:52 -0500 Subject: [PATCH] Auto-configure Kafka MessageConverter See gh-10380 --- ...fkaListenerContainerFactoryConfigurer.java | 15 + .../KafkaAnnotationDrivenConfiguration.java | 10 +- .../kafka/KafkaAutoConfiguration.java | 12 +- .../kafka/KafkaAutoConfigurationTests.java | 413 ++++++++++-------- 4 files changed, 271 insertions(+), 179 deletions(-) diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index 3ac3d9b0bc..625a1190ff 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -20,17 +20,21 @@ import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.config.ContainerProperties; +import org.springframework.kafka.support.converter.MessageConverter; /** * Configure {@link ConcurrentKafkaListenerContainerFactory} with sensible defaults. * * @author Gary Russell + * @author Eddú Meléndez * @since 1.5.0 */ public class ConcurrentKafkaListenerContainerFactoryConfigurer { private KafkaProperties properties; + private MessageConverter messageConverter; + /** * Set the {@link KafkaProperties} to use. * @param properties the properties @@ -39,6 +43,14 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { this.properties = properties; } + /** + * Set the {@link MessageConverter} to use. + * @param messageConverter the message converter + */ + public void setMessageConverter(MessageConverter messageConverter) { + this.messageConverter = messageConverter; + } + /** * Configure the specified Kafka listener container factory. The factory can be * further tuned and default settings can be overridden. @@ -49,6 +61,9 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { public void configure( ConcurrentKafkaListenerContainerFactory listenerContainerFactory, ConsumerFactory consumerFactory) { + if (this.messageConverter != null) { + listenerContainerFactory.setMessageConverter(this.messageConverter); + } listenerContainerFactory.setConsumerFactory(consumerFactory); Listener container = this.properties.getListener(); ContainerProperties containerProperties = listenerContainerFactory diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java index 9abe319be3..8ba10874ca 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java @@ -16,6 +16,7 @@ package org.springframework.boot.autoconfigure.kafka; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; @@ -24,11 +25,13 @@ import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerConfigUtils; import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.support.converter.MessageConverter; /** * Configuration for Kafka annotation-driven support. * * @author Gary Russell + * @author Eddú Meléndez * @since 1.5.0 */ @Configuration @@ -37,8 +40,12 @@ class KafkaAnnotationDrivenConfiguration { private final KafkaProperties properties; - KafkaAnnotationDrivenConfiguration(KafkaProperties properties) { + private final MessageConverter messageConverter; + + KafkaAnnotationDrivenConfiguration(KafkaProperties properties, + ObjectProvider messageConverter) { this.properties = properties; + this.messageConverter = messageConverter.getIfAvailable(); } @Bean @@ -46,6 +53,7 @@ class KafkaAnnotationDrivenConfiguration { public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() { ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer(); configurer.setKafkaProperties(this.properties); + configurer.setMessageConverter(this.messageConverter); return configurer; } diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java index 99c557f4ec..32de2d7a5d 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java @@ -18,6 +18,7 @@ package org.springframework.boot.autoconfigure.kafka; import java.io.IOException; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -36,12 +37,14 @@ import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.support.LoggingProducerListener; import org.springframework.kafka.support.ProducerListener; +import org.springframework.kafka.support.converter.RecordMessageConverter; /** * {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka. * * @author Gary Russell * @author Stephane Nicoll + * @author Eddú Meléndez * @since 1.5.0 */ @Configuration @@ -52,8 +55,12 @@ public class KafkaAutoConfiguration { private final KafkaProperties properties; - public KafkaAutoConfiguration(KafkaProperties properties) { + private final RecordMessageConverter messageConverter; + + public KafkaAutoConfiguration(KafkaProperties properties, + ObjectProvider messageConverter) { this.properties = properties; + this.messageConverter = messageConverter.getIfAvailable(); } @Bean @@ -63,6 +70,9 @@ public class KafkaAutoConfiguration { ProducerListener kafkaProducerListener) { KafkaTemplate kafkaTemplate = new KafkaTemplate<>( kafkaProducerFactory); + if (this.messageConverter != null) { + kafkaTemplate.setMessageConverter(this.messageConverter); + } kafkaTemplate.setProducerListener(kafkaProducerListener); kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic()); return kafkaTemplate; diff --git a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 32e4047974..8300347150 100644 --- a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -30,12 +30,14 @@ import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; -import org.junit.After; import org.junit.Test; import org.springframework.beans.DirectFieldAccessor; -import org.springframework.boot.test.util.TestPropertyValues; -import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; @@ -43,139 +45,149 @@ import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; +import org.springframework.kafka.support.converter.MessageConverter; +import org.springframework.kafka.support.converter.MessagingMessageConverter; +import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.kafka.test.utils.KafkaTestUtils; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.entry; +import static org.mockito.Mockito.mock; /** * Tests for {@link KafkaAutoConfiguration}. * * @author Gary Russell * @author Stephane Nicoll + * @author Eddú Meléndez */ public class KafkaAutoConfigurationTests { - private AnnotationConfigApplicationContext context; - - @After - public void closeContext() { - if (this.context != null) { - this.context.close(); - } - } + private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(KafkaAutoConfiguration.class)); @Test public void consumerProperties() { - load("spring.kafka.bootstrap-servers=foo:1234", "spring.kafka.properties.foo=bar", - "spring.kafka.properties.baz=qux", - "spring.kafka.properties.foo.bar.baz=qux.fiz.buz", - "spring.kafka.ssl.key-password=p1", - "spring.kafka.ssl.keystore-location=classpath:ksLoc", - "spring.kafka.ssl.keystore-password=p2", - "spring.kafka.ssl.truststore-location=classpath:tsLoc", - "spring.kafka.ssl.truststore-password=p3", - "spring.kafka.consumer.auto-commit-interval=123", - "spring.kafka.consumer.max-poll-records=42", - "spring.kafka.consumer.auto-offset-reset=earliest", - "spring.kafka.consumer.client-id=ccid", // test override common - "spring.kafka.consumer.enable-auto-commit=false", - "spring.kafka.consumer.fetch-max-wait=456", - "spring.kafka.consumer.properties.fiz.buz=fix.fox", - "spring.kafka.consumer.fetch-min-size=789", - "spring.kafka.consumer.group-id=bar", - "spring.kafka.consumer.heartbeat-interval=234", - "spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer", - "spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer"); - DefaultKafkaConsumerFactory consumerFactory = this.context - .getBean(DefaultKafkaConsumerFactory.class); - Map configs = consumerFactory.getConfigurationProperties(); - // common - assertThat(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) - .isEqualTo(Collections.singletonList("foo:1234")); - assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p1"); - assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) - .endsWith(File.separator + "ksLoc"); - assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p2"); - assertThat((String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)) - .endsWith(File.separator + "tsLoc"); - assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)) - .isEqualTo("p3"); - // consumer - assertThat(configs.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("ccid"); // override - assertThat(configs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) - .isEqualTo(Boolean.FALSE); - assertThat(configs.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)) - .isEqualTo(123); - assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) - .isEqualTo("earliest"); - assertThat(configs.get(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG)).isEqualTo(456); - assertThat(configs.get(ConsumerConfig.FETCH_MIN_BYTES_CONFIG)).isEqualTo(789); - assertThat(configs.get(ConsumerConfig.GROUP_ID_CONFIG)).isEqualTo("bar"); - assertThat(configs.get(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)) - .isEqualTo(234); - assertThat(configs.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) - .isEqualTo(LongDeserializer.class); - assertThat(configs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) - .isEqualTo(IntegerDeserializer.class); - assertThat(configs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)).isEqualTo(42); - assertThat(configs.get("foo")).isEqualTo("bar"); - assertThat(configs.get("baz")).isEqualTo("qux"); - assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz"); - assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox"); + this.contextRunner.withUserConfiguration(TestConfiguration.class) + .withPropertyValues( + "spring.kafka.bootstrap-servers=foo:1234", + "spring.kafka.properties.foo=bar", + "spring.kafka.properties.baz=qux", + "spring.kafka.properties.foo.bar.baz=qux.fiz.buz", + "spring.kafka.ssl.key-password=p1", + "spring.kafka.ssl.keystore-location=classpath:ksLoc", + "spring.kafka.ssl.keystore-password=p2", + "spring.kafka.ssl.truststore-location=classpath:tsLoc", + "spring.kafka.ssl.truststore-password=p3", + "spring.kafka.consumer.auto-commit-interval=123", + "spring.kafka.consumer.max-poll-records=42", + "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.consumer.client-id=ccid", // test override common + "spring.kafka.consumer.enable-auto-commit=false", + "spring.kafka.consumer.fetch-max-wait=456", + "spring.kafka.consumer.properties.fiz.buz=fix.fox", + "spring.kafka.consumer.fetch-min-size=789", + "spring.kafka.consumer.group-id=bar", + "spring.kafka.consumer.heartbeat-interval=234", + "spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer", + "spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer") + .run((context) -> { + DefaultKafkaConsumerFactory consumerFactory = context + .getBean(DefaultKafkaConsumerFactory.class); + Map configs = consumerFactory.getConfigurationProperties(); + // common + assertThat(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) + .isEqualTo(Collections.singletonList("foo:1234")); + assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p1"); + assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) + .endsWith(File.separator + "ksLoc"); + assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p2"); + assertThat((String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)) + .endsWith(File.separator + "tsLoc"); + assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)) + .isEqualTo("p3"); + // consumer + assertThat(configs.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("ccid"); // override + assertThat(configs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) + .isEqualTo(Boolean.FALSE); + assertThat(configs.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)) + .isEqualTo(123); + assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) + .isEqualTo("earliest"); + assertThat(configs.get(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG)).isEqualTo(456); + assertThat(configs.get(ConsumerConfig.FETCH_MIN_BYTES_CONFIG)).isEqualTo(789); + assertThat(configs.get(ConsumerConfig.GROUP_ID_CONFIG)).isEqualTo("bar"); + assertThat(configs.get(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)) + .isEqualTo(234); + assertThat(configs.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) + .isEqualTo(LongDeserializer.class); + assertThat(configs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) + .isEqualTo(IntegerDeserializer.class); + assertThat(configs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)).isEqualTo(42); + assertThat(configs.get("foo")).isEqualTo("bar"); + assertThat(configs.get("baz")).isEqualTo("qux"); + assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz"); + assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox"); + }); } @Test public void producerProperties() { - load("spring.kafka.clientId=cid", - "spring.kafka.properties.foo.bar.baz=qux.fiz.buz", - "spring.kafka.producer.acks=all", "spring.kafka.producer.batch-size=20", - "spring.kafka.producer.bootstrap-servers=bar:1234", // test override - "spring.kafka.producer.buffer-memory=12345", - "spring.kafka.producer.compression-type=gzip", - "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer", - "spring.kafka.producer.retries=2", - "spring.kafka.producer.properties.fiz.buz=fix.fox", - "spring.kafka.producer.ssl.key-password=p4", - "spring.kafka.producer.ssl.keystore-location=classpath:ksLocP", - "spring.kafka.producer.ssl.keystore-password=p5", - "spring.kafka.producer.ssl.truststore-location=classpath:tsLocP", - "spring.kafka.producer.ssl.truststore-password=p6", - "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.IntegerSerializer"); - DefaultKafkaProducerFactory producerFactory = this.context - .getBean(DefaultKafkaProducerFactory.class); - Map configs = producerFactory.getConfigurationProperties(); - // common - assertThat(configs.get(ProducerConfig.CLIENT_ID_CONFIG)).isEqualTo("cid"); - // producer - assertThat(configs.get(ProducerConfig.ACKS_CONFIG)).isEqualTo("all"); - assertThat(configs.get(ProducerConfig.BATCH_SIZE_CONFIG)).isEqualTo(20); - assertThat(configs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) - .isEqualTo(Collections.singletonList("bar:1234")); // override - assertThat(configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG)).isEqualTo(12345L); - assertThat(configs.get(ProducerConfig.COMPRESSION_TYPE_CONFIG)).isEqualTo("gzip"); - assertThat(configs.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) - .isEqualTo(LongSerializer.class); - assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4"); - assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) - .endsWith(File.separator + "ksLocP"); - assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p5"); - assertThat((String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)) - .endsWith(File.separator + "tsLocP"); - assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)) - .isEqualTo("p6"); - assertThat(configs.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2); - assertThat(configs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) - .isEqualTo(IntegerSerializer.class); - assertThat(this.context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)) - .isEmpty(); - assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz"); - assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox"); + this.contextRunner.withUserConfiguration(TestConfiguration.class) + .withPropertyValues( + "spring.kafka.clientId=cid", + "spring.kafka.properties.foo.bar.baz=qux.fiz.buz", + "spring.kafka.producer.acks=all", + "spring.kafka.producer.batch-size=20", + "spring.kafka.producer.bootstrap-servers=bar:1234", // test + // override + "spring.kafka.producer.buffer-memory=12345", + "spring.kafka.producer.compression-type=gzip", + "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer", + "spring.kafka.producer.retries=2", + "spring.kafka.producer.properties.fiz.buz=fix.fox", + "spring.kafka.producer.ssl.key-password=p4", + "spring.kafka.producer.ssl.keystore-location=classpath:ksLocP", + "spring.kafka.producer.ssl.keystore-password=p5", + "spring.kafka.producer.ssl.truststore-location=classpath:tsLocP", + "spring.kafka.producer.ssl.truststore-password=p6", + "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.IntegerSerializer") + .run((context) -> { + DefaultKafkaProducerFactory producerFactory = context + .getBean(DefaultKafkaProducerFactory.class); + Map configs = producerFactory.getConfigurationProperties(); + // common + assertThat(configs.get(ProducerConfig.CLIENT_ID_CONFIG)).isEqualTo("cid"); + // producer + assertThat(configs.get(ProducerConfig.ACKS_CONFIG)).isEqualTo("all"); + assertThat(configs.get(ProducerConfig.BATCH_SIZE_CONFIG)).isEqualTo(20); + assertThat(configs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) + .isEqualTo(Collections.singletonList("bar:1234")); // override + assertThat(configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG)).isEqualTo(12345L); + assertThat(configs.get(ProducerConfig.COMPRESSION_TYPE_CONFIG)).isEqualTo("gzip"); + assertThat(configs.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) + .isEqualTo(LongSerializer.class); + assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4"); + assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) + .endsWith(File.separator + "ksLocP"); + assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p5"); + assertThat((String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)) + .endsWith(File.separator + "tsLocP"); + assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)) + .isEqualTo("p6"); + assertThat(configs.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2); + assertThat(configs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) + .isEqualTo(IntegerSerializer.class); + assertThat(context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)) + .isEmpty(); + assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz"); + assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox"); + }); } @Test public void adminProperties() { - load("spring.kafka.clientId=cid", + this.contextRunner.withPropertyValues("spring.kafka.clientId=cid", "spring.kafka.properties.foo.bar.baz=qux.fiz.buz", "spring.kafka.admin.fail-fast=true", "spring.kafka.admin.properties.fiz.buz=fix.fox", @@ -183,79 +195,126 @@ public class KafkaAutoConfigurationTests { "spring.kafka.admin.ssl.keystore-location=classpath:ksLocP", "spring.kafka.admin.ssl.keystore-password=p5", "spring.kafka.admin.ssl.truststore-location=classpath:tsLocP", - "spring.kafka.admin.ssl.truststore-password=p6"); - KafkaAdmin admin = this.context.getBean(KafkaAdmin.class); - Map configs = admin.getConfig(); - // common - assertThat(configs.get(AdminClientConfig.CLIENT_ID_CONFIG)).isEqualTo("cid"); - // admin - assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4"); - assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) - .endsWith(File.separator + "ksLocP"); - assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p5"); - assertThat((String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)) - .endsWith(File.separator + "tsLocP"); - assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)) - .isEqualTo("p6"); - assertThat(this.context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)) + "spring.kafka.admin.ssl.truststore-password=p6").run((context) -> { + KafkaAdmin admin = context.getBean(KafkaAdmin.class); + Map configs = admin.getConfig(); + // common + assertThat(configs.get(AdminClientConfig.CLIENT_ID_CONFIG)).isEqualTo("cid"); + // admin + assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4"); + assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) + .endsWith(File.separator + "ksLocP"); + assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p5"); + assertThat((String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)) + .endsWith(File.separator + "tsLocP"); + assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)) + .isEqualTo("p6"); + assertThat(context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)) .isEmpty(); - assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz"); - assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox"); - assertThat(KafkaTestUtils.getPropertyValue(admin, "fatalIfBrokerNotAvailable", - Boolean.class)).isTrue(); + assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz"); + assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox"); + assertThat(KafkaTestUtils.getPropertyValue(admin, "fatalIfBrokerNotAvailable", + Boolean.class)).isTrue(); + }); } @SuppressWarnings("unchecked") @Test public void listenerProperties() { - load("spring.kafka.template.default-topic=testTopic", - "spring.kafka.listener.ack-mode=MANUAL", - "spring.kafka.listener.ack-count=123", - "spring.kafka.listener.ack-time=456", - "spring.kafka.listener.concurrency=3", - "spring.kafka.listener.poll-timeout=2000", - "spring.kafka.listener.type=batch", "spring.kafka.jaas.enabled=true", - "spring.kafka.jaas.login-module=foo", - "spring.kafka.jaas.control-flag=REQUISITE", - "spring.kafka.jaas.options.useKeyTab=true"); - DefaultKafkaProducerFactory producerFactory = this.context - .getBean(DefaultKafkaProducerFactory.class); - DefaultKafkaConsumerFactory consumerFactory = this.context - .getBean(DefaultKafkaConsumerFactory.class); - KafkaTemplate kafkaTemplate = this.context.getBean(KafkaTemplate.class); - KafkaListenerContainerFactory kafkaListenerContainerFactory = this.context - .getBean(KafkaListenerContainerFactory.class); - assertThat(new DirectFieldAccessor(kafkaTemplate) - .getPropertyValue("producerFactory")).isEqualTo(producerFactory); - assertThat(kafkaTemplate.getDefaultTopic()).isEqualTo("testTopic"); - DirectFieldAccessor dfa = new DirectFieldAccessor(kafkaListenerContainerFactory); - assertThat(dfa.getPropertyValue("consumerFactory")).isEqualTo(consumerFactory); - assertThat(dfa.getPropertyValue("containerProperties.ackMode")) - .isEqualTo(AckMode.MANUAL); - assertThat(dfa.getPropertyValue("containerProperties.ackCount")).isEqualTo(123); - assertThat(dfa.getPropertyValue("containerProperties.ackTime")).isEqualTo(456L); - assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3); - assertThat(dfa.getPropertyValue("containerProperties.pollTimeout")) - .isEqualTo(2000L); - assertThat(dfa.getPropertyValue("batchListener")).isEqualTo(true); - assertThat(this.context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)) - .hasSize(1); - KafkaJaasLoginModuleInitializer jaas = this.context - .getBean(KafkaJaasLoginModuleInitializer.class); - dfa = new DirectFieldAccessor(jaas); - assertThat(dfa.getPropertyValue("loginModule")).isEqualTo("foo"); - assertThat(dfa.getPropertyValue("controlFlag")) - .isEqualTo(AppConfigurationEntry.LoginModuleControlFlag.REQUISITE); - assertThat(((Map) dfa.getPropertyValue("options"))) - .containsExactly(entry("useKeyTab", "true")); + this.contextRunner.withUserConfiguration(TestConfiguration.class) + .withPropertyValues("spring.kafka.template.default-topic=testTopic", + "spring.kafka.listener.ack-mode=MANUAL", + "spring.kafka.listener.ack-count=123", + "spring.kafka.listener.ack-time=456", + "spring.kafka.listener.concurrency=3", + "spring.kafka.listener.poll-timeout=2000", + "spring.kafka.listener.type=batch", + "spring.kafka.jaas.enabled=true", + "spring.kafka.jaas.login-module=foo", + "spring.kafka.jaas.control-flag=REQUISITE", + "spring.kafka.jaas.options.useKeyTab=true") + .run((context) -> { + DefaultKafkaProducerFactory producerFactory = context + .getBean(DefaultKafkaProducerFactory.class); + DefaultKafkaConsumerFactory consumerFactory = context + .getBean(DefaultKafkaConsumerFactory.class); + KafkaTemplate kafkaTemplate = context.getBean(KafkaTemplate.class); + KafkaListenerContainerFactory kafkaListenerContainerFactory = context + .getBean(KafkaListenerContainerFactory.class); + assertThat(kafkaTemplate.getMessageConverter()).isInstanceOf( + MessagingMessageConverter.class); + assertThat(new DirectFieldAccessor(kafkaTemplate) + .getPropertyValue("producerFactory")).isEqualTo(producerFactory); + assertThat(kafkaTemplate.getDefaultTopic()).isEqualTo("testTopic"); + DirectFieldAccessor dfa = new DirectFieldAccessor(kafkaListenerContainerFactory); + assertThat(dfa.getPropertyValue("consumerFactory")).isEqualTo(consumerFactory); + assertThat(dfa.getPropertyValue("containerProperties.ackMode")) + .isEqualTo(AckMode.MANUAL); + assertThat(dfa.getPropertyValue("containerProperties.ackCount")).isEqualTo(123); + assertThat(dfa.getPropertyValue("containerProperties.ackTime")).isEqualTo(456L); + assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3); + assertThat(dfa.getPropertyValue("containerProperties.pollTimeout")) + .isEqualTo(2000L); + assertThat(dfa.getPropertyValue("batchListener")).isEqualTo(true); + assertThat(context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)) + .hasSize(1); + KafkaJaasLoginModuleInitializer jaas = context + .getBean(KafkaJaasLoginModuleInitializer.class); + dfa = new DirectFieldAccessor(jaas); + assertThat(dfa.getPropertyValue("loginModule")).isEqualTo("foo"); + assertThat(dfa.getPropertyValue("controlFlag")) + .isEqualTo(AppConfigurationEntry.LoginModuleControlFlag.REQUISITE); + assertThat(((Map) dfa.getPropertyValue("options"))) + .containsExactly(entry("useKeyTab", "true")); + }); + } + + @Test + public void testKafkaTemplateRecordMessageConverters() { + this.contextRunner.withUserConfiguration(RecordMessageConvertersConfiguration.class) + .run((context) -> { + KafkaTemplate kafkaTemplate = context.getBean(KafkaTemplate.class); + assertThat(kafkaTemplate.getMessageConverter()) + .isSameAs(context.getBean("myRecordMessageConverter")); + }); + } + + @Test + public void testConcurrentKafkaListenerContainerFactoryWithCustomMessageConverters() { + this.contextRunner.withUserConfiguration(MessageConvertersConfiguration.class) + .run((context) -> { + ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory = context + .getBean(ConcurrentKafkaListenerContainerFactory.class); + DirectFieldAccessor dfa = new DirectFieldAccessor( + kafkaListenerContainerFactory); + assertThat(dfa.getPropertyValue("messageConverter")) + .isSameAs(context.getBean("myMessageConverter")); + }); } - private void load(String... environment) { - AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(); - ctx.register(KafkaAutoConfiguration.class); - TestPropertyValues.of(environment).applyTo(ctx); - ctx.refresh(); - this.context = ctx; + @Configuration + protected static class TestConfiguration { + + } + + @Configuration + protected static class RecordMessageConvertersConfiguration { + + @Bean + public RecordMessageConverter myRecordMessageConverter() { + return mock(RecordMessageConverter.class); + } + + } + + @Configuration + protected static class MessageConvertersConfiguration { + + @Bean + public MessageConverter myMessageConverter() { + return mock(MessageConverter.class); + } + } }