diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfiguration.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfiguration.java index 04c468f277..803034cbd1 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfiguration.java +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfiguration.java @@ -48,8 +48,8 @@ import org.springframework.kafka.core.KafkaAdmin; @AutoConfigureBefore(HealthIndicatorAutoConfiguration.class) @AutoConfigureAfter(KafkaAutoConfiguration.class) @EnableConfigurationProperties(KafkaHealthIndicatorProperties.class) -public class KafkaHealthIndicatorAutoConfiguration extends - CompositeHealthIndicatorConfiguration { +public class KafkaHealthIndicatorAutoConfiguration + extends CompositeHealthIndicatorConfiguration { private final Map admins; diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfigurationTests.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfigurationTests.java index 41be4c994a..b03583c7e0 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfigurationTests.java @@ -41,9 +41,9 @@ public class KafkaHealthIndicatorAutoConfigurationTests { @Test public void runShouldCreateIndicator() { - this.contextRunner.run((context) -> assertThat(context) - .hasSingleBean(KafkaHealthIndicator.class) - .doesNotHaveBean(ApplicationHealthIndicator.class)); + this.contextRunner.run( + (context) -> assertThat(context).hasSingleBean(KafkaHealthIndicator.class) + .doesNotHaveBean(ApplicationHealthIndicator.class)); } @Test diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java index 8a2c505ffe..5ce3d935e7 100644 --- a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java @@ -63,21 +63,19 @@ public class KafkaHealthIndicator extends AbstractHealthIndicator { @Override protected void doHealthCheck(Builder builder) throws Exception { try (AdminClient adminClient = AdminClient.create(this.kafkaAdmin.getConfig())) { - DescribeClusterResult result = adminClient.describeCluster( - this.describeOptions); + DescribeClusterResult result = adminClient + .describeCluster(this.describeOptions); String brokerId = result.controller().get().idString(); int replicationFactor = getReplicationFactor(brokerId, adminClient); int nodes = result.nodes().get().size(); Status status = nodes >= replicationFactor ? Status.UP : Status.DOWN; - builder.status(status) - .withDetail("clusterId", result.clusterId().get()) - .withDetail("brokerId", brokerId) - .withDetail("nodes", nodes); + builder.status(status).withDetail("clusterId", result.clusterId().get()) + .withDetail("brokerId", brokerId).withDetail("nodes", nodes); } } - private int getReplicationFactor(String brokerId, - AdminClient adminClient) throws ExecutionException, InterruptedException { + private int getReplicationFactor(String brokerId, AdminClient adminClient) + throws ExecutionException, InterruptedException { ConfigResource configResource = new ConfigResource(Type.BROKER, brokerId); Map kafkaConfig = adminClient .describeConfigs(Collections.singletonList(configResource)).all().get(); @@ -86,4 +84,3 @@ public class KafkaHealthIndicator extends AbstractHealthIndicator { } } - diff --git a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java index 8f646a76f6..595d8683b1 100644 --- a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java +++ b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java @@ -49,11 +49,12 @@ public class KafkaHealthIndicatorTests { this.kafkaEmbedded.destroy(); } } + @Test public void kafkaIsUp() throws Exception { startKafka(1); - KafkaHealthIndicator healthIndicator = - new KafkaHealthIndicator(this.kafkaAdmin, 1000L); + KafkaHealthIndicator healthIndicator = new KafkaHealthIndicator(this.kafkaAdmin, + 1000L); Health health = healthIndicator.health(); assertThat(health.getStatus()).isEqualTo(Status.UP); assertDetails(health.getDetails()); @@ -64,8 +65,8 @@ public class KafkaHealthIndicatorTests { int freePort = SocketUtils.findAvailableTcpPort(); this.kafkaAdmin = new KafkaAdmin(Collections.singletonMap( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:" + freePort)); - KafkaHealthIndicator healthIndicator = - new KafkaHealthIndicator(this.kafkaAdmin, 1L); + KafkaHealthIndicator healthIndicator = new KafkaHealthIndicator(this.kafkaAdmin, + 1L); Health health = healthIndicator.health(); assertThat(health.getStatus()).isEqualTo(Status.DOWN); assertThat((String) health.getDetails().get("error")).isNotEmpty(); @@ -74,8 +75,8 @@ public class KafkaHealthIndicatorTests { @Test public void notEnoughNodesForReplicationFactor() throws Exception { startKafka(2); - KafkaHealthIndicator healthIndicator = - new KafkaHealthIndicator(this.kafkaAdmin, 1000L); + KafkaHealthIndicator healthIndicator = new KafkaHealthIndicator(this.kafkaAdmin, + 1000L); Health health = healthIndicator.health(); assertThat(health.getStatus()).isEqualTo(Status.DOWN); assertDetails(health.getDetails()); @@ -89,13 +90,13 @@ public class KafkaHealthIndicatorTests { private void startKafka(int replicationFactor) throws Exception { this.kafkaEmbedded = new KafkaEmbedded(1, true); - this.kafkaEmbedded.brokerProperties(Collections.singletonMap( - KafkaHealthIndicator.REPLICATION_PROPERTY, - String.valueOf(replicationFactor))); + this.kafkaEmbedded.brokerProperties( + Collections.singletonMap(KafkaHealthIndicator.REPLICATION_PROPERTY, + String.valueOf(replicationFactor))); this.kafkaEmbedded.before(); - this.kafkaAdmin = new KafkaAdmin(Collections.singletonMap( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, - this.kafkaEmbedded.getBrokersAsString())); + this.kafkaAdmin = new KafkaAdmin( + Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + this.kafkaEmbedded.getBrokersAsString())); } }