From 9274baeb68481656dd91cc3a6d8f5cce979c5f24 Mon Sep 17 00:00:00 2001 From: Pascal Ayotte Date: Mon, 4 Oct 2021 09:37:34 -0400 Subject: [PATCH 1/2] Add support for IdlePartitionEventInterval See gh-28290 --- ...entKafkaListenerContainerFactoryConfigurer.java | 2 ++ .../boot/autoconfigure/kafka/KafkaProperties.java | 14 ++++++++++++++ .../kafka/KafkaAutoConfigurationTests.java | 2 ++ 3 files changed, 18 insertions(+) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index d6472ee012..5d526fdfd9 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -199,6 +199,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { map.from(properties::getNoPollThreshold).to(container::setNoPollThreshold); map.from(properties.getIdleBetweenPolls()).as(Duration::toMillis).to(container::setIdleBetweenPolls); map.from(properties::getIdleEventInterval).as(Duration::toMillis).to(container::setIdleEventInterval); + map.from(properties::getIdlePartitionEventInterval).as(Duration::toMillis) + .to(container::setIdlePartitionEventInterval); map.from(properties::getMonitorInterval).as(Duration::getSeconds).as(Number::intValue) .to(container::setMonitorInterval); map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index e7054900f5..553d5a0868 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -902,6 +902,12 @@ public class KafkaProperties { */ private Duration idleEventInterval; + /** + * Time between publishing idle partition consumer events (no data received for + * partition). + */ + private Duration idlePartitionEventInterval; + /** * Time between checks for non-responsive consumers. If a duration suffix is not * specified, seconds will be used. @@ -1006,6 +1012,14 @@ public class KafkaProperties { this.idleEventInterval = idleEventInterval; } + public Duration getIdlePartitionEventInterval() { + return this.idlePartitionEventInterval; + } + + public void setIdlePartitionEventInterval(Duration idlePartitionEventInterval) { + this.idlePartitionEventInterval = idlePartitionEventInterval; + } + public Duration getMonitorInterval() { return this.monitorInterval; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 320a53ea09..3379f9e6ef 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -389,6 +389,7 @@ class KafkaAutoConfigurationTests { "spring.kafka.listener.concurrency=3", "spring.kafka.listener.poll-timeout=2000", "spring.kafka.listener.no-poll-threshold=2.5", "spring.kafka.listener.type=batch", "spring.kafka.listener.idle-between-polls=1s", "spring.kafka.listener.idle-event-interval=1s", + "spring.kafka.listener.idle-partition-event-interval=1s", "spring.kafka.listener.monitor-interval=45", "spring.kafka.listener.log-container-config=true", "spring.kafka.listener.only-log-record-metadata=true", "spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true", @@ -415,6 +416,7 @@ class KafkaAutoConfigurationTests { assertThat(containerProperties.getNoPollThreshold()).isEqualTo(2.5f); assertThat(containerProperties.getIdleBetweenPolls()).isEqualTo(1000L); assertThat(containerProperties.getIdleEventInterval()).isEqualTo(1000L); + assertThat(containerProperties.getIdlePartitionEventInterval()).isEqualTo(1000L); assertThat(containerProperties.getMonitorInterval()).isEqualTo(45); assertThat(containerProperties.isLogContainerConfig()).isTrue(); assertThat(containerProperties.isOnlyLogRecordMetadata()).isTrue(); From 91d7295c624e7b85d8f3149da6b1647039e9e6fd Mon Sep 17 00:00:00 2001 From: Stephane Nicoll Date: Mon, 3 Jan 2022 14:20:56 +0100 Subject: [PATCH 2/2] Polish "Add support for IdlePartitionEventInterval" See gh-28290 --- .../ConcurrentKafkaListenerContainerFactoryConfigurer.java | 2 +- .../boot/autoconfigure/kafka/KafkaProperties.java | 2 +- .../boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index 5d526fdfd9..ffa5987513 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2021 the original author or authors. + * Copyright 2012-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index 553d5a0868..9f471f1fb8 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2021 the original author or authors. + * Copyright 2012-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 3379f9e6ef..6e07a53dcc 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2021 the original author or authors. + * Copyright 2012-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.