Allow to configure Kafka Listener's onlyLogRecordMetadata

See gh-24582
pull/24604/head
Martín Dacosta 4 years ago committed by Stephane Nicoll
parent 1d7bda8ce9
commit b539e2e749

@ -189,6 +189,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
map.from(properties::getMonitorInterval).as(Duration::getSeconds).as(Number::intValue) map.from(properties::getMonitorInterval).as(Duration::getSeconds).as(Number::intValue)
.to(container::setMonitorInterval); .to(container::setMonitorInterval);
map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig); map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig);
map.from(properties::getOnlyLogRecordMetadata).to(container::setOnlyLogRecordMetadata);
map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal); map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal);
map.from(this.transactionManager).to(container::setTransactionManager); map.from(this.transactionManager).to(container::setTransactionManager);
map.from(this.rebalanceListener).to(container::setConsumerRebalanceListener); map.from(this.rebalanceListener).to(container::setConsumerRebalanceListener);

@ -913,6 +913,12 @@ public class KafkaProperties {
*/ */
private Boolean logContainerConfig; private Boolean logContainerConfig;
/**
* Whether to suppress the entire record from being written to the log when
* retries are being attempted.
*/
private Boolean onlyLogRecordMetadata;
/** /**
* Whether the container should fail to start if at least one of the configured * Whether the container should fail to start if at least one of the configured
* topics are not present on the broker. * topics are not present on the broker.
@ -1015,6 +1021,14 @@ public class KafkaProperties {
this.logContainerConfig = logContainerConfig; this.logContainerConfig = logContainerConfig;
} }
public Boolean getOnlyLogRecordMetadata() {
return this.onlyLogRecordMetadata;
}
public void setOnlyLogRecordMetadata(Boolean onlyLogRecordMetadata) {
this.onlyLogRecordMetadata = onlyLogRecordMetadata;
}
public boolean isMissingTopicsFatal() { public boolean isMissingTopicsFatal() {
return this.missingTopicsFatal; return this.missingTopicsFatal;
} }

@ -392,6 +392,7 @@ class KafkaAutoConfigurationTests {
"spring.kafka.listener.no-poll-threshold=2.5", "spring.kafka.listener.type=batch", "spring.kafka.listener.no-poll-threshold=2.5", "spring.kafka.listener.type=batch",
"spring.kafka.listener.idle-between-polls=1s", "spring.kafka.listener.idle-event-interval=1s", "spring.kafka.listener.idle-between-polls=1s", "spring.kafka.listener.idle-event-interval=1s",
"spring.kafka.listener.monitor-interval=45", "spring.kafka.listener.log-container-config=true", "spring.kafka.listener.monitor-interval=45", "spring.kafka.listener.log-container-config=true",
"spring.kafka.listener.only-log-record-metadata=true",
"spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true", "spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true",
"spring.kafka.producer.transaction-id-prefix=foo", "spring.kafka.jaas.login-module=foo", "spring.kafka.producer.transaction-id-prefix=foo", "spring.kafka.jaas.login-module=foo",
"spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.options.useKeyTab=true") "spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.options.useKeyTab=true")
@ -418,6 +419,7 @@ class KafkaAutoConfigurationTests {
assertThat(containerProperties.getIdleEventInterval()).isEqualTo(1000L); assertThat(containerProperties.getIdleEventInterval()).isEqualTo(1000L);
assertThat(containerProperties.getMonitorInterval()).isEqualTo(45); assertThat(containerProperties.getMonitorInterval()).isEqualTo(45);
assertThat(containerProperties.isLogContainerConfig()).isTrue(); assertThat(containerProperties.isLogContainerConfig()).isTrue();
assertThat(containerProperties.isOnlyLogRecordMetadata()).isTrue();
assertThat(containerProperties.isMissingTopicsFatal()).isTrue(); assertThat(containerProperties.isMissingTopicsFatal()).isTrue();
assertThat(kafkaListenerContainerFactory).extracting("concurrency").isEqualTo(3); assertThat(kafkaListenerContainerFactory).extracting("concurrency").isEqualTo(3);
assertThat(kafkaListenerContainerFactory.isBatchListener()).isTrue(); assertThat(kafkaListenerContainerFactory.isBatchListener()).isTrue();

Loading…
Cancel
Save