Merge branch '2.1.x'

Closes gh-16860
pull/16875/head
Phillip Webb 6 years ago
commit 963a544fb1

@ -1,5 +1,5 @@
/* /*
* Copyright 2012-2017 the original author or authors. * Copyright 2012-2019 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -19,6 +19,7 @@ package org.springframework.boot.actuate.redis;
import java.util.Properties; import java.util.Properties;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator; import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator;
import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.Health;
@ -31,6 +32,7 @@ import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
* *
* @author Stephane Nicoll * @author Stephane Nicoll
* @author Mark Paluch * @author Mark Paluch
* @author Artsiom Yudovin
* @since 2.0.0 * @since 2.0.0
*/ */
public class RedisReactiveHealthIndicator extends AbstractReactiveHealthIndicator { public class RedisReactiveHealthIndicator extends AbstractReactiveHealthIndicator {
@ -44,10 +46,20 @@ public class RedisReactiveHealthIndicator extends AbstractReactiveHealthIndicato
@Override @Override
protected Mono<Health> doHealthCheck(Health.Builder builder) { protected Mono<Health> doHealthCheck(Health.Builder builder) {
ReactiveRedisConnection connection = this.connectionFactory return getConnection()
.getReactiveConnection(); .flatMap((connection) -> doHealthCheck(builder, connection));
}
private Mono<Health> doHealthCheck(Health.Builder builder,
ReactiveRedisConnection connection) {
return connection.serverCommands().info().map((info) -> up(builder, info)) return connection.serverCommands().info().map((info) -> up(builder, info))
.doFinally((signal) -> connection.close()); .onErrorResume((ex) -> Mono.just(down(builder, ex)))
.flatMap((health) -> connection.closeLater().thenReturn(health));
}
private Mono<ReactiveRedisConnection> getConnection() {
return Mono.fromSupplier(this.connectionFactory::getReactiveConnection)
.subscribeOn(Schedulers.parallel());
} }
private Health up(Health.Builder builder, Properties info) { private Health up(Health.Builder builder, Properties info) {
@ -55,4 +67,8 @@ public class RedisReactiveHealthIndicator extends AbstractReactiveHealthIndicato
info.getProperty(RedisHealthIndicator.REDIS_VERSION)).build(); info.getProperty(RedisHealthIndicator.REDIS_VERSION)).build();
} }
private Health down(Health.Builder builder, Throwable cause) {
return builder.down(cause).build();
}
} }

@ -41,6 +41,7 @@ import static org.mockito.Mockito.verify;
* @author Stephane Nicoll * @author Stephane Nicoll
* @author Mark Paluch * @author Mark Paluch
* @author Nikolay Rybak * @author Nikolay Rybak
* @author Artsiom Yudovin
*/ */
public class RedisReactiveHealthIndicatorTests { public class RedisReactiveHealthIndicatorTests {
@ -49,6 +50,7 @@ public class RedisReactiveHealthIndicatorTests {
Properties info = new Properties(); Properties info = new Properties();
info.put("redis_version", "2.8.9"); info.put("redis_version", "2.8.9");
ReactiveRedisConnection redisConnection = mock(ReactiveRedisConnection.class); ReactiveRedisConnection redisConnection = mock(ReactiveRedisConnection.class);
given(redisConnection.closeLater()).willReturn(Mono.empty());
ReactiveServerCommands commands = mock(ReactiveServerCommands.class); ReactiveServerCommands commands = mock(ReactiveServerCommands.class);
given(commands.info()).willReturn(Mono.just(info)); given(commands.info()).willReturn(Mono.just(info));
RedisReactiveHealthIndicator healthIndicator = createHealthIndicator( RedisReactiveHealthIndicator healthIndicator = createHealthIndicator(
@ -59,7 +61,7 @@ public class RedisReactiveHealthIndicatorTests {
assertThat(h.getDetails()).containsOnlyKeys("version"); assertThat(h.getDetails()).containsOnlyKeys("version");
assertThat(h.getDetails().get("version")).isEqualTo("2.8.9"); assertThat(h.getDetails().get("version")).isEqualTo("2.8.9");
}).verifyComplete(); }).verifyComplete();
verify(redisConnection).close(); verify(redisConnection).closeLater();
} }
@Test @Test
@ -68,13 +70,14 @@ public class RedisReactiveHealthIndicatorTests {
given(commands.info()).willReturn( given(commands.info()).willReturn(
Mono.error(new RedisConnectionFailureException("Connection failed"))); Mono.error(new RedisConnectionFailureException("Connection failed")));
ReactiveRedisConnection redisConnection = mock(ReactiveRedisConnection.class); ReactiveRedisConnection redisConnection = mock(ReactiveRedisConnection.class);
given(redisConnection.closeLater()).willReturn(Mono.empty());
RedisReactiveHealthIndicator healthIndicator = createHealthIndicator( RedisReactiveHealthIndicator healthIndicator = createHealthIndicator(
redisConnection, commands); redisConnection, commands);
Mono<Health> health = healthIndicator.health(); Mono<Health> health = healthIndicator.health();
StepVerifier.create(health) StepVerifier.create(health)
.consumeNextWith((h) -> assertThat(h.getStatus()).isEqualTo(Status.DOWN)) .consumeNextWith((h) -> assertThat(h.getStatus()).isEqualTo(Status.DOWN))
.verifyComplete(); .verifyComplete();
verify(redisConnection).close(); verify(redisConnection).closeLater();
} }
@Test @Test

Loading…
Cancel
Save