From c4cfc4dd0c773ed12c58d074a55917dfc3443703 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 10 May 2017 13:18:59 -0400 Subject: [PATCH 1/2] Add Kafka Kerberos Configuration Properties See gh-9151 --- .../kafka/KafkaAutoConfiguration.java | 21 ++++++ .../autoconfigure/kafka/KafkaProperties.java | 66 +++++++++++++++++++ .../kafka/KafkaAutoConfigurationTests.java | 17 ++++- .../appendix-application-properties.adoc | 4 ++ 4 files changed, 107 insertions(+), 1 deletion(-) diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java index be2481c38b..c99188757e 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java @@ -16,9 +16,13 @@ package org.springframework.boot.autoconfigure.kafka; +import java.io.IOException; + import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -28,6 +32,7 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.support.LoggingProducerListener; import org.springframework.kafka.support.ProducerListener; @@ -81,4 +86,20 @@ public class KafkaAutoConfiguration { this.properties.buildProducerProperties()); } + @Bean + @ConditionalOnProperty(name = "spring.kafka.jaas.enabled") + @ConditionalOnMissingBean(KafkaJaasLoginModuleInitializer.class) + public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException { + KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer(); + Jaas jaasProperties = this.properties.getJaas(); + if (jaasProperties.getControlFlag() != null) { + jaas.setControlFlag(jaasProperties.getControlFlag()); + } + if (jaasProperties.getLoginModule() != null) { + jaas.setLoginModule(jaasProperties.getLoginModule()); + } + jaas.setOptions(jaasProperties.getOptions()); + return jaas; + } + } diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index 239d11344e..aac811ab5c 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.core.io.Resource; import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; +import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.util.CollectionUtils; /** @@ -74,6 +75,8 @@ public class KafkaProperties { private final Ssl ssl = new Ssl(); + private final Jaas jaas = new Jaas(); + private final Template template = new Template(); public List getBootstrapServers() { @@ -116,6 +119,10 @@ public class KafkaProperties { return this.ssl; } + public Jaas getJaas() { + return this.jaas; + } + public Template getTemplate() { return this.template; } @@ -776,4 +783,63 @@ public class KafkaProperties { } + public static class Jaas { + + /** + * Enable JAAS configuration. + */ + private boolean enabled; + + /** + * Login module. + */ + private String loginModule; + + /** + * AppConfigurationEntry.LoginModuleControlFlag value. + */ + private KafkaJaasLoginModuleInitializer.ControlFlag controlFlag = + KafkaJaasLoginModuleInitializer.ControlFlag.REQUIRED; + + /** + * Map of JAAS options, e.g. 'spring.kafka.jaas.options.useKeyTab=true'. + */ + private final Map options = new HashMap<>(); + + public boolean isEnabled() { + return this.enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public String getLoginModule() { + return this.loginModule; + } + + public void setLoginModule(String loginModule) { + this.loginModule = loginModule; + } + + public KafkaJaasLoginModuleInitializer.ControlFlag getControlFlag() { + return this.controlFlag; + } + + public void setControlFlag(KafkaJaasLoginModuleInitializer.ControlFlag controlFlag) { + this.controlFlag = controlFlag; + } + + public Map getOptions() { + return this.options; + } + + public void setOptions(Map options) { + if (options != null) { + this.options.putAll(options); + } + } + + } + } diff --git a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 5dbb753df6..3fbb699406 100644 --- a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -20,6 +20,8 @@ import java.io.File; import java.util.Collections; import java.util.Map; +import javax.security.auth.login.AppConfigurationEntry; + import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SslConfigs; @@ -38,6 +40,7 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; +import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import static org.assertj.core.api.Assertions.assertThat; @@ -160,6 +163,7 @@ public class KafkaAutoConfigurationTests { assertThat(configs.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2); assertThat(configs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) .isEqualTo(IntegerSerializer.class); + assertThat(this.context.containsBean("kafkaJaasInitializer")).isFalse(); } @Test @@ -169,7 +173,11 @@ public class KafkaAutoConfigurationTests { "spring.kafka.listener.ack-count=123", "spring.kafka.listener.ack-time=456", "spring.kafka.listener.concurrency=3", - "spring.kafka.listener.poll-timeout=2000"); + "spring.kafka.listener.poll-timeout=2000", + "spring.kafka.jaas.enabled=true", + "spring.kafka.jaas.login-module=foo", + "spring.kafka.jaas.control-flag=REQUISITE", + "spring.kafka.jaas.options.useKeyTab=true"); DefaultKafkaProducerFactory producerFactory = this.context .getBean(DefaultKafkaProducerFactory.class); DefaultKafkaConsumerFactory consumerFactory = this.context @@ -189,6 +197,13 @@ public class KafkaAutoConfigurationTests { assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3); assertThat(dfa.getPropertyValue("containerProperties.pollTimeout")) .isEqualTo(2000L); + assertThat(this.context.containsBean("kafkaJaasInitializer")).isTrue(); + KafkaJaasLoginModuleInitializer jaas = this.context.getBean(KafkaJaasLoginModuleInitializer.class); + dfa = new DirectFieldAccessor(jaas); + assertThat(dfa.getPropertyValue("loginModule")).isEqualTo("foo"); + assertThat(dfa.getPropertyValue("controlFlag")) + .isEqualTo(AppConfigurationEntry.LoginModuleControlFlag.REQUISITE); + assertThat(((Map) dfa.getPropertyValue("options")).get("useKeyTab")).isEqualTo("true"); } private void load(String... environment) { diff --git a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc index f5c55b3910..af61320f85 100644 --- a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc +++ b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc @@ -942,6 +942,10 @@ content into your application; rather pick only the properties that you need. spring.kafka.consumer.key-deserializer= # Deserializer class for keys. spring.kafka.consumer.max-poll-records= # Maximum number of records returned in a single call to poll(). spring.kafka.consumer.value-deserializer= # Deserializer class for values. + spring.kafka.jaas.control-flag=REQUIRED # AppConfigurationEntry.LoginModuleControlFlag value. + spring.kafka.jaas.enabled= # Enable JAAS configuration. + spring.kafka.jaas.login-module=com.sun.security.auth.module.Krb5LoginModule # Login module. + spring.kafka.jaas.options= # Map of JAAS options, e.g. 'spring.kafka.jaas.options.useKeyTab=true'. spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME". spring.kafka.listener.ack-mode= # Listener AckMode; see the spring-kafka documentation. spring.kafka.listener.ack-time= # Time in milliseconds between offset commits when ackMode is "TIME" or "COUNT_TIME". From 1480f0717fcfdf5483798c30c1547e155def95c9 Mon Sep 17 00:00:00 2001 From: Stephane Nicoll Date: Sun, 14 May 2017 18:13:00 +0200 Subject: [PATCH 2/2] Polish "Add Kafka Kerberos Configuration Properties" Closes gh-9151 --- .../kafka/KafkaAutoConfiguration.java | 3 ++- .../boot/autoconfigure/kafka/KafkaProperties.java | 6 +++--- .../additional-spring-configuration-metadata.json | 4 ++++ .../kafka/KafkaAutoConfigurationTests.java | 15 ++++++++++----- .../asciidoc/appendix-application-properties.adoc | 4 ++-- 5 files changed, 21 insertions(+), 11 deletions(-) diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java index c99188757e..88f21da1ef 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java @@ -40,6 +40,7 @@ import org.springframework.kafka.support.ProducerListener; * {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka. * * @author Gary Russell + * @author Stephane Nicoll * @since 1.5.0 */ @Configuration @@ -88,7 +89,7 @@ public class KafkaAutoConfiguration { @Bean @ConditionalOnProperty(name = "spring.kafka.jaas.enabled") - @ConditionalOnMissingBean(KafkaJaasLoginModuleInitializer.class) + @ConditionalOnMissingBean public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException { KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer(); Jaas jaasProperties = this.properties.getJaas(); diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index aac811ab5c..d362c87816 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -793,16 +793,16 @@ public class KafkaProperties { /** * Login module. */ - private String loginModule; + private String loginModule = "com.sun.security.auth.module.Krb5LoginModule"; /** - * AppConfigurationEntry.LoginModuleControlFlag value. + * Control flag for login configuration. */ private KafkaJaasLoginModuleInitializer.ControlFlag controlFlag = KafkaJaasLoginModuleInitializer.ControlFlag.REQUIRED; /** - * Map of JAAS options, e.g. 'spring.kafka.jaas.options.useKeyTab=true'. + * Additional JAAS options. */ private final Map options = new HashMap<>(); diff --git a/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json index 9bb77b8370..42d7e3c3e2 100644 --- a/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -325,6 +325,10 @@ "description": "Log a warning for transactions executed without a single enlisted resource.", "defaultValue": true }, + { + "name": "spring.kafka.jaas.control-flag", + "defaultValue": "required" + }, { "name": "spring.mobile.devicedelegatingviewresolver.enabled", "type": "java.lang.Boolean", diff --git a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 3fbb699406..104ccd2d91 100644 --- a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2016 the original author or authors. + * Copyright 2012-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,6 +43,7 @@ import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMo import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.entry; /** * Tests for {@link KafkaAutoConfiguration}. @@ -163,7 +164,8 @@ public class KafkaAutoConfigurationTests { assertThat(configs.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2); assertThat(configs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) .isEqualTo(IntegerSerializer.class); - assertThat(this.context.containsBean("kafkaJaasInitializer")).isFalse(); + assertThat(this.context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)) + .isEmpty(); } @Test @@ -197,13 +199,16 @@ public class KafkaAutoConfigurationTests { assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3); assertThat(dfa.getPropertyValue("containerProperties.pollTimeout")) .isEqualTo(2000L); - assertThat(this.context.containsBean("kafkaJaasInitializer")).isTrue(); - KafkaJaasLoginModuleInitializer jaas = this.context.getBean(KafkaJaasLoginModuleInitializer.class); + assertThat(this.context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)) + .hasSize(1); + KafkaJaasLoginModuleInitializer jaas = this.context.getBean( + KafkaJaasLoginModuleInitializer.class); dfa = new DirectFieldAccessor(jaas); assertThat(dfa.getPropertyValue("loginModule")).isEqualTo("foo"); assertThat(dfa.getPropertyValue("controlFlag")) .isEqualTo(AppConfigurationEntry.LoginModuleControlFlag.REQUISITE); - assertThat(((Map) dfa.getPropertyValue("options")).get("useKeyTab")).isEqualTo("true"); + assertThat(((Map) dfa.getPropertyValue("options"))) + .containsExactly(entry("useKeyTab", "true")); } private void load(String... environment) { diff --git a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc index af61320f85..b1b5ec2ad5 100644 --- a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc +++ b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc @@ -942,10 +942,10 @@ content into your application; rather pick only the properties that you need. spring.kafka.consumer.key-deserializer= # Deserializer class for keys. spring.kafka.consumer.max-poll-records= # Maximum number of records returned in a single call to poll(). spring.kafka.consumer.value-deserializer= # Deserializer class for values. - spring.kafka.jaas.control-flag=REQUIRED # AppConfigurationEntry.LoginModuleControlFlag value. + spring.kafka.jaas.control-flag=required # Control flag for login configuration. spring.kafka.jaas.enabled= # Enable JAAS configuration. spring.kafka.jaas.login-module=com.sun.security.auth.module.Krb5LoginModule # Login module. - spring.kafka.jaas.options= # Map of JAAS options, e.g. 'spring.kafka.jaas.options.useKeyTab=true'. + spring.kafka.jaas.options= # Additional JAAS options. spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME". spring.kafka.listener.ack-mode= # Listener AckMode; see the spring-kafka documentation. spring.kafka.listener.ack-time= # Time in milliseconds between offset commits when ackMode is "TIME" or "COUNT_TIME".