diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index 60f9dfd73b..a8f092cd7b 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -37,7 +37,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.PropertyMapper; import org.springframework.boot.convert.DurationUnit; import org.springframework.core.io.Resource; -import org.springframework.kafka.core.CleanupConfig; import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.util.CollectionUtils; @@ -686,6 +685,8 @@ public class KafkaProperties { private final Security security = new Security(); + private final Cleanup cleanup = new Cleanup(); + /** * Kafka streams application.id property; default spring.application.name. */ @@ -723,11 +724,6 @@ public class KafkaProperties { */ private String stateDir; - /** - * Cleanup configuration for the state stores. - */ - private Cleanup cleanup; - /** * Additional Kafka properties used to configure the streams. */ @@ -741,6 +737,10 @@ public class KafkaProperties { return this.security; } + public Cleanup getCleanup() { + return this.cleanup; + } + public String getApplicationId() { return this.applicationId; } @@ -797,14 +797,6 @@ public class KafkaProperties { this.stateDir = stateDir; } - public Cleanup getCleanup() { - return cleanup; - } - - public void setCleanup(Cleanup cleanup) { - this.cleanup = cleanup; - } - public Map getProperties() { return this.properties; } @@ -1248,53 +1240,57 @@ public class KafkaProperties { } - public enum IsolationLevel { + public static class Cleanup { /** - * Read everything including aborted transactions. + * Cleanup the application’s local state directory on startup. */ - READ_UNCOMMITTED((byte) 0), + private boolean onStartup = false; /** - * Read records from committed transactions, in addition to records not part of - * transactions. + * Cleanup the application’s local state directory on shutdown. */ - READ_COMMITTED((byte) 1); + private boolean onShutdown = true; - private final byte id; + public boolean isOnStartup() { + return this.onStartup; + } - IsolationLevel(byte id) { - this.id = id; + public void setOnStartup(boolean onStartup) { + this.onStartup = onStartup; } - public byte id() { - return this.id; + public boolean isOnShutdown() { + return this.onShutdown; + } + + public void setOnShutdown(boolean onShutdown) { + this.onShutdown = onShutdown; } } - public static class Cleanup { + public enum IsolationLevel { /** - * Cleanup the application's state on start. + * Read everything including aborted transactions. */ - private boolean onStart = false; + READ_UNCOMMITTED((byte) 0), /** - * Cleanup the application's state on stop. + * Read records from committed transactions, in addition to records not part of + * transactions. */ - private boolean onStop = true; + READ_COMMITTED((byte) 1); - public CleanupConfig buildCleanupConfig() { - return new CleanupConfig(this.onStart, this.onStop); - } + private final byte id; - public boolean isOnStart() { - return onStart; + IsolationLevel(byte id) { + this.id = id; } - public boolean isOnStop() { - return onStop; + public byte id() { + return this.id; } } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java index ef36f27353..feec421f84 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java @@ -34,6 +34,7 @@ import org.springframework.core.env.Environment; import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.config.StreamsBuilderFactoryBean; +import org.springframework.kafka.core.CleanupConfig; /** * Configuration for Kafka Streams annotation-driven support. @@ -91,11 +92,9 @@ class KafkaStreamsAnnotationDrivenConfiguration { @Override public void afterPropertiesSet() { this.factoryBean.setAutoStartup(this.properties.getStreams().isAutoStartup()); - KafkaProperties.Cleanup cleanup = this.properties.getStreams().getCleanup(); - if (cleanup != null) { - this.factoryBean.setCleanupConfig(cleanup.buildCleanupConfig()); - } + CleanupConfig cleanupConfig = new CleanupConfig(cleanup.isOnStartup(), cleanup.isOnShutdown()); + this.factoryBean.setCleanupConfig(cleanupConfig); } } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 3e3e93e052..a5e592bf7b 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; import org.springframework.boot.autoconfigure.AutoConfigurations; @@ -50,6 +51,7 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.config.StreamsBuilderFactoryBean; +import org.springframework.kafka.core.CleanupConfig; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; @@ -340,6 +342,26 @@ class KafkaAutoConfigurationTests { }); } + @Test + void streamsWithCleanupConfig() { + this.contextRunner + .withUserConfiguration(EnableKafkaStreamsConfiguration.class, TestKafkaStreamsConfiguration.class) + .withPropertyValues("spring.application.name=my-test-app", + "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093", + "spring.kafka.streams.auto-startup=false", "spring.kafka.streams.cleanup.on-startup=true", + "spring.kafka.streams.cleanup.on-shutdown=false") + .run((context) -> { + StreamsBuilderFactoryBean streamsBuilderFactoryBean = context + .getBean(StreamsBuilderFactoryBean.class); + assertThat(streamsBuilderFactoryBean) + .extracting("cleanupConfig", InstanceOfAssertFactories.type(CleanupConfig.class)) + .satisfies((cleanupConfig) -> { + assertThat(cleanupConfig.cleanupOnStart()).isTrue(); + assertThat(cleanupConfig.cleanupOnStop()).isFalse(); + }); + }); + } + @Test void streamsApplicationIdIsMandatory() { this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class).run((context) -> { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaPropertiesTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaPropertiesTests.java index faa4d47020..87361f7859 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaPropertiesTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaPropertiesTests.java @@ -18,8 +18,10 @@ package org.springframework.boot.autoconfigure.kafka; import org.junit.jupiter.api.Test; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Cleanup; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.IsolationLevel; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener; +import org.springframework.kafka.core.CleanupConfig; import org.springframework.kafka.listener.ContainerProperties; import static org.assertj.core.api.Assertions.assertThat; @@ -48,4 +50,12 @@ class KafkaPropertiesTests { assertThat(listenerProperties.isMissingTopicsFatal()).isEqualTo(container.isMissingTopicsFatal()); } + @Test + void cleanupConfigDefaultValuesAreConsistent() { + CleanupConfig cleanupConfig = new CleanupConfig(); + Cleanup cleanup = new KafkaProperties().getStreams().getCleanup(); + assertThat(cleanup.isOnStartup()).isEqualTo(cleanupConfig.cleanupOnStart()); + assertThat(cleanup.isOnShutdown()).isEqualTo(cleanupConfig.cleanupOnStop()); + } + }