|
|
@ -57,12 +57,12 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
|
|
|
import org.springframework.kafka.core.KafkaAdmin;
|
|
|
|
import org.springframework.kafka.core.KafkaAdmin;
|
|
|
|
import org.springframework.kafka.core.KafkaTemplate;
|
|
|
|
import org.springframework.kafka.core.KafkaTemplate;
|
|
|
|
import org.springframework.kafka.listener.AfterRollbackProcessor;
|
|
|
|
import org.springframework.kafka.listener.AfterRollbackProcessor;
|
|
|
|
|
|
|
|
import org.springframework.kafka.listener.BatchErrorHandler;
|
|
|
|
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
|
|
|
|
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
|
|
|
|
import org.springframework.kafka.listener.ContainerProperties;
|
|
|
|
import org.springframework.kafka.listener.ContainerProperties;
|
|
|
|
import org.springframework.kafka.listener.ContainerProperties.AckMode;
|
|
|
|
import org.springframework.kafka.listener.ContainerProperties.AckMode;
|
|
|
|
|
|
|
|
import org.springframework.kafka.listener.ErrorHandler;
|
|
|
|
import org.springframework.kafka.listener.RecordInterceptor;
|
|
|
|
import org.springframework.kafka.listener.RecordInterceptor;
|
|
|
|
import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler;
|
|
|
|
|
|
|
|
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
|
|
|
|
|
|
|
|
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
|
|
|
|
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
|
|
|
|
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
|
|
|
|
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
|
|
|
|
import org.springframework.kafka.support.converter.BatchMessageConverter;
|
|
|
|
import org.springframework.kafka.support.converter.BatchMessageConverter;
|
|
|
@ -508,16 +508,17 @@ class KafkaAutoConfigurationTests {
|
|
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
@Test
|
|
|
|
void testConcurrentKafkaListenerContainerFactoryWithCustomErrorHandler() {
|
|
|
|
void testConcurrentKafkaListenerContainerFactoryWithCustomErrorHandler() {
|
|
|
|
this.contextRunner.withUserConfiguration(ErrorHandlerConfiguration.class).run((context) -> {
|
|
|
|
this.contextRunner.withBean("errorHandler", ErrorHandler.class, () -> mock(ErrorHandler.class))
|
|
|
|
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
|
|
|
|
.run((context) -> {
|
|
|
|
.getBean(ConcurrentKafkaListenerContainerFactory.class);
|
|
|
|
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
|
|
|
|
assertThat(factory).hasFieldOrPropertyWithValue("errorHandler", context.getBean("errorHandler"));
|
|
|
|
.getBean(ConcurrentKafkaListenerContainerFactory.class);
|
|
|
|
});
|
|
|
|
assertThat(factory).hasFieldOrPropertyWithValue("errorHandler", context.getBean("errorHandler"));
|
|
|
|
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
@Test
|
|
|
|
void concurrentKafkaListenerContainerFactoryInBatchModeShouldUseBatchErrorHandler() {
|
|
|
|
void concurrentKafkaListenerContainerFactoryInBatchModeShouldUseBatchErrorHandler() {
|
|
|
|
this.contextRunner.withUserConfiguration(BatchErrorHandlerConfiguration.class)
|
|
|
|
this.contextRunner.withBean("batchErrorHandler", BatchErrorHandler.class, () -> mock(BatchErrorHandler.class))
|
|
|
|
.withPropertyValues("spring.kafka.listener.type=batch").run((context) -> {
|
|
|
|
.withPropertyValues("spring.kafka.listener.type=batch").run((context) -> {
|
|
|
|
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
|
|
|
|
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
|
|
|
|
.getBean(ConcurrentKafkaListenerContainerFactory.class);
|
|
|
|
.getBean(ConcurrentKafkaListenerContainerFactory.class);
|
|
|
@ -538,7 +539,7 @@ class KafkaAutoConfigurationTests {
|
|
|
|
@Test
|
|
|
|
@Test
|
|
|
|
void concurrentKafkaListenerContainerFactoryInBatchModeAndSimpleErrorHandlerShouldBeNull() {
|
|
|
|
void concurrentKafkaListenerContainerFactoryInBatchModeAndSimpleErrorHandlerShouldBeNull() {
|
|
|
|
this.contextRunner.withPropertyValues("spring.kafka.listener.type=batch")
|
|
|
|
this.contextRunner.withPropertyValues("spring.kafka.listener.type=batch")
|
|
|
|
.withUserConfiguration(ErrorHandlerConfiguration.class).run((context) -> {
|
|
|
|
.withBean("errorHandler", ErrorHandler.class, () -> mock(ErrorHandler.class)).run((context) -> {
|
|
|
|
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
|
|
|
|
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
|
|
|
|
.getBean(ConcurrentKafkaListenerContainerFactory.class);
|
|
|
|
.getBean(ConcurrentKafkaListenerContainerFactory.class);
|
|
|
|
assertThat(factory).hasFieldOrPropertyWithValue("errorHandler", null);
|
|
|
|
assertThat(factory).hasFieldOrPropertyWithValue("errorHandler", null);
|
|
|
@ -663,26 +664,6 @@ class KafkaAutoConfigurationTests {
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@Configuration(proxyBeanMethods = false)
|
|
|
|
|
|
|
|
static class ErrorHandlerConfiguration {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Bean
|
|
|
|
|
|
|
|
SeekToCurrentErrorHandler errorHandler() {
|
|
|
|
|
|
|
|
return new SeekToCurrentErrorHandler();
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Configuration(proxyBeanMethods = false)
|
|
|
|
|
|
|
|
static class BatchErrorHandlerConfiguration {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Bean
|
|
|
|
|
|
|
|
SeekToCurrentBatchErrorHandler batchErrorHandler() {
|
|
|
|
|
|
|
|
return new SeekToCurrentBatchErrorHandler();
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Configuration(proxyBeanMethods = false)
|
|
|
|
@Configuration(proxyBeanMethods = false)
|
|
|
|
static class AfterRollbackProcessorConfiguration {
|
|
|
|
static class AfterRollbackProcessorConfiguration {
|
|
|
|
|
|
|
|
|
|
|
|