diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java index cf7b44e1a9..9b9632ddc9 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java @@ -18,6 +18,12 @@ package org.springframework.boot.autoconfigure.kafka; import java.io.IOException; import java.time.Duration; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.AutoConfiguration; @@ -26,6 +32,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate; +import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails.Node; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -56,6 +63,9 @@ import org.springframework.retry.backoff.SleepingBackOffPolicy; * @author Eddú Meléndez * @author Nakul Mishra * @author Tomaz Fernandes + * @author Moritz Halbritter + * @author Andy Wilkinson + * @author Phillip Webb * @since 1.5.0 */ @AutoConfiguration @@ -66,8 +76,12 @@ public class KafkaAutoConfiguration { private final KafkaProperties properties; - public KafkaAutoConfiguration(KafkaProperties properties) { + private final KafkaConnectionDetails connectionDetails; + + KafkaAutoConfiguration(KafkaProperties properties, ObjectProvider connectionDetails) { this.properties = properties; + this.connectionDetails = connectionDetails + .getIfAvailable(() -> new PropertiesKafkaConnectionDetails(properties)); } @Bean @@ -94,8 +108,9 @@ public class KafkaAutoConfiguration { @ConditionalOnMissingBean(ConsumerFactory.class) public DefaultKafkaConsumerFactory kafkaConsumerFactory( ObjectProvider customizers) { - DefaultKafkaConsumerFactory factory = new DefaultKafkaConsumerFactory<>( - this.properties.buildConsumerProperties()); + Map properties = this.properties.buildConsumerProperties(); + applyKafkaConnectionDetailsForConsumer(properties); + DefaultKafkaConsumerFactory factory = new DefaultKafkaConsumerFactory<>(properties); customizers.orderedStream().forEach((customizer) -> customizer.customize(factory)); return factory; } @@ -104,8 +119,9 @@ public class KafkaAutoConfiguration { @ConditionalOnMissingBean(ProducerFactory.class) public DefaultKafkaProducerFactory kafkaProducerFactory( ObjectProvider customizers) { - DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>( - this.properties.buildProducerProperties()); + Map properties = this.properties.buildProducerProperties(); + applyKafkaConnectionDetailsForProducer(properties); + DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(properties); String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix(); if (transactionIdPrefix != null) { factory.setTransactionIdPrefix(transactionIdPrefix); @@ -140,7 +156,9 @@ public class KafkaAutoConfiguration { @Bean @ConditionalOnMissingBean public KafkaAdmin kafkaAdmin() { - KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties()); + Map properties = this.properties.buildAdminProperties(); + applyKafkaConnectionDetailsForAdmin(properties); + KafkaAdmin kafkaAdmin = new KafkaAdmin(properties); KafkaProperties.Admin admin = this.properties.getAdmin(); if (admin.getCloseTimeout() != null) { kafkaAdmin.setCloseTimeout((int) admin.getCloseTimeout().getSeconds()); @@ -168,6 +186,34 @@ public class KafkaAutoConfiguration { return builder.create(kafkaTemplate); } + private void applyKafkaConnectionDetailsForConsumer(Map properties) { + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + nodesToStringList(this.connectionDetails.getConsumerBootstrapNodes())); + if (!(this.connectionDetails instanceof PropertiesKafkaConnectionDetails)) { + properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT"); + } + } + + private void applyKafkaConnectionDetailsForProducer(Map properties) { + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + nodesToStringList(this.connectionDetails.getProducerBootstrapNodes())); + if (!(this.connectionDetails instanceof PropertiesKafkaConnectionDetails)) { + properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT"); + } + } + + private void applyKafkaConnectionDetailsForAdmin(Map properties) { + properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + nodesToStringList(this.connectionDetails.getAdminBootstrapNodes())); + if (!(this.connectionDetails instanceof PropertiesKafkaConnectionDetails)) { + properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT"); + } + } + + private List nodesToStringList(List nodes) { + return nodes.stream().map((node) -> node.host() + ":" + node.port()).toList(); + } + private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Topic retryTopic) { long delay = (retryTopic.getDelay() != null) ? retryTopic.getDelay().toMillis() : 0; if (delay > 0) { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaConnectionDetails.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaConnectionDetails.java new file mode 100644 index 0000000000..3e1421d445 --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaConnectionDetails.java @@ -0,0 +1,81 @@ +/* + * Copyright 2012-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.kafka; + +import java.util.List; + +import org.springframework.boot.autoconfigure.service.connection.ConnectionDetails; + +/** + * Details required to establish a connection to a Kafka service. + * + * @author Moritz Halbritter + * @author Andy Wilkinson + * @author Phillip Webb + * @since 3.1.0 + */ +public interface KafkaConnectionDetails extends ConnectionDetails { + + /** + * Returns the list of bootstrap nodes. + * @return the list of bootstrap nodes + */ + List getBootstrapNodes(); + + /** + * Returns the list of bootstrap nodes used for consumers. + * @return the list of bootstrap nodes used for consumers + */ + default List getConsumerBootstrapNodes() { + return getBootstrapNodes(); + } + + /** + * Returns the list of bootstrap nodes used for producers. + * @return the list of bootstrap nodes used for producers + */ + default List getProducerBootstrapNodes() { + return getBootstrapNodes(); + } + + /** + * Returns the list of bootstrap nodes used for the admin. + * @return the list of bootstrap nodes used for the admin + */ + default List getAdminBootstrapNodes() { + return getBootstrapNodes(); + } + + /** + * Returns the list of bootstrap nodes used for Kafka Streams. + * @return the list of bootstrap nodes used for Kafka Streams + */ + default List getStreamsBootstrapNodes() { + return getBootstrapNodes(); + } + + /** + * A Kafka node. + * + * @param host the hostname + * @param port the port + */ + record Node(String host, int port) { + + } + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java index feec421f84..a3ae685e7f 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2020 the original author or authors. + * Copyright 2012-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,8 +16,11 @@ package org.springframework.boot.autoconfigure.kafka; +import java.util.List; import java.util.Map; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; @@ -27,6 +30,7 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails.Node; import org.springframework.boot.context.properties.source.InvalidConfigurationPropertyValueException; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -42,6 +46,8 @@ import org.springframework.kafka.core.CleanupConfig; * @author Gary Russell * @author Stephane Nicoll * @author Eddú Meléndez + * @author Moritz Halbritter + * @author Andy Wilkinson */ @Configuration(proxyBeanMethods = false) @ConditionalOnClass(StreamsBuilder.class) @@ -56,17 +62,21 @@ class KafkaStreamsAnnotationDrivenConfiguration { @ConditionalOnMissingBean @Bean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) - KafkaStreamsConfiguration defaultKafkaStreamsConfig(Environment environment) { - Map streamsProperties = this.properties.buildStreamsProperties(); + KafkaStreamsConfiguration defaultKafkaStreamsConfig(Environment environment, + ObjectProvider connectionDetailsProvider) { + KafkaConnectionDetails connectionDetails = connectionDetailsProvider + .getIfAvailable(() -> new PropertiesKafkaConnectionDetails(this.properties)); + Map properties = this.properties.buildStreamsProperties(); + applyKafkaConnectionDetailsForStreams(connectionDetails, properties); if (this.properties.getStreams().getApplicationId() == null) { String applicationName = environment.getProperty("spring.application.name"); if (applicationName == null) { throw new InvalidConfigurationPropertyValueException("spring.kafka.streams.application-id", null, "This property is mandatory and fallback 'spring.application.name' is not set either."); } - streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationName); + properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationName); } - return new KafkaStreamsConfiguration(streamsProperties); + return new KafkaStreamsConfiguration(properties); } @Bean @@ -77,6 +87,19 @@ class KafkaStreamsAnnotationDrivenConfiguration { return new KafkaStreamsFactoryBeanConfigurer(this.properties, factoryBean); } + private void applyKafkaConnectionDetailsForStreams(KafkaConnectionDetails connectionDetails, + Map properties) { + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + nodesToStringList(connectionDetails.getStreamsBootstrapNodes())); + if (!(connectionDetails instanceof PropertiesKafkaConnectionDetails)) { + properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT"); + } + } + + private List nodesToStringList(List nodes) { + return nodes.stream().map((node) -> node.host() + ":" + node.port()).toList(); + } + // Separate class required to avoid BeanCurrentlyInCreationException static class KafkaStreamsFactoryBeanConfigurer implements InitializingBean { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/PropertiesKafkaConnectionDetails.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/PropertiesKafkaConnectionDetails.java new file mode 100644 index 0000000000..83152cd917 --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/PropertiesKafkaConnectionDetails.java @@ -0,0 +1,75 @@ +/* + * Copyright 2012-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.kafka; + +import java.util.List; + +/** + * Adapts {@link KafkaProperties} to {@link KafkaConnectionDetails}. + * + * @author Moritz Halbritter + * @author Andy Wilkinson + * @author Phillip Webb + */ +class PropertiesKafkaConnectionDetails implements KafkaConnectionDetails { + + private final int DEFAULT_PORT = 9092; + + private final KafkaProperties properties; + + PropertiesKafkaConnectionDetails(KafkaProperties properties) { + this.properties = properties; + } + + @Override + public List getBootstrapNodes() { + return asNodes(this.properties.getBootstrapServers()); + } + + @Override + public List getConsumerBootstrapNodes() { + return bootstrapNodes(this.properties.getConsumer().getBootstrapServers()); + } + + @Override + public List getProducerBootstrapNodes() { + return bootstrapNodes(this.properties.getProducer().getBootstrapServers()); + } + + @Override + public List getStreamsBootstrapNodes() { + return bootstrapNodes(this.properties.getStreams().getBootstrapServers()); + } + + private List bootstrapNodes(List bootstrapServers) { + return (bootstrapServers != null) ? asNodes(bootstrapServers) : getBootstrapNodes(); + } + + private List asNodes(List bootstrapServers) { + return bootstrapServers.stream().map(this::asNode).toList(); + } + + private Node asNode(String bootstrapNode) { + int separatorIndex = bootstrapNode.indexOf(':'); + if (separatorIndex == -1) { + return new Node(bootstrapNode, this.DEFAULT_PORT); + } + return new Node(bootstrapNode.substring(0, separatorIndex), + Integer.parseInt(bootstrapNode.substring(separatorIndex + 1))); + } + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 22cb10dcc1..013f574aeb 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -99,6 +99,9 @@ import static org.mockito.Mockito.never; * @author Nakul Mishra * @author Tomaz Fernandes * @author Thomas Kåsene + * @author Moritz Halbritter + * @author Andy Wilkinson + * @author Phillip Webb */ class KafkaAutoConfigurationTests { @@ -161,6 +164,24 @@ class KafkaAutoConfigurationTests { }); } + @Test + void connectionDetailsAreAppliedToConsumer() { + this.contextRunner + .withPropertyValues("spring.kafka.bootstrap-servers=foo:1234", + "spring.kafka.consumer.bootstrap-servers=foo:1234", "spring.kafka.security.protocol=SSL", + "spring.kafka.consumer.security.protocol=SSL") + .withBean(KafkaConnectionDetails.class, this::kafkaConnectionDetails) + .run((context) -> { + DefaultKafkaConsumerFactory consumerFactory = context.getBean(DefaultKafkaConsumerFactory.class); + Map configs = consumerFactory.getConfigurationProperties(); + assertThat(configs).containsEntry(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + Collections.singletonList("kafka.example.com:12345")); + assertThat(configs).containsEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + Collections.singletonList("kafka.example.com:12345")); + assertThat(configs).containsEntry(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT"); + }); + } + @Test void producerProperties() { this.contextRunner.withPropertyValues("spring.kafka.clientId=cid", @@ -211,6 +232,24 @@ class KafkaAutoConfigurationTests { }); } + @Test + void connectionDetailsAreAppliedToProducer() { + this.contextRunner + .withPropertyValues("spring.kafka.bootstrap-servers=foo:1234", + "spring.kafka.producer.bootstrap-servers=foo:1234", "spring.kafka.security.protocol=SSL", + "spring.kafka.producer.security.protocol=SSL") + .withBean(KafkaConnectionDetails.class, this::kafkaConnectionDetails) + .run((context) -> { + DefaultKafkaProducerFactory producerFactory = context.getBean(DefaultKafkaProducerFactory.class); + Map configs = producerFactory.getConfigurationProperties(); + assertThat(configs).containsEntry(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + Collections.singletonList("kafka.example.com:12345")); + assertThat(configs).containsEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + Collections.singletonList("kafka.example.com:12345")); + assertThat(configs).containsEntry(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT"); + }); + } + @Test void adminProperties() { this.contextRunner @@ -252,6 +291,24 @@ class KafkaAutoConfigurationTests { }); } + @Test + void connectionDetailsAreAppliedToAdmin() { + this.contextRunner + .withPropertyValues("spring.kafka.bootstrap-servers=foo:1234", "spring.kafka.security.protocol=SSL", + "spring.kafka.admin.security.protocol=SSL") + .withBean(KafkaConnectionDetails.class, this::kafkaConnectionDetails) + .run((context) -> { + KafkaAdmin admin = context.getBean(KafkaAdmin.class); + Map configs = admin.getConfigurationProperties(); + assertThat(configs).containsEntry(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + Collections.singletonList("kafka.example.com:12345")); + assertThat(configs).containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, + Collections.singletonList("kafka.example.com:12345")); + assertThat(configs).containsEntry(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT"); + assertThat(configs).containsEntry(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT"); + }); + } + @SuppressWarnings("unchecked") @Test void streamsProperties() { @@ -298,6 +355,27 @@ class KafkaAutoConfigurationTests { }); } + @Test + void connectionDetailsAreAppliedToStreams() { + this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class) + .withPropertyValues("spring.kafka.streams.auto-startup=false", "spring.kafka.streams.application-id=test", + "spring.kafka.bootstrap-servers=foo:1234", "spring.kafka.streams.bootstrap-servers=foo:1234", + "spring.kafka.security.protocol=SSL", "spring.kafka.streams.security.protocol=SSL") + .withBean(KafkaConnectionDetails.class, this::kafkaConnectionDetails) + .run((context) -> { + Properties configs = context + .getBean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME, + KafkaStreamsConfiguration.class) + .asProperties(); + assertThat(configs).containsEntry(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + Collections.singletonList("kafka.example.com:12345")); + assertThat(configs).containsEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, + Collections.singletonList("kafka.example.com:12345")); + assertThat(configs).containsEntry(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT"); + assertThat(configs).containsEntry(StreamsConfig.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT"); + }); + } + @SuppressWarnings("deprecation") @Deprecated(since = "3.1.0", forRemoval = true) void streamsCacheMaxSizeBuffering() { @@ -744,6 +822,17 @@ class KafkaAutoConfigurationTests { }); } + private KafkaConnectionDetails kafkaConnectionDetails() { + return new KafkaConnectionDetails() { + + @Override + public List getBootstrapNodes() { + return List.of(new Node("kafka.example.com", 12345)); + } + + }; + } + @Configuration(proxyBeanMethods = false) static class MessageConverterConfiguration {