diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index 7431fcf848..09a45bdb8f 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -189,6 +189,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { map.from(properties::getMonitorInterval).as(Duration::getSeconds).as(Number::intValue) .to(container::setMonitorInterval); map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig); + map.from(properties::getOnlyLogRecordMetadata).to(container::setOnlyLogRecordMetadata); map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal); map.from(this.transactionManager).to(container::setTransactionManager); map.from(this.rebalanceListener).to(container::setConsumerRebalanceListener); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index a8f092cd7b..83fb5aa5a1 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -913,6 +913,12 @@ public class KafkaProperties { */ private Boolean logContainerConfig; + /** + * Whether to suppress the entire record from being written to the log when + * retries are being attempted. + */ + private Boolean onlyLogRecordMetadata; + /** * Whether the container should fail to start if at least one of the configured * topics are not present on the broker. @@ -1015,6 +1021,14 @@ public class KafkaProperties { this.logContainerConfig = logContainerConfig; } + public Boolean getOnlyLogRecordMetadata() { + return this.onlyLogRecordMetadata; + } + + public void setOnlyLogRecordMetadata(Boolean onlyLogRecordMetadata) { + this.onlyLogRecordMetadata = onlyLogRecordMetadata; + } + public boolean isMissingTopicsFatal() { return this.missingTopicsFatal; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index a5e592bf7b..c00a815286 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -392,6 +392,7 @@ class KafkaAutoConfigurationTests { "spring.kafka.listener.no-poll-threshold=2.5", "spring.kafka.listener.type=batch", "spring.kafka.listener.idle-between-polls=1s", "spring.kafka.listener.idle-event-interval=1s", "spring.kafka.listener.monitor-interval=45", "spring.kafka.listener.log-container-config=true", + "spring.kafka.listener.only-log-record-metadata=true", "spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true", "spring.kafka.producer.transaction-id-prefix=foo", "spring.kafka.jaas.login-module=foo", "spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.options.useKeyTab=true") @@ -418,6 +419,7 @@ class KafkaAutoConfigurationTests { assertThat(containerProperties.getIdleEventInterval()).isEqualTo(1000L); assertThat(containerProperties.getMonitorInterval()).isEqualTo(45); assertThat(containerProperties.isLogContainerConfig()).isTrue(); + assertThat(containerProperties.isOnlyLogRecordMetadata()).isTrue(); assertThat(containerProperties.isMissingTopicsFatal()).isTrue(); assertThat(kafkaListenerContainerFactory).extracting("concurrency").isEqualTo(3); assertThat(kafkaListenerContainerFactory.isBatchListener()).isTrue();