From 52c3f1c7ef16c5f8cac94058806891635ea378eb Mon Sep 17 00:00:00 2001 From: TheCK Date: Fri, 22 Apr 2022 20:27:25 +0200 Subject: [PATCH] Add a configuration property for Kafka's async acks See gh-30776 --- ...fkaListenerContainerFactoryConfigurer.java | 1 + .../autoconfigure/kafka/KafkaProperties.java | 15 +++++++++++ .../kafka/KafkaAutoConfigurationTests.java | 27 ++++++++++--------- 3 files changed, 31 insertions(+), 12 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index 33efab12c5..550e3f424d 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -166,6 +166,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); Listener properties = this.properties.getListener(); map.from(properties::getAckMode).to(container::setAckMode); + map.from(properties::getAsyncAcks).to(container::setAsyncAcks); map.from(properties::getClientId).to(container::setClientId); map.from(properties::getAckCount).to(container::setAckCount); map.from(properties::getAckTime).as(Duration::toMillis).to(container::setAckTime); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index adc9991c27..9845110a82 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -881,6 +881,13 @@ public class KafkaProperties { */ private AckMode ackMode; + /** + * Support for asynchronous record acknowledgments. Only applies with + * ContainerProperties.AckMode.MANUAL or + * ContainerProperties.AckMode.MANUAL_IMMEDIATE. + */ + private Boolean asyncAcks; + /** * Prefix for the listener's consumer client.id property. */ @@ -969,6 +976,14 @@ public class KafkaProperties { this.ackMode = ackMode; } + public Boolean getAsyncAcks() { + return this.asyncAcks; + } + + public void setAsyncAcks(Boolean asyncAcks) { + this.asyncAcks = asyncAcks; + } + public String getClientId() { return this.clientId; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 913c390252..b61bc1528e 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -450,18 +450,20 @@ class KafkaAutoConfigurationTests { @SuppressWarnings("unchecked") @Test void listenerProperties() { - this.contextRunner.withPropertyValues("spring.kafka.template.default-topic=testTopic", - "spring.kafka.template.transaction-id-prefix=txOverride", "spring.kafka.listener.ack-mode=MANUAL", - "spring.kafka.listener.client-id=client", "spring.kafka.listener.ack-count=123", - "spring.kafka.listener.ack-time=456", "spring.kafka.listener.concurrency=3", - "spring.kafka.listener.poll-timeout=2000", "spring.kafka.listener.no-poll-threshold=2.5", - "spring.kafka.listener.type=batch", "spring.kafka.listener.idle-between-polls=1s", - "spring.kafka.listener.idle-event-interval=1s", - "spring.kafka.listener.idle-partition-event-interval=1s", "spring.kafka.listener.monitor-interval=45", - "spring.kafka.listener.log-container-config=true", "spring.kafka.listener.missing-topics-fatal=true", - "spring.kafka.jaas.enabled=true", "spring.kafka.listener.immediate-stop=true", - "spring.kafka.producer.transaction-id-prefix=foo", "spring.kafka.jaas.login-module=foo", - "spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.options.useKeyTab=true") + this.contextRunner + .withPropertyValues("spring.kafka.template.default-topic=testTopic", + "spring.kafka.template.transaction-id-prefix=txOverride", + "spring.kafka.listener.ack-mode=MANUAL", "spring.kafka.listener.client-id=client", + "spring.kafka.listener.ack-count=123", "spring.kafka.listener.ack-time=456", + "spring.kafka.listener.concurrency=3", "spring.kafka.listener.poll-timeout=2000", + "spring.kafka.listener.no-poll-threshold=2.5", "spring.kafka.listener.type=batch", + "spring.kafka.listener.idle-between-polls=1s", "spring.kafka.listener.idle-event-interval=1s", + "spring.kafka.listener.idle-partition-event-interval=1s", + "spring.kafka.listener.monitor-interval=45", "spring.kafka.listener.log-container-config=true", + "spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true", + "spring.kafka.listener.immediate-stop=true", "spring.kafka.producer.transaction-id-prefix=foo", + "spring.kafka.jaas.login-module=foo", "spring.kafka.jaas.control-flag=REQUISITE", + "spring.kafka.jaas.options.useKeyTab=true", "spring.kafka.listener.async-acks=true") .run((context) -> { DefaultKafkaProducerFactory producerFactory = context .getBean(DefaultKafkaProducerFactory.class); @@ -477,6 +479,7 @@ class KafkaAutoConfigurationTests { assertThat(kafkaListenerContainerFactory.getConsumerFactory()).isEqualTo(consumerFactory); ContainerProperties containerProperties = kafkaListenerContainerFactory.getContainerProperties(); assertThat(containerProperties.getAckMode()).isEqualTo(AckMode.MANUAL); + assertThat(containerProperties.isAsyncAcks()).isEqualTo(true); assertThat(containerProperties.getClientId()).isEqualTo("client"); assertThat(containerProperties.getAckCount()).isEqualTo(123); assertThat(containerProperties.getAckTime()).isEqualTo(456L);