Add R2BC connection factory health check

This commit adds an health indicator for R2DBC. If a validation query is
provided, it is used to validate the state of the database. If not, a
check of the connection is issued.

See gh-19988

Co-authored-by: Mark Paluch <mpaluch@pivotal.io>
pull/20318/head
Stephane Nicoll 5 years ago
parent 6817856e7d
commit bee7302fc7

@ -61,6 +61,7 @@ dependencies {
optional("io.micrometer:micrometer-registry-statsd")
optional("io.micrometer:micrometer-registry-wavefront")
optional("io.projectreactor.netty:reactor-netty")
optional("io.r2dbc:r2dbc-spi")
optional("jakarta.jms:jakarta.jms-api")
optional("jakarta.servlet:jakarta.servlet-api")
optional("javax.cache:cache-api")
@ -107,6 +108,7 @@ dependencies {
testImplementation(project(":spring-boot-project:spring-boot-test"))
testImplementation(project(":spring-boot-project:spring-boot-tools:spring-boot-test-support"))
testImplementation("io.projectreactor:reactor-test")
testImplementation("io.r2dbc:r2dbc-h2")
testImplementation("com.squareup.okhttp3:mockwebserver")
testImplementation("com.jayway.jsonpath:json-path")
testImplementation("io.undertow:undertow-core")

@ -0,0 +1,63 @@
/*
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.actuate.autoconfigure.r2dbc;
import java.util.Map;
import io.r2dbc.spi.ConnectionFactory;
import org.springframework.boot.actuate.autoconfigure.health.CompositeReactiveHealthContributorConfiguration;
import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
import org.springframework.boot.actuate.health.ReactiveHealthContributor;
import org.springframework.boot.actuate.r2dbc.ConnectionFactoryHealthIndicator;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.r2dbc.R2dbcAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* {@link EnableAutoConfiguration Auto-configuration} for
* {@link ConnectionFactoryHealthIndicator}.
*
* @author Mark Paluch
* @since 2.3.0
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(ConnectionFactory.class)
@ConditionalOnBean(ConnectionFactory.class)
@ConditionalOnEnabledHealthIndicator("r2dbc")
@AutoConfigureAfter(R2dbcAutoConfiguration.class)
public class ConnectionFactoryHealthContributorAutoConfiguration
extends CompositeReactiveHealthContributorConfiguration<ConnectionFactoryHealthIndicator, ConnectionFactory> {
private final Map<String, ConnectionFactory> connectionFactory;
ConnectionFactoryHealthContributorAutoConfiguration(Map<String, ConnectionFactory> connectionFactory) {
this.connectionFactory = connectionFactory;
}
@Bean
@ConditionalOnMissingBean(name = { "r2dbcHealthIndicator", "r2dbcHealthContributor" })
public ReactiveHealthContributor r2dbcHealthContributor() {
return createContributor(this.connectionFactory);
}
}

@ -0,0 +1,20 @@
/*
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Auto-configuration for actuator R2DBC.
*/
package org.springframework.boot.actuate.autoconfigure.r2dbc;

@ -75,6 +75,7 @@ org.springframework.boot.actuate.autoconfigure.metrics.web.tomcat.TomcatMetricsA
org.springframework.boot.actuate.autoconfigure.mongo.MongoHealthContributorAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.mongo.MongoReactiveHealthContributorAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.neo4j.Neo4jHealthContributorAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.r2dbc.ConnectionFactoryHealthContributorAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.redis.RedisHealthContributorAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.redis.RedisReactiveHealthContributorAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.scheduling.ScheduledTasksEndpointAutoConfiguration,\

@ -0,0 +1,59 @@
/*
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.actuate.autoconfigure.r2dbc;
import org.junit.jupiter.api.Test;
import org.springframework.boot.actuate.autoconfigure.health.HealthContributorAutoConfiguration;
import org.springframework.boot.actuate.r2dbc.ConnectionFactoryHealthIndicator;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.autoconfigure.r2dbc.R2dbcAutoConfiguration;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Test for {@link ConnectionFactoryHealthContributorAutoConfiguration}.
*
* @author Stephane Nicoll
*/
class ConnectionFactoryHealthContributorAutoConfigurationTests {
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(ConnectionFactoryHealthContributorAutoConfiguration.class,
HealthContributorAutoConfiguration.class));
@Test
void runShouldCreateIndicator() {
this.contextRunner.withConfiguration(AutoConfigurations.of(R2dbcAutoConfiguration.class))
.run((context) -> assertThat(context).hasSingleBean(ConnectionFactoryHealthIndicator.class));
}
@Test
void runWithNoConnectionFactoryShouldNotCreateIndicator() {
this.contextRunner
.run((context) -> assertThat(context).doesNotHaveBean(ConnectionFactoryHealthIndicator.class));
}
@Test
void runWhenDisabledShouldNotCreateIndicator() {
this.contextRunner.withConfiguration(AutoConfigurations.of(R2dbcAutoConfiguration.class))
.withPropertyValues("management.health.r2dbc.enabled:false")
.run((context) -> assertThat(context).doesNotHaveBean(ConnectionFactoryHealthIndicator.class));
}
}

@ -24,6 +24,7 @@ dependencies {
optional("io.micrometer:micrometer-core")
optional("io.micrometer:micrometer-registry-prometheus")
optional("io.prometheus:simpleclient_pushgateway")
optional("io.r2dbc:r2dbc-spi")
optional("io.reactivex:rxjava-reactive-streams")
optional("org.elasticsearch.client:elasticsearch-rest-client")
optional("io.undertow:undertow-servlet") {
@ -70,6 +71,7 @@ dependencies {
testImplementation("org.assertj:assertj-core")
testImplementation("com.jayway.jsonpath:json-path")
testImplementation("io.projectreactor:reactor-test")
testImplementation("io.r2dbc:r2dbc-h2")
testImplementation("org.apache.logging.log4j:log4j-to-slf4j")
testImplementation("org.awaitility:awaitility")
testImplementation("org.glassfish.jersey.media:jersey-media-json-jackson")

@ -0,0 +1,104 @@
/*
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.actuate.r2dbc;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.Row;
import io.r2dbc.spi.RowMetadata;
import io.r2dbc.spi.ValidationDepth;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Health.Builder;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.actuate.health.Status;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* A {@link HealthIndicator} to validate a R2DBC {@link ConnectionFactory}.
*
* @author Mark Paluch
* @author Stephane Nicoll
* @since 2.3.0
*/
public class ConnectionFactoryHealthIndicator extends AbstractReactiveHealthIndicator {
private final ConnectionFactory connectionFactory;
private final String validationQuery;
/**
* Create a new {@link ConnectionFactoryHealthIndicator} using the specified
* {@link ConnectionFactory} and no validation query.
* @param connectionFactory the connection factory
* @see Connection#validate(ValidationDepth)
*/
public ConnectionFactoryHealthIndicator(ConnectionFactory connectionFactory) {
this(connectionFactory, null);
}
/**
* Create a new {@link ConnectionFactoryHealthIndicator} using the specified
* {@link ConnectionFactory} and validation query.
* @param connectionFactory the connection factory
* @param validationQuery the validation query, can be {@code null} to use connection
* validation
*/
public ConnectionFactoryHealthIndicator(ConnectionFactory connectionFactory, String validationQuery) {
Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
this.connectionFactory = connectionFactory;
this.validationQuery = validationQuery;
}
@Override
protected final Mono<Health> doHealthCheck(Builder builder) {
return validate(builder).defaultIfEmpty(builder.build()).onErrorResume(Exception.class,
(ex) -> Mono.just(builder.down(ex).build()));
}
private Mono<Health> validate(Builder builder) {
builder.withDetail("database", this.connectionFactory.getMetadata().getName());
return (StringUtils.hasText(this.validationQuery)) ? validateWithQuery(builder)
: validateWithConnectionValidation(builder);
}
private Mono<Health> validateWithQuery(Builder builder) {
builder.withDetail("validationQuery", this.validationQuery);
Mono<Object> connectionValidation = Mono.usingWhen(this.connectionFactory.create(),
(conn) -> Flux.from(conn.createStatement(this.validationQuery).execute())
.flatMap((it) -> it.map(this::extractResult)).next(),
Connection::close, (o, throwable) -> o.close(), Connection::close);
return connectionValidation.map((result) -> builder.up().withDetail("result", result).build());
}
private Mono<Health> validateWithConnectionValidation(Builder builder) {
builder.withDetail("validationQuery", "validate(REMOTE)");
Mono<Boolean> connectionValidation = Mono.usingWhen(this.connectionFactory.create(),
(connection) -> Mono.from(connection.validate(ValidationDepth.REMOTE)), Connection::close,
(connection, ex) -> connection.close(), Connection::close);
return connectionValidation.map((valid) -> builder.status((valid) ? Status.UP : Status.DOWN).build());
}
private Object extractResult(Row row, RowMetadata metadata) {
return row.get(metadata.getColumnMetadatas().iterator().next().getName());
}
}

@ -0,0 +1,20 @@
/*
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Actuator support for R2DBC.
*/
package org.springframework.boot.actuate.r2dbc;

@ -0,0 +1,142 @@
/*
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.actuate.r2dbc;
import java.util.Collections;
import java.util.UUID;
import io.r2dbc.h2.CloseableConnectionFactory;
import io.r2dbc.h2.H2ConnectionFactory;
import io.r2dbc.h2.H2ConnectionOption;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.ValidationDepth;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import org.springframework.boot.actuate.health.ReactiveHealthIndicator;
import org.springframework.boot.actuate.health.Status;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
/**
* Tests for {@link ConnectionFactoryHealthIndicator}.
*
* @author Mark Paluch
* @author Stephane Nicoll
*/
class ConnectionFactoryHealthIndicatorTests {
@Test
void healthIndicatorWhenDatabaseUpWithConnectionValidation() {
CloseableConnectionFactory connectionFactory = createTestDatabase();
try {
ConnectionFactoryHealthIndicator healthIndicator = new ConnectionFactoryHealthIndicator(connectionFactory);
healthIndicator.health().as(StepVerifier::create).assertNext((actual) -> {
assertThat(actual.getStatus()).isEqualTo(Status.UP);
assertThat(actual.getDetails()).containsOnly(entry("database", "H2"),
entry("validationQuery", "validate(REMOTE)"));
}).verifyComplete();
}
finally {
connectionFactory.close();
}
}
@Test
void healthIndicatorWhenDatabaseDownWithConnectionValidation() {
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
given(connectionFactory.getMetadata()).willReturn(() -> "mock");
RuntimeException exception = new RuntimeException("test");
given(connectionFactory.create()).willReturn(Mono.error(exception));
ConnectionFactoryHealthIndicator healthIndicator = new ConnectionFactoryHealthIndicator(connectionFactory);
healthIndicator.health().as(StepVerifier::create).assertNext((actual) -> {
assertThat(actual.getStatus()).isEqualTo(Status.DOWN);
assertThat(actual.getDetails()).containsOnly(entry("database", "mock"),
entry("validationQuery", "validate(REMOTE)"), entry("error", "java.lang.RuntimeException: test"));
}).verifyComplete();
}
@Test
void healthIndicatorWhenConnectionValidationFails() {
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
given(connectionFactory.getMetadata()).willReturn(() -> "mock");
Connection connection = mock(Connection.class);
given(connection.validate(ValidationDepth.REMOTE)).willReturn(Mono.just(false));
given(connection.close()).willReturn(Mono.empty());
given(connectionFactory.create()).willAnswer((invocation) -> Mono.just(connection));
ConnectionFactoryHealthIndicator healthIndicator = new ConnectionFactoryHealthIndicator(connectionFactory);
healthIndicator.health().as(StepVerifier::create).assertNext((actual) -> {
assertThat(actual.getStatus()).isEqualTo(Status.DOWN);
assertThat(actual.getDetails()).containsOnly(entry("database", "mock"),
entry("validationQuery", "validate(REMOTE)"));
}).verifyComplete();
}
@Test
void healthIndicatorWhenDatabaseUpWithSuccessValidationQuery() {
CloseableConnectionFactory connectionFactory = createTestDatabase();
try {
String customValidationQuery = "SELECT COUNT(*) from HEALTH_TEST";
Mono.from(connectionFactory.create()).flatMapMany((it) -> Flux
.from(it.createStatement("CREATE TABLE HEALTH_TEST (id INTEGER IDENTITY PRIMARY KEY)").execute())
.flatMap(Result::getRowsUpdated).thenMany(it.close())).as(StepVerifier::create).verifyComplete();
ReactiveHealthIndicator healthIndicator = new ConnectionFactoryHealthIndicator(connectionFactory,
customValidationQuery);
healthIndicator.health().as(StepVerifier::create).assertNext((actual) -> {
assertThat(actual.getStatus()).isEqualTo(Status.UP);
assertThat(actual.getDetails()).containsOnly(entry("database", "H2"), entry("result", 0L),
entry("validationQuery", customValidationQuery));
}).verifyComplete();
}
finally {
connectionFactory.close();
}
}
@Test
void healthIndicatorWhenDatabaseUpWithFailureValidationQuery() {
CloseableConnectionFactory connectionFactory = createTestDatabase();
try {
String invalidValidationQuery = "SELECT COUNT(*) from DOES_NOT_EXIST";
ReactiveHealthIndicator healthIndicator = new ConnectionFactoryHealthIndicator(connectionFactory,
invalidValidationQuery);
healthIndicator.health().as(StepVerifier::create).assertNext((actual) -> {
assertThat(actual.getStatus()).isEqualTo(Status.DOWN);
assertThat(actual.getDetails()).contains(entry("database", "H2"),
entry("validationQuery", invalidValidationQuery));
assertThat(actual.getDetails()).containsOnlyKeys("database", "error", "validationQuery");
}).verifyComplete();
}
finally {
connectionFactory.close();
}
}
private CloseableConnectionFactory createTestDatabase() {
return H2ConnectionFactory.inMemory("db-" + UUID.randomUUID(), "sa", "",
Collections.singletonMap(H2ConnectionOption.DB_CLOSE_DELAY, "-1"));
}
}
Loading…
Cancel
Save