Polish "Consider RecordFilterStrategy in Kafka auto-configuration"

See gh-22973
pull/22412/head
Stephane Nicoll 4 years ago
parent f68dfde35e
commit 8c0318edc7

@ -1,5 +1,5 @@
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -29,6 +29,7 @@ import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
@ -45,6 +46,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
private MessageConverter messageConverter;
private RecordFilterStrategy<Object, Object> recordFilterStrategy;
private KafkaTemplate<Object, Object> replyTemplate;
private KafkaAwareTransactionManager<Object, Object> transactionManager;
@ -75,6 +78,14 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
this.messageConverter = messageConverter;
}
/**
* Set the {@link RecordFilterStrategy} to use to filter incoming records.
* @param recordFilterStrategy the record filter strategy
*/
void setRecordFilterStrategy(RecordFilterStrategy<Object, Object> recordFilterStrategy) {
this.recordFilterStrategy = recordFilterStrategy;
}
/**
* Set the {@link KafkaTemplate} to use to send replies.
* @param replyTemplate the reply template
@ -151,6 +162,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
Listener properties = this.properties.getListener();
map.from(properties::getConcurrency).to(factory::setConcurrency);
map.from(this.messageConverter).to(factory::setMessageConverter);
map.from(this.recordFilterStrategy).to(factory::setRecordFilterStrategy);
map.from(this.replyTemplate).to(factory::setReplyTemplate);
if (properties.getType().equals(Listener.Type.BATCH)) {
factory.setBatchListener(true);

@ -54,6 +54,8 @@ class KafkaAnnotationDrivenConfiguration {
private final RecordMessageConverter messageConverter;
private final RecordFilterStrategy<Object, Object> recordFilterStrategy;
private final BatchMessageConverter batchMessageConverter;
private final KafkaTemplate<Object, Object> kafkaTemplate;
@ -72,6 +74,7 @@ class KafkaAnnotationDrivenConfiguration {
KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
ObjectProvider<RecordMessageConverter> messageConverter,
ObjectProvider<RecordFilterStrategy<Object, Object>> recordFilterStrategy,
ObjectProvider<BatchMessageConverter> batchMessageConverter,
ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
@ -81,6 +84,7 @@ class KafkaAnnotationDrivenConfiguration {
ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {
this.properties = properties;
this.messageConverter = messageConverter.getIfUnique();
this.recordFilterStrategy = recordFilterStrategy.getIfUnique();
this.batchMessageConverter = batchMessageConverter
.getIfUnique(() -> new BatchMessagingMessageConverter(this.messageConverter));
this.kafkaTemplate = kafkaTemplate.getIfUnique();
@ -100,6 +104,7 @@ class KafkaAnnotationDrivenConfiguration {
MessageConverter messageConverterToUse = (this.properties.getListener().getType().equals(Type.BATCH))
? this.batchMessageConverter : this.messageConverter;
configurer.setMessageConverter(messageConverterToUse);
configurer.setRecordFilterStrategy(this.recordFilterStrategy);
configurer.setReplyTemplate(this.kafkaTemplate);
configurer.setTransactionManager(this.transactionManager);
configurer.setRebalanceListener(this.rebalanceListener);
@ -114,10 +119,8 @@ class KafkaAnnotationDrivenConfiguration {
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory,
ObjectProvider<RecordFilterStrategy<Object, Object>> kafkaFilterStrategyProvider) {
ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
kafkaFilterStrategyProvider.ifAvailable(factory::setRecordFilterStrategy);
configurer.configure(factory, kafkaConsumerFactory
.getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
return factory;

@ -465,6 +465,25 @@ class KafkaAutoConfigurationTests {
});
}
@Test
void testConcurrentKafkaListenerContainerFactoryWithDefaultRecordFilterStrategy() {
this.contextRunner.run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(factory).hasFieldOrPropertyWithValue("recordFilterStrategy", null);
});
}
@Test
void testConcurrentKafkaListenerContainerFactoryWithCustomRecordFilterStrategy() {
this.contextRunner.withUserConfiguration(RecordFilterStrategyConfiguration.class).run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(factory).hasFieldOrPropertyWithValue("recordFilterStrategy",
context.getBean("recordFilterStrategy"));
});
}
@Test
void testConcurrentKafkaListenerContainerFactoryWithCustomErrorHandler() {
this.contextRunner.withUserConfiguration(ErrorHandlerConfiguration.class).run((context) -> {
@ -589,16 +608,6 @@ class KafkaAutoConfigurationTests {
});
}
@Test
void testConcurrentKafkaListenerContainerFactoryWithCustomRecordFilterStrategy() {
this.contextRunner.withUserConfiguration(TestRecordFilterStrategyConfiguration.class).run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(factory).hasFieldOrPropertyWithValue("recordFilterStrategy",
context.getBean("recordFilterStrategy"));
});
}
@Configuration(proxyBeanMethods = false)
static class MessageConverterConfiguration {
@ -619,6 +628,16 @@ class KafkaAutoConfigurationTests {
}
@Configuration(proxyBeanMethods = false)
static class RecordFilterStrategyConfiguration {
@Bean
RecordFilterStrategy<Object, Object> recordFilterStrategy() {
return (record) -> false;
}
}
@Configuration(proxyBeanMethods = false)
static class ErrorHandlerConfiguration {
@ -731,15 +750,4 @@ class KafkaAutoConfigurationTests {
}
@Configuration(proxyBeanMethods = false)
static class TestRecordFilterStrategyConfiguration {
@Bean
@SuppressWarnings("unchecked")
RecordFilterStrategy<Object, Object> recordFilterStrategy() {
return mock(RecordFilterStrategy.class);
}
}
}

@ -5725,7 +5725,7 @@ The following component creates a listener endpoint on the `someTopic` topic:
----
If a `KafkaTransactionManager` bean is defined, it is automatically associated to the container factory.
Similarly, if a `ErrorHandler`, `AfterRollbackProcessor` or `ConsumerAwareRebalanceListener` bean is defined, it is automatically associated to the default factory.
Similarly, if a `RecordFilterStrategy`, `ErrorHandler`, `AfterRollbackProcessor` or `ConsumerAwareRebalanceListener` bean is defined, it is automatically associated to the default factory.
Depending on the listener type, a `RecordMessageConverter` or `BatchMessageConverter` bean is associated to the default factory.
If only a `RecordMessageConverter` bean is present for a batch listener, it is wrapped in a `BatchMessageConverter`.

Loading…
Cancel
Save