Improve Kafka Auto-configuration

- transaction manager
- error handler
- after rollback processor

See gh-14215
pull/14228/head
Gary Russell 6 years ago committed by Stephane Nicoll
parent e8d21fc964
commit 59c6dc5c7a

@ -23,8 +23,11 @@ import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
/** /**
* Configure {@link ConcurrentKafkaListenerContainerFactory} with sensible defaults. * Configure {@link ConcurrentKafkaListenerContainerFactory} with sensible defaults.
@ -41,6 +44,12 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
private KafkaTemplate<Object, Object> replyTemplate; private KafkaTemplate<Object, Object> replyTemplate;
private KafkaAwareTransactionManager<Object, Object> transactionManager;
private ErrorHandler errorHandler;
private AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
/** /**
* Set the {@link KafkaProperties} to use. * Set the {@link KafkaProperties} to use.
* @param properties the properties * @param properties the properties
@ -65,6 +74,32 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
this.replyTemplate = replyTemplate; this.replyTemplate = replyTemplate;
} }
/**
* Set the {@link KafkaAwareTransactionManager} to use.
* @param transactionManager the transaction manager
*/
public void setTransactionManager(
KafkaAwareTransactionManager<Object, Object> transactionManager) {
this.transactionManager = transactionManager;
}
/**
* Set the {@link ErrorHandler} to use.
* @param errorHandler the error handler
*/
public void setErrorHandler(ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}
/**
* Set the {@link AfterRollbackProcessor} to use.
* @param afterRollbackProcessor the after rollback processor
*/
public void setAfterRollbackProcessor(
AfterRollbackProcessor<Object, Object> afterRollbackProcessor) {
this.afterRollbackProcessor = afterRollbackProcessor;
}
/** /**
* Configure the specified Kafka listener container factory. The factory can be * Configure the specified Kafka listener container factory. The factory can be
* further tuned and default settings can be overridden. * further tuned and default settings can be overridden.
@ -89,6 +124,9 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
map.from(this.replyTemplate).whenNonNull().to(factory::setReplyTemplate); map.from(this.replyTemplate).whenNonNull().to(factory::setReplyTemplate);
map.from(properties::getType).whenEqualTo(Listener.Type.BATCH) map.from(properties::getType).whenEqualTo(Listener.Type.BATCH)
.toCall(() -> factory.setBatchListener(true)); .toCall(() -> factory.setBatchListener(true));
map.from(this.errorHandler).whenNonNull().to(factory::setErrorHandler);
map.from(this.afterRollbackProcessor).whenNonNull()
.to(factory::setAfterRollbackProcessor);
} }
private void configureContainer(ContainerProperties container) { private void configureContainer(ContainerProperties container) {
@ -109,6 +147,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
.as(Number::intValue).to(container::setMonitorInterval); .as(Number::intValue).to(container::setMonitorInterval);
map.from(properties::getLogContainerConfig).whenNonNull() map.from(properties::getLogContainerConfig).whenNonNull()
.to(container::setLogContainerConfig); .to(container::setLogContainerConfig);
map.from(this.transactionManager).whenNonNull()
.to(container::setTransactionManager);
} }
} }

@ -26,7 +26,10 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerConfigUtils; import org.springframework.kafka.config.KafkaListenerConfigUtils;
import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
/** /**
* Configuration for Kafka annotation-driven support. * Configuration for Kafka annotation-driven support.
@ -45,12 +48,24 @@ class KafkaAnnotationDrivenConfiguration {
private final KafkaTemplate<Object, Object> kafkaTemplate; private final KafkaTemplate<Object, Object> kafkaTemplate;
private final KafkaAwareTransactionManager<Object, Object> transactionManager;
private final ErrorHandler errorHandler;
private final AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
KafkaAnnotationDrivenConfiguration(KafkaProperties properties, KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
ObjectProvider<RecordMessageConverter> messageConverter, ObjectProvider<RecordMessageConverter> messageConverter,
ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate) { ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
ObjectProvider<ErrorHandler> errorHandler,
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor) {
this.properties = properties; this.properties = properties;
this.messageConverter = messageConverter.getIfUnique(); this.messageConverter = messageConverter.getIfUnique();
this.kafkaTemplate = kafkaTemplate.getIfUnique(); this.kafkaTemplate = kafkaTemplate.getIfUnique();
this.transactionManager = kafkaTransactionManager.getIfUnique();
this.errorHandler = errorHandler.getIfUnique();
this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
} }
@Bean @Bean
@ -60,6 +75,9 @@ class KafkaAnnotationDrivenConfiguration {
configurer.setKafkaProperties(this.properties); configurer.setKafkaProperties(this.properties);
configurer.setMessageConverter(this.messageConverter); configurer.setMessageConverter(this.messageConverter);
configurer.setReplyTemplate(this.kafkaTemplate); configurer.setReplyTemplate(this.kafkaTemplate);
configurer.setErrorHandler(this.errorHandler);
configurer.setTransactionManager(this.transactionManager);
configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
return configurer; return configurer;
} }

@ -41,6 +41,7 @@ import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafkaStreams; import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
@ -50,12 +51,16 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.converter.MessagingMessageConverter; import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.kafka.transaction.ChainedKafkaTransactionManager;
import org.springframework.kafka.transaction.KafkaTransactionManager; import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry; import static org.assertj.core.api.Assertions.entry;
@ -155,6 +160,10 @@ public class KafkaAutoConfigurationTests {
assertThat(configs.get("baz")).isEqualTo("qux"); assertThat(configs.get("baz")).isEqualTo("qux");
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz"); assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox"); assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(KafkaTestUtils.getPropertyValue(factory, "errorHandler"))
.isSameAs(context.getBean("errorHandler"));
}); });
} }
@ -485,6 +494,7 @@ public class KafkaAutoConfigurationTests {
@Test @Test
public void testKafkaTemplateRecordMessageConverters() { public void testKafkaTemplateRecordMessageConverters() {
this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class) this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class)
.withPropertyValues("spring.kafka.producer.transaction-id-prefix=test")
.run((context) -> { .run((context) -> {
KafkaTemplate<?, ?> kafkaTemplate = context KafkaTemplate<?, ?> kafkaTemplate = context
.getBean(KafkaTemplate.class); .getBean(KafkaTemplate.class);
@ -496,6 +506,7 @@ public class KafkaAutoConfigurationTests {
@Test @Test
public void testConcurrentKafkaListenerContainerFactoryWithCustomMessageConverters() { public void testConcurrentKafkaListenerContainerFactoryWithCustomMessageConverters() {
this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class) this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class)
.withPropertyValues("spring.kafka.producer.transaction-id-prefix=test")
.run((context) -> { .run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class); .getBean(ConcurrentKafkaListenerContainerFactory.class);
@ -503,6 +514,11 @@ public class KafkaAutoConfigurationTests {
kafkaListenerContainerFactory); kafkaListenerContainerFactory);
assertThat(dfa.getPropertyValue("messageConverter")) assertThat(dfa.getPropertyValue("messageConverter"))
.isSameAs(context.getBean("myMessageConverter")); .isSameAs(context.getBean("myMessageConverter"));
assertThat(kafkaListenerContainerFactory.getContainerProperties()
.getTransactionManager()).isSameAs(
context.getBean("chainedTransactionManager"));
assertThat(dfa.getPropertyValue("afterRollbackProcessor"))
.isSameAs(context.getBean("arp"));
}); });
} }
@ -521,6 +537,11 @@ public class KafkaAutoConfigurationTests {
@Configuration @Configuration
protected static class TestConfiguration { protected static class TestConfiguration {
@Bean
public SeekToCurrentErrorHandler errorHandler() {
return new SeekToCurrentErrorHandler();
}
} }
@Configuration @Configuration
@ -531,6 +552,22 @@ public class KafkaAutoConfigurationTests {
return mock(RecordMessageConverter.class); return mock(RecordMessageConverter.class);
} }
@Bean
@Primary
public PlatformTransactionManager chainedTransactionManager(
KafkaTransactionManager<String, String> kafkaTransactionManager) {
return new ChainedKafkaTransactionManager<String, String>(
kafkaTransactionManager);
}
@Bean
public AfterRollbackProcessor<Object, Object> arp() {
return (records, consumer, ex, recoverable) -> {
// no-op
};
}
} }
@Configuration @Configuration

@ -161,7 +161,7 @@
<spring-data-releasetrain.version>Lovelace-RC2</spring-data-releasetrain.version> <spring-data-releasetrain.version>Lovelace-RC2</spring-data-releasetrain.version>
<spring-hateoas.version>0.25.0.RELEASE</spring-hateoas.version> <spring-hateoas.version>0.25.0.RELEASE</spring-hateoas.version>
<spring-integration.version>5.1.0.M2</spring-integration.version> <spring-integration.version>5.1.0.M2</spring-integration.version>
<spring-kafka.version>2.2.0.M2</spring-kafka.version> <spring-kafka.version>2.2.0.BUILD-SNAPSHOT</spring-kafka.version>
<spring-ldap.version>2.3.2.RELEASE</spring-ldap.version> <spring-ldap.version>2.3.2.RELEASE</spring-ldap.version>
<spring-plugin.version>1.2.0.RELEASE</spring-plugin.version> <spring-plugin.version>1.2.0.RELEASE</spring-plugin.version>
<spring-restdocs.version>2.0.2.RELEASE</spring-restdocs.version> <spring-restdocs.version>2.0.2.RELEASE</spring-restdocs.version>

@ -5627,8 +5627,19 @@ bean is defined, it is automatically associated to the auto-configured `KafkaTem
When the Apache Kafka infrastructure is present, any bean can be annotated with When the Apache Kafka infrastructure is present, any bean can be annotated with
`@KafkaListener` to create a listener endpoint. If no `KafkaListenerContainerFactory` has `@KafkaListener` to create a listener endpoint. If no `KafkaListenerContainerFactory` has
been defined, a default one is automatically configured with keys defined in been defined, a default one is automatically configured with keys defined in
`spring.kafka.listener.*`. Also, if a `RecordMessageConverter` bean is defined, it is `spring.kafka.listener.*`.
automatically associated to the default factory. If the property `spring.kafka.producer.transaction-id-prefix` is defined, a
`KafkaTransactionManager` will be auto-configured with name `kafkaTransactionManager` and
will be wired into the container factory.
Also, if `RecordMessageConverter`, `ErrorHandler` and/or
`AfterRollbackProcessor` beans are defined, they are automatically associated to the
default factory.
IMPORTANT: The auto configuration of these beans occur if there is just a single
instance.
When using a `ChainedKafkaTransactionManager`, it will usually reference the configured
`KafkaTransactionManager` bean, so the chained manager must be marked
`@Primary` if you want it wired into the container factory.
The following component creates a listener endpoint on the `someTopic` topic: The following component creates a listener endpoint on the `someTopic` topic:

Loading…
Cancel
Save