Upgrade to Spring Kafka 3.0.3

Closes gh-34354
pull/34365/head
Andy Wilkinson 2 years ago
parent fcf75fd6cb
commit 6885c3432e

@ -30,7 +30,8 @@ import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
/**
@ -45,7 +46,9 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
private KafkaProperties properties;
private MessageConverter messageConverter;
private BatchMessageConverter batchMessageConverter;
private RecordMessageConverter recordMessageConverter;
private RecordFilterStrategy<Object, Object> recordFilterStrategy;
@ -72,11 +75,19 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
}
/**
* Set the {@link MessageConverter} to use.
* @param messageConverter the message converter
* Set the {@link BatchMessageConverter} to use.
* @param batchMessageConverter the message converter
*/
void setBatchMessageConverter(BatchMessageConverter batchMessageConverter) {
this.batchMessageConverter = batchMessageConverter;
}
/**
* Set the {@link RecordMessageConverter} to use.
* @param recordMessageConverter the message converter
*/
void setMessageConverter(MessageConverter messageConverter) {
this.messageConverter = messageConverter;
void setRecordMessageConverter(RecordMessageConverter recordMessageConverter) {
this.recordMessageConverter = recordMessageConverter;
}
/**
@ -164,7 +175,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
Listener properties = this.properties.getListener();
map.from(properties::getConcurrency).to(factory::setConcurrency);
map.from(properties::isAutoStartup).to(factory::setAutoStartup);
map.from(this.messageConverter).to(factory::setMessageConverter);
map.from(this.batchMessageConverter).to(factory::setBatchMessageConverter);
map.from(this.recordMessageConverter).to(factory::setRecordMessageConverter);
map.from(this.recordFilterStrategy).to(factory::setRecordFilterStrategy);
map.from(this.replyTemplate).to(factory::setReplyTemplate);
if (properties.getType().equals(Listener.Type.BATCH)) {

@ -19,7 +19,6 @@ package org.springframework.boot.autoconfigure.kafka;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener.Type;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
@ -38,7 +37,6 @@ import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
@ -55,7 +53,7 @@ class KafkaAnnotationDrivenConfiguration {
private final KafkaProperties properties;
private final RecordMessageConverter messageConverter;
private final RecordMessageConverter recordMessageConverter;
private final RecordFilterStrategy<Object, Object> recordFilterStrategy;
@ -76,7 +74,7 @@ class KafkaAnnotationDrivenConfiguration {
private final BatchInterceptor<Object, Object> batchInterceptor;
KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
ObjectProvider<RecordMessageConverter> messageConverter,
ObjectProvider<RecordMessageConverter> recordMessageConverter,
ObjectProvider<RecordFilterStrategy<Object, Object>> recordFilterStrategy,
ObjectProvider<BatchMessageConverter> batchMessageConverter,
ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
@ -87,10 +85,10 @@ class KafkaAnnotationDrivenConfiguration {
ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor,
ObjectProvider<BatchInterceptor<Object, Object>> batchInterceptor) {
this.properties = properties;
this.messageConverter = messageConverter.getIfUnique();
this.recordMessageConverter = recordMessageConverter.getIfUnique();
this.recordFilterStrategy = recordFilterStrategy.getIfUnique();
this.batchMessageConverter = batchMessageConverter
.getIfUnique(() -> new BatchMessagingMessageConverter(this.messageConverter));
.getIfUnique(() -> new BatchMessagingMessageConverter(this.recordMessageConverter));
this.kafkaTemplate = kafkaTemplate.getIfUnique();
this.transactionManager = kafkaTransactionManager.getIfUnique();
this.rebalanceListener = rebalanceListener.getIfUnique();
@ -105,9 +103,8 @@ class KafkaAnnotationDrivenConfiguration {
ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
configurer.setKafkaProperties(this.properties);
MessageConverter messageConverterToUse = (this.properties.getListener().getType().equals(Type.BATCH))
? this.batchMessageConverter : this.messageConverter;
configurer.setMessageConverter(messageConverterToUse);
configurer.setBatchMessageConverter(this.batchMessageConverter);
configurer.setRecordMessageConverter(this.recordMessageConverter);
configurer.setRecordFilterStrategy(this.recordFilterStrategy);
configurer.setReplyTemplate(this.kafkaTemplate);
configurer.setTransactionManager(this.transactionManager);

@ -551,7 +551,7 @@ class KafkaAutoConfigurationTests {
this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class).run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(kafkaListenerContainerFactory).hasFieldOrPropertyWithValue("messageConverter",
assertThat(kafkaListenerContainerFactory).hasFieldOrPropertyWithValue("recordMessageConverter",
context.getBean("myMessageConverter"));
});
}
@ -564,7 +564,7 @@ class KafkaAutoConfigurationTests {
.run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(kafkaListenerContainerFactory).hasFieldOrPropertyWithValue("messageConverter",
assertThat(kafkaListenerContainerFactory).hasFieldOrPropertyWithValue("batchMessageConverter",
context.getBean("myBatchMessageConverter"));
});
}
@ -577,7 +577,7 @@ class KafkaAutoConfigurationTests {
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
Object messageConverter = ReflectionTestUtils.getField(kafkaListenerContainerFactory,
"messageConverter");
"batchMessageConverter");
assertThat(messageConverter).isInstanceOf(BatchMessagingMessageConverter.class);
assertThat(((BatchMessageConverter) messageConverter).getRecordMessageConverter())
.isSameAs(context.getBean("myMessageConverter"));
@ -589,7 +589,8 @@ class KafkaAutoConfigurationTests {
this.contextRunner.withPropertyValues("spring.kafka.listener.type=batch").run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
Object messageConverter = ReflectionTestUtils.getField(kafkaListenerContainerFactory, "messageConverter");
Object messageConverter = ReflectionTestUtils.getField(kafkaListenerContainerFactory,
"batchMessageConverter");
assertThat(messageConverter).isInstanceOf(BatchMessagingMessageConverter.class);
assertThat(((BatchMessageConverter) messageConverter).getRecordMessageConverter()).isNull();
});

@ -1371,7 +1371,7 @@ bom {
]
}
}
library("Spring Kafka", "3.0.2") {
library("Spring Kafka", "3.0.3") {
group("org.springframework.kafka") {
modules = [
"spring-kafka",

Loading…
Cancel
Save