Merge pull request #31679 from m-kay

* pr/31679:
  Polish "Add config property for KafkaAdmin modifyTopicConfigs"
  Add config property for KafkaAdmin modifyTopicConfigs

Closes gh-31679
pull/31712/head
Stephane Nicoll 2 years ago
commit a2ccaa2857

@ -142,6 +142,7 @@ public class KafkaAutoConfiguration {
public KafkaAdmin kafkaAdmin() { public KafkaAdmin kafkaAdmin() {
KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties()); KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast()); kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());
kafkaAdmin.setModifyTopicConfigs(this.properties.getAdmin().isModifyTopicConfigs());
return kafkaAdmin; return kafkaAdmin;
} }

@ -647,6 +647,11 @@ public class KafkaProperties {
*/ */
private boolean failFast; private boolean failFast;
/**
* Whether to enable modification of existing topic configuration.
*/
private boolean modifyTopicConfigs;
public Ssl getSsl() { public Ssl getSsl() {
return this.ssl; return this.ssl;
} }
@ -671,6 +676,14 @@ public class KafkaProperties {
this.failFast = failFast; this.failFast = failFast;
} }
public boolean isModifyTopicConfigs() {
return this.modifyTopicConfigs;
}
public void setModifyTopicConfigs(boolean modifyTopicConfigs) {
this.modifyTopicConfigs = modifyTopicConfigs;
}
public Map<String, String> getProperties() { public Map<String, String> getProperties() {
return this.properties; return this.properties;
} }

@ -209,15 +209,14 @@ class KafkaAutoConfigurationTests {
@Test @Test
void adminProperties() { void adminProperties() {
this.contextRunner this.contextRunner.withPropertyValues("spring.kafka.clientId=cid",
.withPropertyValues("spring.kafka.clientId=cid", "spring.kafka.properties.foo.bar.baz=qux.fiz.buz", "spring.kafka.properties.foo.bar.baz=qux.fiz.buz", "spring.kafka.admin.fail-fast=true",
"spring.kafka.admin.fail-fast=true", "spring.kafka.admin.properties.fiz.buz=fix.fox", "spring.kafka.admin.properties.fiz.buz=fix.fox", "spring.kafka.admin.security.protocol=SSL",
"spring.kafka.admin.security.protocol=SSL", "spring.kafka.admin.ssl.key-password=p4", "spring.kafka.admin.ssl.key-password=p4", "spring.kafka.admin.ssl.key-store-location=classpath:ksLocP",
"spring.kafka.admin.ssl.key-store-location=classpath:ksLocP", "spring.kafka.admin.ssl.key-store-password=p5", "spring.kafka.admin.ssl.key-store-type=PKCS12",
"spring.kafka.admin.ssl.key-store-password=p5", "spring.kafka.admin.ssl.key-store-type=PKCS12", "spring.kafka.admin.ssl.trust-store-location=classpath:tsLocP",
"spring.kafka.admin.ssl.trust-store-location=classpath:tsLocP", "spring.kafka.admin.ssl.trust-store-password=p6", "spring.kafka.admin.ssl.trust-store-type=PKCS12",
"spring.kafka.admin.ssl.trust-store-password=p6", "spring.kafka.admin.ssl.protocol=TLSv1.2", "spring.kafka.admin.modify-topic-configs=true")
"spring.kafka.admin.ssl.trust-store-type=PKCS12", "spring.kafka.admin.ssl.protocol=TLSv1.2")
.run((context) -> { .run((context) -> {
KafkaAdmin admin = context.getBean(KafkaAdmin.class); KafkaAdmin admin = context.getBean(KafkaAdmin.class);
Map<String, Object> configs = admin.getConfigurationProperties(); Map<String, Object> configs = admin.getConfigurationProperties();
@ -239,6 +238,7 @@ class KafkaAutoConfigurationTests {
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz"); assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox"); assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
assertThat(admin).hasFieldOrPropertyWithValue("fatalIfBrokerNotAvailable", true); assertThat(admin).hasFieldOrPropertyWithValue("fatalIfBrokerNotAvailable", true);
assertThat(admin).hasFieldOrPropertyWithValue("modifyTopicConfigs", true);
}); });
} }

@ -21,12 +21,14 @@ import java.util.Map;
import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.SslConfigs;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Admin;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Cleanup; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Cleanup;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.IsolationLevel; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.IsolationLevel;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException; import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.kafka.core.CleanupConfig; import org.springframework.kafka.core.CleanupConfig;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ContainerProperties;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
@ -51,6 +53,14 @@ class KafkaPropertiesTests {
assertThat(original).hasSize(IsolationLevel.values().length); assertThat(original).hasSize(IsolationLevel.values().length);
} }
@Test
void adminDefaultValuesAreConsistent() {
KafkaAdmin admin = new KafkaAdmin(Map.of());
Admin adminProperties = new KafkaProperties().getAdmin();
assertThat(admin).hasFieldOrPropertyWithValue("fatalIfBrokerNotAvailable", adminProperties.isFailFast());
assertThat(admin).hasFieldOrPropertyWithValue("modifyTopicConfigs", adminProperties.isModifyTopicConfigs());
}
@Test @Test
void listenerDefaultValuesAreConsistent() { void listenerDefaultValuesAreConsistent() {
ContainerProperties container = new ContainerProperties("test"); ContainerProperties container = new ContainerProperties("test");

Loading…
Cancel
Save