Auto-configure Kafka listener container with rebalance listener

This commit associates a `ConsumerAwareRebalanceListener` to the
auto-configured listener container factory if a single instance is found
in the context.

See gh-16755
pull/16930/head
Gary Russell 6 years ago committed by Stephane Nicoll
parent 0635d86cf4
commit abdc2e1b4f

@ -25,6 +25,7 @@ import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.support.converter.MessageConverter;
@ -53,6 +54,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
private AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
private ConsumerAwareRebalanceListener rebalanceListener;
/**
* Set the {@link KafkaProperties} to use.
* @param properties the properties
@ -111,6 +114,15 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
this.afterRollbackProcessor = afterRollbackProcessor;
}
/**
* Set the {@link ConsumerAwareRebalanceListener} to use.
* @param rebalanceListener the rebalance listener.
* @since 2.2
*/
void setRebalanceListener(ConsumerAwareRebalanceListener rebalanceListener) {
this.rebalanceListener = rebalanceListener;
}
/**
* Configure the specified Kafka listener container factory. The factory can be
* further tuned and default settings can be overridden.
@ -160,6 +172,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig);
map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal);
map.from(this.transactionManager).to(container::setTransactionManager);
map.from(this.rebalanceListener).to(container::setConsumerRebalanceListener);
}
}

@ -29,6 +29,7 @@ import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
@ -63,6 +64,8 @@ class KafkaAnnotationDrivenConfiguration {
private final AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
private final ConsumerAwareRebalanceListener rebalanceListener;
KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
ObjectProvider<RecordMessageConverter> messageConverter,
ObjectProvider<BatchMessageConverter> batchMessageConverter,
@ -70,7 +73,8 @@ class KafkaAnnotationDrivenConfiguration {
ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
ObjectProvider<ErrorHandler> errorHandler,
ObjectProvider<BatchErrorHandler> batchErrorHandler,
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor) {
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,
ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener) {
this.properties = properties;
this.messageConverter = messageConverter.getIfUnique();
this.batchMessageConverter = batchMessageConverter.getIfUnique(
@ -80,6 +84,7 @@ class KafkaAnnotationDrivenConfiguration {
this.errorHandler = errorHandler.getIfUnique();
this.batchErrorHandler = batchErrorHandler.getIfUnique();
this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
this.rebalanceListener = rebalanceListener.getIfUnique();
}
@Bean
@ -95,6 +100,7 @@ class KafkaAnnotationDrivenConfiguration {
configurer.setErrorHandler(this.errorHandler);
configurer.setBatchErrorHandler(this.batchErrorHandler);
configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
configurer.setRebalanceListener(this.rebalanceListener);
return configurer;
}

@ -55,6 +55,7 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler;
@ -674,6 +675,18 @@ public class KafkaAutoConfigurationTests {
});
}
@Test
public void testConcurrentKafkaListenerContainerFactoryWithCustomRebalanceListener() {
this.contextRunner.withUserConfiguration(RebalanceListenerConfiguration.class)
.run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(factory.getContainerProperties())
.hasFieldOrPropertyWithValue("consumerRebalanceListener",
context.getBean("rebalanceListener"));
});
}
@Test
public void testConcurrentKafkaListenerContainerFactoryWithKafkaTemplate() {
this.contextRunner.run((context) -> {
@ -749,6 +762,17 @@ public class KafkaAutoConfigurationTests {
}
@Configuration(proxyBeanMethods = false)
protected static class RebalanceListenerConfiguration {
@Bean
public ConsumerAwareRebalanceListener rebalanceListener() {
return new ConsumerAwareRebalanceListener() {
};
}
}
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
protected static class EnableKafkaStreamsConfiguration {

Loading…
Cancel
Save