Merge pull request #16740 from garyrussell

* pr/16740:
  Polish "Add configuration property for Spring Kafka's missingTopicsFatal"
  Add configuration property for Spring Kafka's missingTopicsFatal
pull/16905/head
Stephane Nicoll 6 years ago
commit 7bf48ed3b7

@ -158,6 +158,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
map.from(properties::getMonitorInterval).as(Duration::getSeconds)
.as(Number::intValue).to(container::setMonitorInterval);
map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig);
map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal);
map.from(this.transactionManager).to(container::setTransactionManager);
}

@ -873,6 +873,12 @@ public class KafkaProperties {
*/
private Boolean logContainerConfig;
/**
* Whether the container should fail to start if at least one of the configured
* topics are not present on the broker.
*/
private boolean missingTopicsFatal = true;
public Type getType() {
return this.type;
}
@ -961,6 +967,14 @@ public class KafkaProperties {
this.logContainerConfig = logContainerConfig;
}
public boolean isMissingTopicsFatal() {
return this.missingTopicsFatal;
}
public void setMissingTopicsFatal(boolean missingTopicsFatal) {
this.missingTopicsFatal = missingTopicsFatal;
}
}
public static class Ssl {

@ -38,6 +38,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -457,6 +458,7 @@ public class KafkaAutoConfigurationTests {
"spring.kafka.listener.idle-event-interval=1s",
"spring.kafka.listener.monitor-interval=45",
"spring.kafka.listener.log-container-config=true",
"spring.kafka.listener.missing-topics-fatal=false",
"spring.kafka.jaas.enabled=true",
"spring.kafka.producer.transaction-id-prefix=foo",
"spring.kafka.jaas.login-module=foo",
@ -491,6 +493,7 @@ public class KafkaAutoConfigurationTests {
.isEqualTo(1000L);
assertThat(containerProperties.getMonitorInterval()).isEqualTo(45);
assertThat(containerProperties.isLogContainerConfig()).isTrue();
assertThat(containerProperties.isMissingTopicsFatal()).isFalse();
assertThat(ReflectionTestUtils.getField(kafkaListenerContainerFactory,
"concurrency")).isEqualTo(3);
assertThat(kafkaListenerContainerFactory.isBatchListener()).isTrue();
@ -509,6 +512,19 @@ public class KafkaAutoConfigurationTests {
});
}
@Test
public void listenerPropertiesMatchDefaults() {
this.contextRunner.run((context) -> {
Listener listenerProperties = new KafkaProperties().getListener();
AbstractKafkaListenerContainerFactory<?, ?, ?> kafkaListenerContainerFactory = (AbstractKafkaListenerContainerFactory<?, ?, ?>) context
.getBean(KafkaListenerContainerFactory.class);
ContainerProperties containerProperties = kafkaListenerContainerFactory
.getContainerProperties();
assertThat(containerProperties.isMissingTopicsFatal())
.isEqualTo(listenerProperties.isMissingTopicsFatal());
});
}
@Test
public void testKafkaTemplateRecordMessageConverters() {
this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class)

Loading…
Cancel
Save