Add ConnectionDetail support to Kafka auto-configuration

Update Kafka auto-configuration so that a `KafkaConnectionDetails`
bean can optionally be used to provide the connection details.

See gh-34657

Co-Authored-By: Moritz Halbritter <mkammerer@vmware.com>
Co-Authored-By: Phillip Webb <pwebb@vmware.com>
pull/34759/head
Andy Wilkinson 2 years ago
parent d860d875b9
commit 042f0c8520
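
With this change, an application (or a test) can contribute its own `KafkaConnectionDetails` bean and the auto-configuration uses it instead of deriving the connection details from the `spring.kafka.*` properties. A minimal sketch of such a bean follows; the configuration class name and host are illustrative only and not part of this commit.

import java.util.List;

import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails;
import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails.Node;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)
class CustomKafkaConnectionDetailsConfiguration {

    @Bean
    KafkaConnectionDetails kafkaConnectionDetails() {
        return new KafkaConnectionDetails() {

            @Override
            public List<Node> getBootstrapNodes() {
                // Used by consumers, producers, the admin and Kafka Streams
                // unless the per-client default methods are overridden.
                return List.of(new Node("kafka.example.com", 9092));
            }

        };
    }

}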

@ -18,6 +18,12 @@ package org.springframework.boot.autoconfigure.kafka;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.AutoConfiguration;
@ -26,6 +32,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate;
import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails.Node;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@ -56,6 +63,9 @@ import org.springframework.retry.backoff.SleepingBackOffPolicy;
* @author Eddú Meléndez
* @author Nakul Mishra
* @author Tomaz Fernandes
* @author Moritz Halbritter
* @author Andy Wilkinson
* @author Phillip Webb
* @since 1.5.0
*/
@AutoConfiguration
@ -66,8 +76,12 @@ public class KafkaAutoConfiguration {
private final KafkaProperties properties;
private final KafkaConnectionDetails connectionDetails;
KafkaAutoConfiguration(KafkaProperties properties, ObjectProvider<KafkaConnectionDetails> connectionDetails) {
this.properties = properties;
this.connectionDetails = connectionDetails
.getIfAvailable(() -> new PropertiesKafkaConnectionDetails(properties));
}
@Bean
@ -94,8 +108,9 @@ public class KafkaAutoConfiguration {
@ConditionalOnMissingBean(ConsumerFactory.class)
public DefaultKafkaConsumerFactory<?, ?> kafkaConsumerFactory(
ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
Map<String, Object> properties = this.properties.buildConsumerProperties();
applyKafkaConnectionDetailsForConsumer(properties);
DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(properties);
customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
return factory;
}
@ -104,8 +119,9 @@ public class KafkaAutoConfiguration {
@ConditionalOnMissingBean(ProducerFactory.class)
public DefaultKafkaProducerFactory<?, ?> kafkaProducerFactory(
ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
Map<String, Object> properties = this.properties.buildProducerProperties();
applyKafkaConnectionDetailsForProducer(properties);
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(properties);
String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
if (transactionIdPrefix != null) {
factory.setTransactionIdPrefix(transactionIdPrefix);
@ -140,7 +156,9 @@ public class KafkaAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> properties = this.properties.buildAdminProperties();
applyKafkaConnectionDetailsForAdmin(properties);
KafkaAdmin kafkaAdmin = new KafkaAdmin(properties);
KafkaProperties.Admin admin = this.properties.getAdmin();
if (admin.getCloseTimeout() != null) {
kafkaAdmin.setCloseTimeout((int) admin.getCloseTimeout().getSeconds());
@ -168,6 +186,34 @@ public class KafkaAutoConfiguration {
return builder.create(kafkaTemplate);
}
private void applyKafkaConnectionDetailsForConsumer(Map<String, Object> properties) {
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
nodesToStringList(this.connectionDetails.getConsumerBootstrapNodes()));
if (!(this.connectionDetails instanceof PropertiesKafkaConnectionDetails)) {
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
}
}
private void applyKafkaConnectionDetailsForProducer(Map<String, Object> properties) {
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
nodesToStringList(this.connectionDetails.getProducerBootstrapNodes()));
if (!(this.connectionDetails instanceof PropertiesKafkaConnectionDetails)) {
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
}
}
private void applyKafkaConnectionDetailsForAdmin(Map<String, Object> properties) {
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
nodesToStringList(this.connectionDetails.getAdminBootstrapNodes()));
if (!(this.connectionDetails instanceof PropertiesKafkaConnectionDetails)) {
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
}
}
private List<String> nodesToStringList(List<Node> nodes) {
return nodes.stream().map((node) -> node.host() + ":" + node.port()).toList();
}
private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Topic retryTopic) {
long delay = (retryTopic.getDelay() != null) ? retryTopic.getDelay().toMillis() : 0;
if (delay > 0) {

@ -0,0 +1,81 @@
/*
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.kafka;
import java.util.List;
import org.springframework.boot.autoconfigure.service.connection.ConnectionDetails;
/**
* Details required to establish a connection to a Kafka service.
*
* @author Moritz Halbritter
* @author Andy Wilkinson
* @author Phillip Webb
* @since 3.1.0
*/
public interface KafkaConnectionDetails extends ConnectionDetails {
/**
* Returns the list of bootstrap nodes.
* @return the list of bootstrap nodes
*/
List<Node> getBootstrapNodes();
/**
* Returns the list of bootstrap nodes used for consumers.
* @return the list of bootstrap nodes used for consumers
*/
default List<Node> getConsumerBootstrapNodes() {
return getBootstrapNodes();
}
/**
* Returns the list of bootstrap nodes used for producers.
* @return the list of bootstrap nodes used for producers
*/
default List<Node> getProducerBootstrapNodes() {
return getBootstrapNodes();
}
/**
* Returns the list of bootstrap nodes used for the admin.
* @return the list of bootstrap nodes used for the admin
*/
default List<Node> getAdminBootstrapNodes() {
return getBootstrapNodes();
}
/**
* Returns the list of bootstrap nodes used for Kafka Streams.
* @return the list of bootstrap nodes used for Kafka Streams
*/
default List<Node> getStreamsBootstrapNodes() {
return getBootstrapNodes();
}
/**
* A Kafka node.
*
* @param host the hostname
* @param port the port
*/
record Node(String host, int port) {
}
}
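
Because each per-client method defaults to `getBootstrapNodes()`, an implementation only has to override the client types that differ. A hypothetical example that points consumers at a dedicated cluster while everything else uses the common nodes (class and host names are invented for illustration):

import java.util.List;

import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails;
import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails.Node;

class ConsumerSpecificKafkaConnectionDetails implements KafkaConnectionDetails {

    @Override
    public List<Node> getBootstrapNodes() {
        return List.of(new Node("kafka.example.com", 9092));
    }

    @Override
    public List<Node> getConsumerBootstrapNodes() {
        // Producers, the admin and Kafka Streams still use getBootstrapNodes().
        return List.of(new Node("consumer-kafka.example.com", 9092));
    }

}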

@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,8 +16,11 @@
package org.springframework.boot.autoconfigure.kafka;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
@ -27,6 +30,7 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails.Node;
import org.springframework.boot.context.properties.source.InvalidConfigurationPropertyValueException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -42,6 +46,8 @@ import org.springframework.kafka.core.CleanupConfig;
* @author Gary Russell
* @author Stephane Nicoll
* @author Eddú Meléndez
* @author Moritz Halbritter
* @author Andy Wilkinson
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(StreamsBuilder.class)
@ -56,17 +62,21 @@ class KafkaStreamsAnnotationDrivenConfiguration {
@ConditionalOnMissingBean
@Bean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
KafkaStreamsConfiguration defaultKafkaStreamsConfig(Environment environment,
ObjectProvider<KafkaConnectionDetails> connectionDetailsProvider) {
KafkaConnectionDetails connectionDetails = connectionDetailsProvider
.getIfAvailable(() -> new PropertiesKafkaConnectionDetails(this.properties));
Map<String, Object> properties = this.properties.buildStreamsProperties();
applyKafkaConnectionDetailsForStreams(connectionDetails, properties);
if (this.properties.getStreams().getApplicationId() == null) {
String applicationName = environment.getProperty("spring.application.name");
if (applicationName == null) {
throw new InvalidConfigurationPropertyValueException("spring.kafka.streams.application-id", null,
"This property is mandatory and fallback 'spring.application.name' is not set either.");
}
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationName);
}
return new KafkaStreamsConfiguration(properties);
}
@Bean
@ -77,6 +87,19 @@ class KafkaStreamsAnnotationDrivenConfiguration {
return new KafkaStreamsFactoryBeanConfigurer(this.properties, factoryBean);
}
private void applyKafkaConnectionDetailsForStreams(KafkaConnectionDetails connectionDetails,
Map<String, Object> properties) {
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
nodesToStringList(connectionDetails.getStreamsBootstrapNodes()));
if (!(connectionDetails instanceof PropertiesKafkaConnectionDetails)) {
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
}
}
private List<String> nodesToStringList(List<Node> nodes) {
return nodes.stream().map((node) -> node.host() + ":" + node.port()).toList();
}
// Separate class required to avoid BeanCurrentlyInCreationException
static class KafkaStreamsFactoryBeanConfigurer implements InitializingBean {

@ -0,0 +1,75 @@
/*
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.kafka;
import java.util.List;
/**
* Adapts {@link KafkaProperties} to {@link KafkaConnectionDetails}.
*
* @author Moritz Halbritter
* @author Andy Wilkinson
* @author Phillip Webb
*/
class PropertiesKafkaConnectionDetails implements KafkaConnectionDetails {
private static final int DEFAULT_PORT = 9092;
private final KafkaProperties properties;
PropertiesKafkaConnectionDetails(KafkaProperties properties) {
this.properties = properties;
}
@Override
public List<Node> getBootstrapNodes() {
return asNodes(this.properties.getBootstrapServers());
}
@Override
public List<Node> getConsumerBootstrapNodes() {
return bootstrapNodes(this.properties.getConsumer().getBootstrapServers());
}
@Override
public List<Node> getProducerBootstrapNodes() {
return bootstrapNodes(this.properties.getProducer().getBootstrapServers());
}
@Override
public List<Node> getStreamsBootstrapNodes() {
return bootstrapNodes(this.properties.getStreams().getBootstrapServers());
}
private List<Node> bootstrapNodes(List<String> bootstrapServers) {
return (bootstrapServers != null) ? asNodes(bootstrapServers) : getBootstrapNodes();
}
private List<Node> asNodes(List<String> bootstrapServers) {
return bootstrapServers.stream().map(this::asNode).toList();
}
private Node asNode(String bootstrapNode) {
int separatorIndex = bootstrapNode.indexOf(':');
if (separatorIndex == -1) {
return new Node(bootstrapNode, DEFAULT_PORT);
}
return new Node(bootstrapNode.substring(0, separatorIndex),
Integer.parseInt(bootstrapNode.substring(separatorIndex + 1)));
}
}
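
To make the adapter's fallback behaviour concrete, here is a small sketch; it assumes the standard `KafkaProperties` setters, uses illustrative host names, and has to live in the same package because the class is package-private. Per-client bootstrap servers win when set, otherwise the common `spring.kafka.bootstrap-servers` list is used, and a server listed without a port resolves to the default port 9092.

package org.springframework.boot.autoconfigure.kafka;

import java.util.List;

import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails.Node;

class PropertiesKafkaConnectionDetailsExample {

    static void fallbackBehaviour() {
        KafkaProperties properties = new KafkaProperties();
        properties.setBootstrapServers(List.of("common.example.com:9092"));
        properties.getConsumer().setBootstrapServers(List.of("consumer.example.com:9092"));
        PropertiesKafkaConnectionDetails details = new PropertiesKafkaConnectionDetails(properties);

        // Consumer-specific servers are set, so they are used:
        // [Node[host=consumer.example.com, port=9092]]
        List<Node> consumerNodes = details.getConsumerBootstrapNodes();

        // No producer-specific servers, so the common list applies:
        // [Node[host=common.example.com, port=9092]]
        List<Node> producerNodes = details.getProducerBootstrapNodes();
    }

}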

@ -99,6 +99,9 @@ import static org.mockito.Mockito.never;
* @author Nakul Mishra
* @author Tomaz Fernandes
* @author Thomas Kåsene
* @author Moritz Halbritter
* @author Andy Wilkinson
* @author Phillip Webb
*/
class KafkaAutoConfigurationTests {
@ -161,6 +164,24 @@ class KafkaAutoConfigurationTests {
});
}
@Test
void connectionDetailsAreAppliedToConsumer() {
this.contextRunner
.withPropertyValues("spring.kafka.bootstrap-servers=foo:1234",
"spring.kafka.consumer.bootstrap-servers=foo:1234", "spring.kafka.security.protocol=SSL",
"spring.kafka.consumer.security.protocol=SSL")
.withBean(KafkaConnectionDetails.class, this::kafkaConnectionDetails)
.run((context) -> {
DefaultKafkaConsumerFactory<?, ?> consumerFactory = context.getBean(DefaultKafkaConsumerFactory.class);
Map<String, Object> configs = consumerFactory.getConfigurationProperties();
assertThat(configs).containsEntry(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
Collections.singletonList("kafka.example.com:12345"));
assertThat(configs).containsEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
Collections.singletonList("kafka.example.com:12345"));
assertThat(configs).containsEntry(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
});
}
@Test
void producerProperties() {
this.contextRunner.withPropertyValues("spring.kafka.clientId=cid",
@ -211,6 +232,24 @@ class KafkaAutoConfigurationTests {
});
}
@Test
void connectionDetailsAreAppliedToProducer() {
this.contextRunner
.withPropertyValues("spring.kafka.bootstrap-servers=foo:1234",
"spring.kafka.producer.bootstrap-servers=foo:1234", "spring.kafka.security.protocol=SSL",
"spring.kafka.producer.security.protocol=SSL")
.withBean(KafkaConnectionDetails.class, this::kafkaConnectionDetails)
.run((context) -> {
DefaultKafkaProducerFactory<?, ?> producerFactory = context.getBean(DefaultKafkaProducerFactory.class);
Map<String, Object> configs = producerFactory.getConfigurationProperties();
assertThat(configs).containsEntry(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
Collections.singletonList("kafka.example.com:12345"));
assertThat(configs).containsEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
Collections.singletonList("kafka.example.com:12345"));
assertThat(configs).containsEntry(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
});
}
@Test
void adminProperties() {
this.contextRunner
@ -252,6 +291,24 @@ class KafkaAutoConfigurationTests {
});
}
@Test
void connectionDetailsAreAppliedToAdmin() {
this.contextRunner
.withPropertyValues("spring.kafka.bootstrap-servers=foo:1234", "spring.kafka.security.protocol=SSL",
"spring.kafka.admin.security.protocol=SSL")
.withBean(KafkaConnectionDetails.class, this::kafkaConnectionDetails)
.run((context) -> {
KafkaAdmin admin = context.getBean(KafkaAdmin.class);
Map<String, Object> configs = admin.getConfigurationProperties();
assertThat(configs).containsEntry(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
Collections.singletonList("kafka.example.com:12345"));
assertThat(configs).containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
Collections.singletonList("kafka.example.com:12345"));
assertThat(configs).containsEntry(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
assertThat(configs).containsEntry(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
});
}
@SuppressWarnings("unchecked")
@Test
void streamsProperties() {
@ -298,6 +355,27 @@ class KafkaAutoConfigurationTests {
});
}
@Test
void connectionDetailsAreAppliedToStreams() {
this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class)
.withPropertyValues("spring.kafka.streams.auto-startup=false", "spring.kafka.streams.application-id=test",
"spring.kafka.bootstrap-servers=foo:1234", "spring.kafka.streams.bootstrap-servers=foo:1234",
"spring.kafka.security.protocol=SSL", "spring.kafka.streams.security.protocol=SSL")
.withBean(KafkaConnectionDetails.class, this::kafkaConnectionDetails)
.run((context) -> {
Properties configs = context
.getBean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME,
KafkaStreamsConfiguration.class)
.asProperties();
assertThat(configs).containsEntry(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
Collections.singletonList("kafka.example.com:12345"));
assertThat(configs).containsEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
Collections.singletonList("kafka.example.com:12345"));
assertThat(configs).containsEntry(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
assertThat(configs).containsEntry(StreamsConfig.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
});
}
@SuppressWarnings("deprecation")
@Deprecated(since = "3.1.0", forRemoval = true)
void streamsCacheMaxSizeBuffering() {
@ -744,6 +822,17 @@ class KafkaAutoConfigurationTests {
});
}
private KafkaConnectionDetails kafkaConnectionDetails() {
return new KafkaConnectionDetails() {
@Override
public List<Node> getBootstrapNodes() {
return List.of(new Node("kafka.example.com", 12345));
}
};
}
@Configuration(proxyBeanMethods = false)
static class MessageConverterConfiguration {
