Add KafkaAdmin Auto Configuration

Spring for Apache Kafka has added a `KafkaAdmin` feature to automatically
add new topics to the broker.

See gh-10309
pull/10239/merge
Gary Russell 7 years ago committed by Stephane Nicoll
parent c7eb0fb281
commit 3e1d9fa856

@ -30,6 +30,7 @@ import org.springframework.context.annotation.Import;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
@ -103,4 +104,12 @@ public class KafkaAutoConfiguration {
return jaas;
}
@Bean
@ConditionalOnMissingBean(KafkaAdmin.class)
public KafkaAdmin kafkaAdmin() {
KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailIfNoBrokers());
return kafkaAdmin;
}
}

@ -72,6 +72,8 @@ public class KafkaProperties {
private final Producer producer = new Producer();
private final Admin admin = new Admin();
private final Listener listener = new Listener();
private final Ssl ssl = new Ssl();
@ -112,6 +114,10 @@ public class KafkaProperties {
return this.listener;
}
public Admin getAdmin() {
return this.admin;
}
public Ssl getSsl() {
return this.ssl;
}
@ -186,6 +192,20 @@ public class KafkaProperties {
return properties;
}
/**
* Create an initial map of admin properties from the state of this instance.
* <p>
* This allows you to add additional properties, if necessary, and override the
* default kafkaAdmin bean.
* @return the admin properties initialized with the customizations defined on this
* instance
*/
public Map<String, Object> buildAdminProperties() {
Map<String, Object> properties = buildCommonProperties();
properties.putAll(this.admin.buildProperties());
return properties;
}
private static String resourceToPath(Resource resource) {
try {
return resource.getFile().getAbsolutePath();
@ -643,6 +663,81 @@ public class KafkaProperties {
}
public static class Admin {
private final Ssl ssl = new Ssl();
/**
* Id to pass to the server when making requests; used for server-side logging.
*/
private String clientId;
/**
* Additional producer-specific properties used to configure the client.
*/
private final Map<String, String> properties = new HashMap<>();
/**
* When true, the application context will not load if the broker connection
* fails when attempting to provision topics.
*/
private boolean failIfNoBrokers;
public Ssl getSsl() {
return this.ssl;
}
public String getClientId() {
return this.clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public boolean isFailIfNoBrokers() {
return this.failIfNoBrokers;
}
public void setFailIfNoBrokers(boolean failIfNoBrokers) {
this.failIfNoBrokers = failIfNoBrokers;
}
public Map<String, String> getProperties() {
return this.properties;
}
public Map<String, Object> buildProperties() {
Map<String, Object> properties = new HashMap<>();
if (this.clientId != null) {
properties.put(ProducerConfig.CLIENT_ID_CONFIG, this.clientId);
}
if (this.ssl.getKeyPassword() != null) {
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG,
this.ssl.getKeyPassword());
}
if (this.ssl.getKeystoreLocation() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
resourceToPath(this.ssl.getKeystoreLocation()));
}
if (this.ssl.getKeystorePassword() != null) {
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
this.ssl.getKeystorePassword());
}
if (this.ssl.getTruststoreLocation() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
resourceToPath(this.ssl.getTruststoreLocation()));
}
if (this.ssl.getTruststorePassword() != null) {
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
this.ssl.getTruststorePassword());
}
properties.putAll(this.properties);
return properties;
}
}
public static class Template {
/**

@ -19,6 +19,8 @@ package org.springframework.boot.autoconfigure.kafka;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.Producer;
import org.junit.After;
import org.junit.ClassRule;
import org.junit.Test;
@ -27,6 +29,7 @@ import org.springframework.boot.test.util.TestPropertyValues;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.rule.KafkaEmbedded;
@ -43,6 +46,8 @@ public class KafkaAutoConfigurationIntegrationTests {
private static final String TEST_TOPIC = "testTopic";
private static final String ADMIN_CREATED_TOPIC = "adminCreatedTopic";
@ClassRule
public static final KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true,
TEST_TOPIC);
@ -56,13 +61,13 @@ public class KafkaAutoConfigurationIntegrationTests {
}
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testEndToEnd() throws Exception {
load(KafkaConfig.class,
"spring.kafka.bootstrap-servers:" + kafkaEmbedded.getBrokersAsString(),
"spring.kafka.consumer.group-id=testGroup",
"spring.kafka.consumer.auto-offset-reset=earliest");
@SuppressWarnings("unchecked")
KafkaTemplate<String, String> template = this.context
.getBean(KafkaTemplate.class);
template.send(TEST_TOPIC, "foo", "bar");
@ -70,6 +75,11 @@ public class KafkaAutoConfigurationIntegrationTests {
assertThat(listener.latch.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(listener.key).isEqualTo("foo");
assertThat(listener.received).isEqualTo("bar");
DefaultKafkaProducerFactory producerFactory = this.context.getBean(DefaultKafkaProducerFactory.class);
Producer producer = producerFactory.createProducer();
assertThat(producer.partitionsFor(ADMIN_CREATED_TOPIC).size()).isEqualTo(10);
producer.close();
}
private void load(Class<?> config, String... environment) {
@ -93,6 +103,11 @@ public class KafkaAutoConfigurationIntegrationTests {
return new Listener();
}
@Bean
public NewTopic adminCreated() {
return new NewTopic(ADMIN_CREATED_TOPIC, 10, (short) 1);
}
}
public static class Listener {

@ -22,6 +22,7 @@ import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
@ -38,9 +39,11 @@ import org.springframework.context.annotation.AnnotationConfigApplicationContext
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
@ -170,6 +173,38 @@ public class KafkaAutoConfigurationTests {
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
}
@Test
public void adminProperties() {
load("spring.kafka.clientId=cid",
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
"spring.kafka.admin.fail-if-no-brokers=true",
"spring.kafka.admin.properties.fiz.buz=fix.fox",
"spring.kafka.admin.ssl.key-password=p4",
"spring.kafka.admin.ssl.keystore-location=classpath:ksLocP",
"spring.kafka.admin.ssl.keystore-password=p5",
"spring.kafka.admin.ssl.truststore-location=classpath:tsLocP",
"spring.kafka.admin.ssl.truststore-password=p6");
KafkaAdmin admin = this.context
.getBean(KafkaAdmin.class);
Map<String, Object> configs = admin.getConfig();
// common
assertThat(configs.get(AdminClientConfig.CLIENT_ID_CONFIG)).isEqualTo("cid");
// admin
assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).isEqualTo("p4");
assertThat((String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
.endsWith(File.separator + "ksLocP");
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).isEqualTo("p5");
assertThat((String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
.endsWith(File.separator + "tsLocP");
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG))
.isEqualTo("p6");
assertThat(this.context.getBeansOfType(KafkaJaasLoginModuleInitializer.class))
.isEmpty();
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
assertThat(KafkaTestUtils.getPropertyValue(admin, "fatalIfBrokerNotAvailable", Boolean.class)).isTrue();
}
@SuppressWarnings("unchecked")
@Test
public void listenerProperties() {

@ -939,6 +939,15 @@ content into your application; rather pick only the properties that you need.
spring.jms.template.time-to-live= # Time-to-live of a message when sending in milliseconds. Enable QoS when set.
# APACHE KAFKA ({sc-spring-boot-autoconfigure}/kafka/KafkaProperties.{sc-ext}[KafkaProperties])
spring.kafka.admin.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster.
spring.kafka.admin.client-id= # Id to pass to the server when making requests; used for server-side logging.
spring.kafka.admin.fail-if-no-brokers=false # When true, the application context will not load if the broker connection fails when attempting to provision topics.
spring.kafka.admin.properties.*= # Additional admin-specific properties used to configure the client.
spring.kafka.admin.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.admin.ssl.keystore-location= # Location of the key store file.
spring.kafka.admin.ssl.keystore-password= # Store password for the key store file.
spring.kafka.admin.ssl.truststore-location= # Location of the trust store file.
spring.kafka.admin.ssl.truststore-password= # Store password for the trust store file.
spring.kafka.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster.
spring.kafka.client-id= # Id to pass to the server when making requests; used for server-side logging.
spring.kafka.consumer.auto-commit-interval= # Frequency in milliseconds that the consumer offsets are auto-committed to Kafka if 'enable.auto.commit' true.

@ -4821,6 +4821,15 @@ for more of the supported options.
[[boot-features-kafka-topics]]
==== Creating Topics
Spring Boot auto configuration will add a `KafkaAdmin` bean to the application context.
This bean will automatically create a topic for each `NewTopic` bean in the application context.
Topics can only be added with this technique, not modified; if a topic already exists, the `NewTopic` bean is ignored.
[[boot-features-kafka-sending-a-message]]
==== Sending a Message
Spring's `KafkaTemplate` is auto-configured and you can autowire them directly in your own

Loading…
Cancel
Save