Merge pull request #11515 from Jcamilorada:GH-11435
* pr/11515: Polish "Add Kafka health indicator" Add Kafka health indicatorpull/11146/head
commit
c1d7d5ec2a
@ -0,0 +1,76 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2012-2018 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.springframework.boot.actuate.autoconfigure.kafka;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.springframework.boot.actuate.autoconfigure.health.CompositeHealthIndicatorConfiguration;
|
||||||
|
import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
|
||||||
|
import org.springframework.boot.actuate.autoconfigure.health.HealthIndicatorAutoConfiguration;
|
||||||
|
import org.springframework.boot.actuate.health.HealthIndicator;
|
||||||
|
import org.springframework.boot.actuate.kafka.KafkaHealthIndicator;
|
||||||
|
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
|
||||||
|
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
|
||||||
|
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||||
|
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
|
||||||
|
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.kafka.core.KafkaAdmin;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@link EnableAutoConfiguration Auto-configuration} for {@link KafkaHealthIndicator}.
|
||||||
|
*
|
||||||
|
* @author Juan Rada
|
||||||
|
*/
|
||||||
|
@Configuration
|
||||||
|
@ConditionalOnClass(KafkaAdmin.class)
|
||||||
|
@ConditionalOnBean(KafkaAdmin.class)
|
||||||
|
@ConditionalOnEnabledHealthIndicator("kafka")
|
||||||
|
@AutoConfigureBefore(HealthIndicatorAutoConfiguration.class)
|
||||||
|
@AutoConfigureAfter(KafkaAutoConfiguration.class)
|
||||||
|
@EnableConfigurationProperties(KafkaHealthIndicatorProperties.class)
|
||||||
|
public class KafkaHealthIndicatorAutoConfiguration extends
|
||||||
|
CompositeHealthIndicatorConfiguration<KafkaHealthIndicator, KafkaAdmin> {
|
||||||
|
|
||||||
|
private final Map<String, KafkaAdmin> admins;
|
||||||
|
|
||||||
|
private final KafkaHealthIndicatorProperties properties;
|
||||||
|
|
||||||
|
KafkaHealthIndicatorAutoConfiguration(Map<String, KafkaAdmin> admins,
|
||||||
|
KafkaHealthIndicatorProperties properties) {
|
||||||
|
this.admins = admins;
|
||||||
|
this.properties = properties;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@ConditionalOnMissingBean(name = "kafkaHealthIndicator")
|
||||||
|
public HealthIndicator kafkaHealthIndicator() {
|
||||||
|
return createHealthIndicator(this.admins);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected KafkaHealthIndicator createHealthIndicator(KafkaAdmin source) {
|
||||||
|
Duration responseTimeout = this.properties.getResponseTimeout();
|
||||||
|
return new KafkaHealthIndicator(source, responseTimeout.toMillis());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,46 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2012-2018 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.springframework.boot.actuate.autoconfigure.kafka;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
|
||||||
|
import org.springframework.boot.actuate.kafka.KafkaHealthIndicator;
|
||||||
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configuration properties for {@link KafkaHealthIndicator}.
|
||||||
|
*
|
||||||
|
* @author Juan Rada
|
||||||
|
* @since 2.0.0
|
||||||
|
*/
|
||||||
|
@ConfigurationProperties(prefix = "management.health.kafka", ignoreUnknownFields = false)
|
||||||
|
public class KafkaHealthIndicatorProperties {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Time to wait for a response from the cluster description operation.
|
||||||
|
*/
|
||||||
|
private Duration responseTimeout = Duration.ofMillis(1000);
|
||||||
|
|
||||||
|
public Duration getResponseTimeout() {
|
||||||
|
return this.responseTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setResponseTimeout(Duration responseTimeout) {
|
||||||
|
this.responseTimeout = responseTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,20 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2012-2018 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Auto-configuration for actuator Apache Kafka support.
|
||||||
|
*/
|
||||||
|
package org.springframework.boot.actuate.autoconfigure.kafka;
|
@ -0,0 +1,56 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2012-2018 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.springframework.boot.actuate.autoconfigure.kafka;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import org.springframework.boot.actuate.autoconfigure.health.HealthIndicatorAutoConfiguration;
|
||||||
|
import org.springframework.boot.actuate.health.ApplicationHealthIndicator;
|
||||||
|
import org.springframework.boot.actuate.kafka.KafkaHealthIndicator;
|
||||||
|
import org.springframework.boot.autoconfigure.AutoConfigurations;
|
||||||
|
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
|
||||||
|
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests for {@link KafkaHealthIndicatorAutoConfiguration}.
|
||||||
|
*
|
||||||
|
* @author Juan Rada
|
||||||
|
*/
|
||||||
|
public class KafkaHealthIndicatorAutoConfigurationTests {
|
||||||
|
|
||||||
|
private ApplicationContextRunner contextRunner = new ApplicationContextRunner()
|
||||||
|
.withConfiguration(AutoConfigurations.of(KafkaAutoConfiguration.class,
|
||||||
|
KafkaHealthIndicatorAutoConfiguration.class,
|
||||||
|
HealthIndicatorAutoConfiguration.class));
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void runShouldCreateIndicator() {
|
||||||
|
this.contextRunner.run((context) -> assertThat(context)
|
||||||
|
.hasSingleBean(KafkaHealthIndicator.class)
|
||||||
|
.doesNotHaveBean(ApplicationHealthIndicator.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void runWhenDisabledShouldNotCreateIndicator() {
|
||||||
|
this.contextRunner.withPropertyValues("management.health.kafka.enabled:false")
|
||||||
|
.run((context) -> assertThat(context)
|
||||||
|
.doesNotHaveBean(KafkaHealthIndicator.class)
|
||||||
|
.hasSingleBean(ApplicationHealthIndicator.class));
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,89 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2012-2018 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.springframework.boot.actuate.kafka;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.admin.AdminClient;
|
||||||
|
import org.apache.kafka.clients.admin.Config;
|
||||||
|
import org.apache.kafka.clients.admin.DescribeClusterOptions;
|
||||||
|
import org.apache.kafka.clients.admin.DescribeClusterResult;
|
||||||
|
import org.apache.kafka.common.config.ConfigResource;
|
||||||
|
import org.apache.kafka.common.config.ConfigResource.Type;
|
||||||
|
|
||||||
|
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
|
||||||
|
import org.springframework.boot.actuate.health.Health.Builder;
|
||||||
|
import org.springframework.boot.actuate.health.HealthIndicator;
|
||||||
|
import org.springframework.boot.actuate.health.Status;
|
||||||
|
import org.springframework.kafka.core.KafkaAdmin;
|
||||||
|
import org.springframework.util.Assert;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@link HealthIndicator} for Kafka cluster.
|
||||||
|
*
|
||||||
|
* @author Juan Rada
|
||||||
|
*/
|
||||||
|
public class KafkaHealthIndicator extends AbstractHealthIndicator {
|
||||||
|
|
||||||
|
static final String REPLICATION_PROPERTY = "transaction.state.log.replication.factor";
|
||||||
|
|
||||||
|
private final KafkaAdmin kafkaAdmin;
|
||||||
|
|
||||||
|
private final DescribeClusterOptions describeOptions;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new {@link KafkaHealthIndicator} instance.
|
||||||
|
*
|
||||||
|
* @param kafkaAdmin the kafka admin
|
||||||
|
* @param requestTimeout the request timeout in milliseconds
|
||||||
|
*/
|
||||||
|
public KafkaHealthIndicator(KafkaAdmin kafkaAdmin, long requestTimeout) {
|
||||||
|
Assert.notNull(kafkaAdmin, "KafkaAdmin must not be null");
|
||||||
|
this.kafkaAdmin = kafkaAdmin;
|
||||||
|
this.describeOptions = new DescribeClusterOptions()
|
||||||
|
.timeoutMs((int) requestTimeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doHealthCheck(Builder builder) throws Exception {
|
||||||
|
try (AdminClient adminClient = AdminClient.create(this.kafkaAdmin.getConfig())) {
|
||||||
|
DescribeClusterResult result = adminClient.describeCluster(
|
||||||
|
this.describeOptions);
|
||||||
|
String brokerId = result.controller().get().idString();
|
||||||
|
int replicationFactor = getReplicationFactor(brokerId, adminClient);
|
||||||
|
int nodes = result.nodes().get().size();
|
||||||
|
Status status = nodes >= replicationFactor ? Status.UP : Status.DOWN;
|
||||||
|
builder.status(status)
|
||||||
|
.withDetail("clusterId", result.clusterId().get())
|
||||||
|
.withDetail("brokerId", brokerId)
|
||||||
|
.withDetail("nodes", nodes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private int getReplicationFactor(String brokerId,
|
||||||
|
AdminClient adminClient) throws ExecutionException, InterruptedException {
|
||||||
|
ConfigResource configResource = new ConfigResource(Type.BROKER, brokerId);
|
||||||
|
Map<ConfigResource, Config> kafkaConfig = adminClient
|
||||||
|
.describeConfigs(Collections.singletonList(configResource)).all().get();
|
||||||
|
Config brokerConfig = kafkaConfig.get(configResource);
|
||||||
|
return Integer.parseInt(brokerConfig.get(REPLICATION_PROPERTY).value());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,20 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2012-2018 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Actuator support for Apache Kafka.
|
||||||
|
*/
|
||||||
|
package org.springframework.boot.actuate.kafka;
|
@ -0,0 +1,101 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2012-2018 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.springframework.boot.actuate.kafka;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import org.springframework.boot.actuate.health.Health;
|
||||||
|
import org.springframework.boot.actuate.health.Status;
|
||||||
|
import org.springframework.kafka.core.KafkaAdmin;
|
||||||
|
import org.springframework.kafka.test.rule.KafkaEmbedded;
|
||||||
|
import org.springframework.util.SocketUtils;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests for {@link KafkaHealthIndicator}.
|
||||||
|
*
|
||||||
|
* @author Juan Rada
|
||||||
|
* @author Stephane Nicoll
|
||||||
|
*/
|
||||||
|
public class KafkaHealthIndicatorTests {
|
||||||
|
|
||||||
|
private KafkaEmbedded kafkaEmbedded;
|
||||||
|
|
||||||
|
private KafkaAdmin kafkaAdmin;
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void shutdownKafka() throws Exception {
|
||||||
|
if (this.kafkaEmbedded != null) {
|
||||||
|
this.kafkaEmbedded.destroy();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@Test
|
||||||
|
public void kafkaIsUp() throws Exception {
|
||||||
|
startKafka(1);
|
||||||
|
KafkaHealthIndicator healthIndicator =
|
||||||
|
new KafkaHealthIndicator(this.kafkaAdmin, 1000L);
|
||||||
|
Health health = healthIndicator.health();
|
||||||
|
assertThat(health.getStatus()).isEqualTo(Status.UP);
|
||||||
|
assertDetails(health.getDetails());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void kafkaIsDown() {
|
||||||
|
int freePort = SocketUtils.findAvailableTcpPort();
|
||||||
|
this.kafkaAdmin = new KafkaAdmin(Collections.singletonMap(
|
||||||
|
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:" + freePort));
|
||||||
|
KafkaHealthIndicator healthIndicator =
|
||||||
|
new KafkaHealthIndicator(this.kafkaAdmin, 1L);
|
||||||
|
Health health = healthIndicator.health();
|
||||||
|
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
|
||||||
|
assertThat((String) health.getDetails().get("error")).isNotEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void notEnoughNodesForReplicationFactor() throws Exception {
|
||||||
|
startKafka(2);
|
||||||
|
KafkaHealthIndicator healthIndicator =
|
||||||
|
new KafkaHealthIndicator(this.kafkaAdmin, 1000L);
|
||||||
|
Health health = healthIndicator.health();
|
||||||
|
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
|
||||||
|
assertDetails(health.getDetails());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertDetails(Map<String, Object> details) {
|
||||||
|
assertThat(details).containsEntry("brokerId", "0");
|
||||||
|
assertThat(details).containsKey("clusterId");
|
||||||
|
assertThat(details).containsEntry("nodes", 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startKafka(int replicationFactor) throws Exception {
|
||||||
|
this.kafkaEmbedded = new KafkaEmbedded(1, true);
|
||||||
|
this.kafkaEmbedded.brokerProperties(Collections.singletonMap(
|
||||||
|
KafkaHealthIndicator.REPLICATION_PROPERTY,
|
||||||
|
String.valueOf(replicationFactor)));
|
||||||
|
this.kafkaEmbedded.before();
|
||||||
|
this.kafkaAdmin = new KafkaAdmin(Collections.singletonMap(
|
||||||
|
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
|
||||||
|
this.kafkaEmbedded.getBrokersAsString()));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue