Add configuration property for Spring Kafka's missingTopicsFatal

See gh-16740
pull/16905/head
Gary Russell 6 years ago committed by Stephane Nicoll
parent 68085c93d1
commit 1583ce8d26

@ -159,6 +159,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
.as(Number::intValue).to(container::setMonitorInterval);
map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig);
map.from(this.transactionManager).to(container::setTransactionManager);
map.from(properties::getMissingTopicsFatal).to(container::setMissingTopicsFatal);
}
}

@ -873,6 +873,11 @@ public class KafkaProperties {
*/
private Boolean logContainerConfig;
/**
* Set to false to disable checking that topic(s) exist.
*/
private Boolean missingTopicsFatal;
public Type getType() {
return this.type;
}
@ -961,6 +966,14 @@ public class KafkaProperties {
this.logContainerConfig = logContainerConfig;
}
public Boolean getMissingTopicsFatal() {
return this.missingTopicsFatal;
}
public void setMissingTopicsFatal(Boolean missingTopicsFatal) {
this.missingTopicsFatal = missingTopicsFatal;
}
}
public static class Ssl {

@ -457,6 +457,7 @@ public class KafkaAutoConfigurationTests {
"spring.kafka.listener.idle-event-interval=1s",
"spring.kafka.listener.monitor-interval=45",
"spring.kafka.listener.log-container-config=true",
"spring.kafka.listener.missing-topics-fatal=false",
"spring.kafka.jaas.enabled=true",
"spring.kafka.producer.transaction-id-prefix=foo",
"spring.kafka.jaas.login-module=foo",
@ -491,6 +492,7 @@ public class KafkaAutoConfigurationTests {
.isEqualTo(1000L);
assertThat(containerProperties.getMonitorInterval()).isEqualTo(45);
assertThat(containerProperties.isLogContainerConfig()).isTrue();
assertThat(containerProperties.isMissingTopicsFatal()).isFalse();
assertThat(ReflectionTestUtils.getField(kafkaListenerContainerFactory,
"concurrency")).isEqualTo(3);
assertThat(kafkaListenerContainerFactory.isBatchListener()).isTrue();

Loading…
Cancel
Save