Merge pull request #33288 from terminux

* pr/33288:
  Support setting more properties of KafkaAdmin

Closes gh-33288
pull/34049/head
Moritz Halbritter 2 years ago
commit cde0d5a625

@ -1,5 +1,5 @@
/*
* Copyright 2012-2022 the original author or authors.
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -141,8 +141,16 @@ public class KafkaAutoConfiguration {
@ConditionalOnMissingBean
public KafkaAdmin kafkaAdmin() {
KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());
kafkaAdmin.setModifyTopicConfigs(this.properties.getAdmin().isModifyTopicConfigs());
KafkaProperties.Admin admin = this.properties.getAdmin();
if (admin.getCloseTimeout() != null) {
kafkaAdmin.setCloseTimeout((int) admin.getCloseTimeout().getSeconds());
}
if (admin.getOperationTimeout() != null) {
kafkaAdmin.setOperationTimeout((int) admin.getOperationTimeout().getSeconds());
}
kafkaAdmin.setFatalIfBrokerNotAvailable(admin.isFailFast());
kafkaAdmin.setModifyTopicConfigs(admin.isModifyTopicConfigs());
kafkaAdmin.setAutoCreate(admin.isAutoCreate());
return kafkaAdmin;
}

@ -642,6 +642,16 @@ public class KafkaProperties {
*/
private final Map<String, String> properties = new HashMap<>();
/**
* The close timeout.
*/
private Duration closeTimeout;
/**
* The operation timeout.
*/
private Duration operationTimeout;
/**
* Whether to fail fast if the broker is not available on startup.
*/
@ -652,6 +662,12 @@ public class KafkaProperties {
*/
private boolean modifyTopicConfigs;
/**
* Whether to automatically create topics during context initialization. When set
* to false, disables automatic topic creation during context initialization.
*/
private boolean autoCreate = true;
public Ssl getSsl() {
return this.ssl;
}
@ -668,6 +684,22 @@ public class KafkaProperties {
this.clientId = clientId;
}
public Duration getCloseTimeout() {
return this.closeTimeout;
}
public void setCloseTimeout(Duration closeTimeout) {
this.closeTimeout = closeTimeout;
}
public Duration getOperationTimeout() {
return this.operationTimeout;
}
public void setOperationTimeout(Duration operationTimeout) {
this.operationTimeout = operationTimeout;
}
public boolean isFailFast() {
return this.failFast;
}
@ -684,6 +716,14 @@ public class KafkaProperties {
this.modifyTopicConfigs = modifyTopicConfigs;
}
public boolean isAutoCreate() {
return this.autoCreate;
}
public void setAutoCreate(boolean autoCreate) {
this.autoCreate = autoCreate;
}
public Map<String, String> getProperties() {
return this.properties;
}

@ -17,6 +17,7 @@
package org.springframework.boot.autoconfigure.kafka;
import java.io.File;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -209,14 +210,17 @@ class KafkaAutoConfigurationTests {
@Test
void adminProperties() {
this.contextRunner.withPropertyValues("spring.kafka.clientId=cid",
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz", "spring.kafka.admin.fail-fast=true",
"spring.kafka.admin.properties.fiz.buz=fix.fox", "spring.kafka.admin.security.protocol=SSL",
"spring.kafka.admin.ssl.key-password=p4", "spring.kafka.admin.ssl.key-store-location=classpath:ksLocP",
"spring.kafka.admin.ssl.key-store-password=p5", "spring.kafka.admin.ssl.key-store-type=PKCS12",
"spring.kafka.admin.ssl.trust-store-location=classpath:tsLocP",
"spring.kafka.admin.ssl.trust-store-password=p6", "spring.kafka.admin.ssl.trust-store-type=PKCS12",
"spring.kafka.admin.ssl.protocol=TLSv1.2", "spring.kafka.admin.modify-topic-configs=true")
this.contextRunner
.withPropertyValues("spring.kafka.clientId=cid", "spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
"spring.kafka.admin.fail-fast=true", "spring.kafka.admin.properties.fiz.buz=fix.fox",
"spring.kafka.admin.security.protocol=SSL", "spring.kafka.admin.ssl.key-password=p4",
"spring.kafka.admin.ssl.key-store-location=classpath:ksLocP",
"spring.kafka.admin.ssl.key-store-password=p5", "spring.kafka.admin.ssl.key-store-type=PKCS12",
"spring.kafka.admin.ssl.trust-store-location=classpath:tsLocP",
"spring.kafka.admin.ssl.trust-store-password=p6",
"spring.kafka.admin.ssl.trust-store-type=PKCS12", "spring.kafka.admin.ssl.protocol=TLSv1.2",
"spring.kafka.admin.close-timeout=35s", "spring.kafka.admin.operation-timeout=60s",
"spring.kafka.admin.modify-topic-configs=true", "spring.kafka.admin.auto-create=false")
.run((context) -> {
KafkaAdmin admin = context.getBean(KafkaAdmin.class);
Map<String, Object> configs = admin.getConfigurationProperties();
@ -237,8 +241,11 @@ class KafkaAutoConfigurationTests {
assertThat(context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)).isEmpty();
assertThat(configs).containsEntry("foo.bar.baz", "qux.fiz.buz");
assertThat(configs).containsEntry("fiz.buz", "fix.fox");
assertThat(admin).hasFieldOrPropertyWithValue("closeTimeout", Duration.ofSeconds(35));
assertThat(admin).hasFieldOrPropertyWithValue("operationTimeout", 60);
assertThat(admin).hasFieldOrPropertyWithValue("fatalIfBrokerNotAvailable", true);
assertThat(admin).hasFieldOrPropertyWithValue("modifyTopicConfigs", true);
assertThat(admin).hasFieldOrPropertyWithValue("autoCreate", false);
});
}

Loading…
Cancel
Save