Auto-configure BatchInterceptor on ConcurrentKafkaListenerContainerFactory

See gh-32951
pull/34049/head
Thomas Kåsene 2 years ago committed by Moritz Halbritter
parent 73d2bb5063
commit d1a089ba71

@ -24,6 +24,7 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor; import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchInterceptor;
import org.springframework.kafka.listener.CommonErrorHandler; import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ContainerProperties;
@ -37,6 +38,7 @@ import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
* *
* @author Gary Russell * @author Gary Russell
* @author Eddú Meléndez * @author Eddú Meléndez
* @author Thomas Kåsene
* @since 1.5.0 * @since 1.5.0
*/ */
public class ConcurrentKafkaListenerContainerFactoryConfigurer { public class ConcurrentKafkaListenerContainerFactoryConfigurer {
@ -59,6 +61,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
private RecordInterceptor<Object, Object> recordInterceptor; private RecordInterceptor<Object, Object> recordInterceptor;
private BatchInterceptor<Object, Object> batchInterceptor;
/** /**
* Set the {@link KafkaProperties} to use. * Set the {@link KafkaProperties} to use.
* @param properties the properties * @param properties the properties
@ -133,6 +137,14 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
this.recordInterceptor = recordInterceptor; this.recordInterceptor = recordInterceptor;
} }
/**
* Set the {@link BatchInterceptor} to use.
* @param batchInterceptor the batch interceptor.
*/
void setBatchInterceptor(BatchInterceptor<Object, Object> batchInterceptor) {
this.batchInterceptor = batchInterceptor;
}
/** /**
* Configure the specified Kafka listener container factory. The factory can be * Configure the specified Kafka listener container factory. The factory can be
* further tuned and default settings can be overridden. * further tuned and default settings can be overridden.
@ -161,6 +173,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
map.from(this.commonErrorHandler).to(factory::setCommonErrorHandler); map.from(this.commonErrorHandler).to(factory::setCommonErrorHandler);
map.from(this.afterRollbackProcessor).to(factory::setAfterRollbackProcessor); map.from(this.afterRollbackProcessor).to(factory::setAfterRollbackProcessor);
map.from(this.recordInterceptor).to(factory::setRecordInterceptor); map.from(this.recordInterceptor).to(factory::setRecordInterceptor);
map.from(this.batchInterceptor).to(factory::setBatchInterceptor);
} }
private void configureContainer(ContainerProperties container) { private void configureContainer(ContainerProperties container) {

@ -29,6 +29,7 @@ import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor; import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchInterceptor;
import org.springframework.kafka.listener.CommonErrorHandler; import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.RecordInterceptor; import org.springframework.kafka.listener.RecordInterceptor;
@ -44,6 +45,7 @@ import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
* *
* @author Gary Russell * @author Gary Russell
* @author Eddú Meléndez * @author Eddú Meléndez
* @author Thomas Kåsene
*/ */
@Configuration(proxyBeanMethods = false) @Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EnableKafka.class) @ConditionalOnClass(EnableKafka.class)
@ -69,6 +71,8 @@ class KafkaAnnotationDrivenConfiguration {
private final RecordInterceptor<Object, Object> recordInterceptor; private final RecordInterceptor<Object, Object> recordInterceptor;
private final BatchInterceptor<Object, Object> batchInterceptor;
KafkaAnnotationDrivenConfiguration(KafkaProperties properties, KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
ObjectProvider<RecordMessageConverter> messageConverter, ObjectProvider<RecordMessageConverter> messageConverter,
ObjectProvider<RecordFilterStrategy<Object, Object>> recordFilterStrategy, ObjectProvider<RecordFilterStrategy<Object, Object>> recordFilterStrategy,
@ -78,7 +82,8 @@ class KafkaAnnotationDrivenConfiguration {
ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener, ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener,
ObjectProvider<CommonErrorHandler> commonErrorHandler, ObjectProvider<CommonErrorHandler> commonErrorHandler,
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor, ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,
ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) { ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor,
ObjectProvider<BatchInterceptor<Object, Object>> batchInterceptor) {
this.properties = properties; this.properties = properties;
this.messageConverter = messageConverter.getIfUnique(); this.messageConverter = messageConverter.getIfUnique();
this.recordFilterStrategy = recordFilterStrategy.getIfUnique(); this.recordFilterStrategy = recordFilterStrategy.getIfUnique();
@ -90,6 +95,7 @@ class KafkaAnnotationDrivenConfiguration {
this.commonErrorHandler = commonErrorHandler.getIfUnique(); this.commonErrorHandler = commonErrorHandler.getIfUnique();
this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique(); this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
this.recordInterceptor = recordInterceptor.getIfUnique(); this.recordInterceptor = recordInterceptor.getIfUnique();
this.batchInterceptor = batchInterceptor.getIfUnique();
} }
@Bean @Bean
@ -107,6 +113,7 @@ class KafkaAnnotationDrivenConfiguration {
configurer.setCommonErrorHandler(this.commonErrorHandler); configurer.setCommonErrorHandler(this.commonErrorHandler);
configurer.setAfterRollbackProcessor(this.afterRollbackProcessor); configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
configurer.setRecordInterceptor(this.recordInterceptor); configurer.setRecordInterceptor(this.recordInterceptor);
configurer.setBatchInterceptor(this.batchInterceptor);
return configurer; return configurer;
} }

@ -63,6 +63,7 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor; import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchInterceptor;
import org.springframework.kafka.listener.CommonErrorHandler; import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ContainerProperties;
@ -95,6 +96,7 @@ import static org.mockito.Mockito.never;
* @author Eddú Meléndez * @author Eddú Meléndez
* @author Nakul Mishra * @author Nakul Mishra
* @author Tomaz Fernandes * @author Tomaz Fernandes
* @author Thomas Kåsene
*/ */
class KafkaAutoConfigurationTests { class KafkaAutoConfigurationTests {
@ -645,6 +647,15 @@ class KafkaAutoConfigurationTests {
}); });
} }
@Test
void testConcurrentKafkaListenerContainerFactoryWithCustomBatchInterceptor() {
this.contextRunner.withUserConfiguration(BatchInterceptorConfiguration.class).run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(factory).hasFieldOrPropertyWithValue("batchInterceptor", context.getBean("batchInterceptor"));
});
}
@Test @Test
void testConcurrentKafkaListenerContainerFactoryWithCustomRebalanceListener() { void testConcurrentKafkaListenerContainerFactoryWithCustomRebalanceListener() {
this.contextRunner.withUserConfiguration(RebalanceListenerConfiguration.class).run((context) -> { this.contextRunner.withUserConfiguration(RebalanceListenerConfiguration.class).run((context) -> {
@ -764,6 +775,16 @@ class KafkaAutoConfigurationTests {
} }
@Configuration(proxyBeanMethods = false)
static class BatchInterceptorConfiguration {
@Bean
BatchInterceptor<Object, Object> batchInterceptor() {
return (batch, consumer) -> batch;
}
}
@Configuration(proxyBeanMethods = false) @Configuration(proxyBeanMethods = false)
static class RebalanceListenerConfiguration { static class RebalanceListenerConfiguration {

Loading…
Cancel
Save