Add transactionIdPrefix Property to KafkaTemplate

See gh-29089
pull/29200/head
Gary Russell 3 years ago committed by Stephane Nicoll
parent af933f24c1
commit 33814a4a42

@ -25,6 +25,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
@ -66,10 +67,13 @@ public class KafkaAutoConfiguration {
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
ProducerListener<Object, Object> kafkaProducerListener,
ObjectProvider<RecordMessageConverter> messageConverter) {
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
map.from(kafkaProducerListener).to(kafkaTemplate::setProducerListener);
map.from(this.properties.getTemplate().getDefaultTopic()).to(kafkaTemplate::setDefaultTopic);
map.from(this.properties.getTemplate().getTransactionIdPrefix()).to(kafkaTemplate::setTransactionIdPrefix);
return kafkaTemplate;
}

@ -824,6 +824,11 @@ public class KafkaProperties {
*/
private String defaultTopic;
/**
* Override the transaction id prefix in the producer factory.
*/
private String transactionIdPrefix;
public String getDefaultTopic() {
return this.defaultTopic;
}
@ -832,6 +837,14 @@ public class KafkaProperties {
this.defaultTopic = defaultTopic;
}
public String getTransactionIdPrefix() {
return this.transactionIdPrefix;
}
public void setTransactionIdPrefix(String transactionIdPrefix) {
this.transactionIdPrefix = transactionIdPrefix;
}
}
public static class Listener {

@ -384,6 +384,7 @@ class KafkaAutoConfigurationTests {
void listenerProperties() {
this.contextRunner
.withPropertyValues("spring.kafka.template.default-topic=testTopic",
"spring.kafka.template.transaction-id-prefix=txOverride",
"spring.kafka.listener.ack-mode=MANUAL", "spring.kafka.listener.client-id=client",
"spring.kafka.listener.ack-count=123", "spring.kafka.listener.ack-time=456",
"spring.kafka.listener.concurrency=3", "spring.kafka.listener.poll-timeout=2000",
@ -406,6 +407,7 @@ class KafkaAutoConfigurationTests {
assertThat(kafkaTemplate.getMessageConverter()).isInstanceOf(MessagingMessageConverter.class);
assertThat(kafkaTemplate).hasFieldOrPropertyWithValue("producerFactory", producerFactory);
assertThat(kafkaTemplate.getDefaultTopic()).isEqualTo("testTopic");
assertThat(kafkaTemplate).hasFieldOrPropertyWithValue("transactionIdPrefix", "txOverride");
assertThat(kafkaListenerContainerFactory.getConsumerFactory()).isEqualTo(consumerFactory);
ContainerProperties containerProperties = kafkaListenerContainerFactory.getContainerProperties();
assertThat(containerProperties.getAckMode()).isEqualTo(AckMode.MANUAL);

Loading…
Cancel
Save