From 1583ce8d26ce6324cedb49a0ab38c3833ab47a9b Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Tue, 7 May 2019 11:50:49 -0400 Subject: [PATCH 1/2] Add configuration property for Spring Kafka's missingTopicsFatal See gh-16740 --- ...rentKafkaListenerContainerFactoryConfigurer.java | 1 + .../boot/autoconfigure/kafka/KafkaProperties.java | 13 +++++++++++++ .../kafka/KafkaAutoConfigurationTests.java | 2 ++ 3 files changed, 16 insertions(+) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index 0fddd28961..232a7e51b4 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -159,6 +159,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { .as(Number::intValue).to(container::setMonitorInterval); map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig); map.from(this.transactionManager).to(container::setTransactionManager); + map.from(properties::getMissingTopicsFatal).to(container::setMissingTopicsFatal); } } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index 81be6990e2..e82c88e6d3 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -873,6 +873,11 @@ public class KafkaProperties { */ private Boolean logContainerConfig; + /** + * Set to false to disable checking that topic(s) exist. + */ + private Boolean missingTopicsFatal; + public Type getType() { return this.type; } @@ -961,6 +966,14 @@ public class KafkaProperties { this.logContainerConfig = logContainerConfig; } + public Boolean getMissingTopicsFatal() { + return this.missingTopicsFatal; + } + + public void setMissingTopicsFatal(Boolean missingTopicsFatal) { + this.missingTopicsFatal = missingTopicsFatal; + } + } public static class Ssl { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index ae5aa6f431..444efdc2de 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -457,6 +457,7 @@ public class KafkaAutoConfigurationTests { "spring.kafka.listener.idle-event-interval=1s", "spring.kafka.listener.monitor-interval=45", "spring.kafka.listener.log-container-config=true", + "spring.kafka.listener.missing-topics-fatal=false", "spring.kafka.jaas.enabled=true", "spring.kafka.producer.transaction-id-prefix=foo", "spring.kafka.jaas.login-module=foo", @@ -491,6 +492,7 @@ public class KafkaAutoConfigurationTests { .isEqualTo(1000L); assertThat(containerProperties.getMonitorInterval()).isEqualTo(45); assertThat(containerProperties.isLogContainerConfig()).isTrue(); + assertThat(containerProperties.isMissingTopicsFatal()).isFalse(); assertThat(ReflectionTestUtils.getField(kafkaListenerContainerFactory, "concurrency")).isEqualTo(3); assertThat(kafkaListenerContainerFactory.isBatchListener()).isTrue(); From 3c46b9e83de45f262f77fe6a59e3d04efff17c02 Mon Sep 17 00:00:00 2001 From: Stephane Nicoll Date: Mon, 20 May 2019 14:45:59 +0200 Subject: [PATCH 2/2] Polish "Add configuration property for Spring Kafka's missingTopicsFatal" Closes gh-16740 --- ...entKafkaListenerContainerFactoryConfigurer.java | 2 +- .../boot/autoconfigure/kafka/KafkaProperties.java | 9 +++++---- .../kafka/KafkaAutoConfigurationTests.java | 14 ++++++++++++++ 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index 232a7e51b4..1631f5980d 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -158,8 +158,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { map.from(properties::getMonitorInterval).as(Duration::getSeconds) .as(Number::intValue).to(container::setMonitorInterval); map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig); + map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal); map.from(this.transactionManager).to(container::setTransactionManager); - map.from(properties::getMissingTopicsFatal).to(container::setMissingTopicsFatal); } } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index e82c88e6d3..1d51bc0c11 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -874,9 +874,10 @@ public class KafkaProperties { private Boolean logContainerConfig; /** - * Set to false to disable checking that topic(s) exist. + * Whether the container should fail to start if at least one of the configured + * topics are not present on the broker. */ - private Boolean missingTopicsFatal; + private boolean missingTopicsFatal = true; public Type getType() { return this.type; @@ -966,11 +967,11 @@ public class KafkaProperties { this.logContainerConfig = logContainerConfig; } - public Boolean getMissingTopicsFatal() { + public boolean isMissingTopicsFatal() { return this.missingTopicsFatal; } - public void setMissingTopicsFatal(Boolean missingTopicsFatal) { + public void setMissingTopicsFatal(boolean missingTopicsFatal) { this.missingTopicsFatal = missingTopicsFatal; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 444efdc2de..935af69698 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -38,6 +38,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.junit.jupiter.api.Test; import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener; import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -511,6 +512,19 @@ public class KafkaAutoConfigurationTests { }); } + @Test + public void listenerPropertiesMatchDefaults() { + this.contextRunner.run((context) -> { + Listener listenerProperties = new KafkaProperties().getListener(); + AbstractKafkaListenerContainerFactory kafkaListenerContainerFactory = (AbstractKafkaListenerContainerFactory) context + .getBean(KafkaListenerContainerFactory.class); + ContainerProperties containerProperties = kafkaListenerContainerFactory + .getContainerProperties(); + assertThat(containerProperties.isMissingTopicsFatal()) + .isEqualTo(listenerProperties.isMissingTopicsFatal()); + }); + } + @Test public void testKafkaTemplateRecordMessageConverters() { this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class)