Merge pull request #30776 from TheCK

* gh-30776:
  Polish "Add a configuration property for Kafka's async acks"
  Add a configuration property for Kafka's async acks

Closes gh-30776
pull/30862/head
Andy Wilkinson 3 years ago
commit f3aa5d0773

@ -166,6 +166,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
Listener properties = this.properties.getListener(); Listener properties = this.properties.getListener();
map.from(properties::getAckMode).to(container::setAckMode); map.from(properties::getAckMode).to(container::setAckMode);
map.from(properties::getAsyncAcks).to(container::setAsyncAcks);
map.from(properties::getClientId).to(container::setClientId); map.from(properties::getClientId).to(container::setClientId);
map.from(properties::getAckCount).to(container::setAckCount); map.from(properties::getAckCount).to(container::setAckCount);
map.from(properties::getAckTime).as(Duration::toMillis).to(container::setAckTime); map.from(properties::getAckTime).as(Duration::toMillis).to(container::setAckTime);

@ -881,6 +881,12 @@ public class KafkaProperties {
*/ */
private AckMode ackMode; private AckMode ackMode;
/**
* Support for asynchronous record acknowledgements. Only applies when
* spring.kafka.listener.ack-mode is manual or manual-immediate.
*/
private Boolean asyncAcks;
/** /**
* Prefix for the listener's consumer client.id property. * Prefix for the listener's consumer client.id property.
*/ */
@ -969,6 +975,14 @@ public class KafkaProperties {
this.ackMode = ackMode; this.ackMode = ackMode;
} }
public Boolean getAsyncAcks() {
return this.asyncAcks;
}
public void setAsyncAcks(Boolean asyncAcks) {
this.asyncAcks = asyncAcks;
}
public String getClientId() { public String getClientId() {
return this.clientId; return this.clientId;
} }

@ -450,18 +450,20 @@ class KafkaAutoConfigurationTests {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Test @Test
void listenerProperties() { void listenerProperties() {
this.contextRunner.withPropertyValues("spring.kafka.template.default-topic=testTopic", this.contextRunner
"spring.kafka.template.transaction-id-prefix=txOverride", "spring.kafka.listener.ack-mode=MANUAL", .withPropertyValues("spring.kafka.template.default-topic=testTopic",
"spring.kafka.listener.client-id=client", "spring.kafka.listener.ack-count=123", "spring.kafka.template.transaction-id-prefix=txOverride",
"spring.kafka.listener.ack-time=456", "spring.kafka.listener.concurrency=3", "spring.kafka.listener.ack-mode=MANUAL", "spring.kafka.listener.client-id=client",
"spring.kafka.listener.poll-timeout=2000", "spring.kafka.listener.no-poll-threshold=2.5", "spring.kafka.listener.ack-count=123", "spring.kafka.listener.ack-time=456",
"spring.kafka.listener.type=batch", "spring.kafka.listener.idle-between-polls=1s", "spring.kafka.listener.concurrency=3", "spring.kafka.listener.poll-timeout=2000",
"spring.kafka.listener.idle-event-interval=1s", "spring.kafka.listener.no-poll-threshold=2.5", "spring.kafka.listener.type=batch",
"spring.kafka.listener.idle-partition-event-interval=1s", "spring.kafka.listener.monitor-interval=45", "spring.kafka.listener.idle-between-polls=1s", "spring.kafka.listener.idle-event-interval=1s",
"spring.kafka.listener.log-container-config=true", "spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.listener.idle-partition-event-interval=1s",
"spring.kafka.jaas.enabled=true", "spring.kafka.listener.immediate-stop=true", "spring.kafka.listener.monitor-interval=45", "spring.kafka.listener.log-container-config=true",
"spring.kafka.producer.transaction-id-prefix=foo", "spring.kafka.jaas.login-module=foo", "spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true",
"spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.options.useKeyTab=true") "spring.kafka.listener.immediate-stop=true", "spring.kafka.producer.transaction-id-prefix=foo",
"spring.kafka.jaas.login-module=foo", "spring.kafka.jaas.control-flag=REQUISITE",
"spring.kafka.jaas.options.useKeyTab=true", "spring.kafka.listener.async-acks=true")
.run((context) -> { .run((context) -> {
DefaultKafkaProducerFactory<?, ?> producerFactory = context DefaultKafkaProducerFactory<?, ?> producerFactory = context
.getBean(DefaultKafkaProducerFactory.class); .getBean(DefaultKafkaProducerFactory.class);
@ -477,6 +479,7 @@ class KafkaAutoConfigurationTests {
assertThat(kafkaListenerContainerFactory.getConsumerFactory()).isEqualTo(consumerFactory); assertThat(kafkaListenerContainerFactory.getConsumerFactory()).isEqualTo(consumerFactory);
ContainerProperties containerProperties = kafkaListenerContainerFactory.getContainerProperties(); ContainerProperties containerProperties = kafkaListenerContainerFactory.getContainerProperties();
assertThat(containerProperties.getAckMode()).isEqualTo(AckMode.MANUAL); assertThat(containerProperties.getAckMode()).isEqualTo(AckMode.MANUAL);
assertThat(containerProperties.isAsyncAcks()).isEqualTo(true);
assertThat(containerProperties.getClientId()).isEqualTo("client"); assertThat(containerProperties.getClientId()).isEqualTo("client");
assertThat(containerProperties.getAckCount()).isEqualTo(123); assertThat(containerProperties.getAckCount()).isEqualTo(123);
assertThat(containerProperties.getAckTime()).isEqualTo(456L); assertThat(containerProperties.getAckTime()).isEqualTo(456L);

Loading…
Cancel
Save