Merge branch '2.7.x'

pull/29263/head
Stephane Nicoll 3 years ago
commit 9bfe3c21e5

@ -1,5 +1,5 @@
/*
* Copyright 2012-2020 the original author or authors.
* Copyright 2012-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -25,6 +25,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
@ -66,10 +67,12 @@ public class KafkaAutoConfiguration {
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
ProducerListener<Object, Object> kafkaProducerListener,
ObjectProvider<RecordMessageConverter> messageConverter) {
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
map.from(kafkaProducerListener).to(kafkaTemplate::setProducerListener);
map.from(this.properties.getTemplate().getDefaultTopic()).to(kafkaTemplate::setDefaultTopic);
map.from(this.properties.getTemplate().getTransactionIdPrefix()).to(kafkaTemplate::setTransactionIdPrefix);
return kafkaTemplate;
}

@ -824,6 +824,12 @@ public class KafkaProperties {
*/
private String defaultTopic;
/**
* Transaction id prefix, override the transaction id prefix in the producer
* factory.
*/
private String transactionIdPrefix;
public String getDefaultTopic() {
return this.defaultTopic;
}
@ -832,6 +838,14 @@ public class KafkaProperties {
this.defaultTopic = defaultTopic;
}
public String getTransactionIdPrefix() {
return this.transactionIdPrefix;
}
public void setTransactionIdPrefix(String transactionIdPrefix) {
this.transactionIdPrefix = transactionIdPrefix;
}
}
public static class Listener {

@ -384,6 +384,7 @@ class KafkaAutoConfigurationTests {
void listenerProperties() {
this.contextRunner
.withPropertyValues("spring.kafka.template.default-topic=testTopic",
"spring.kafka.template.transaction-id-prefix=txOverride",
"spring.kafka.listener.ack-mode=MANUAL", "spring.kafka.listener.client-id=client",
"spring.kafka.listener.ack-count=123", "spring.kafka.listener.ack-time=456",
"spring.kafka.listener.concurrency=3", "spring.kafka.listener.poll-timeout=2000",
@ -406,6 +407,7 @@ class KafkaAutoConfigurationTests {
assertThat(kafkaTemplate.getMessageConverter()).isInstanceOf(MessagingMessageConverter.class);
assertThat(kafkaTemplate).hasFieldOrPropertyWithValue("producerFactory", producerFactory);
assertThat(kafkaTemplate.getDefaultTopic()).isEqualTo("testTopic");
assertThat(kafkaTemplate).hasFieldOrPropertyWithValue("transactionIdPrefix", "txOverride");
assertThat(kafkaListenerContainerFactory.getConsumerFactory()).isEqualTo(consumerFactory);
ContainerProperties containerProperties = kafkaListenerContainerFactory.getContainerProperties();
assertThat(containerProperties.getAckMode()).isEqualTo(AckMode.MANUAL);

Loading…
Cancel
Save