Add health indicator for Cassandra that uses the CqlSession

This commit provides a CassandraDriverHealthIndicator and
CassandraDriverReactiveHealthIndicator that do not require Spring Data.
As a result, a health indicator for Cassandra is provided even if the
application does not use Spring Data.

See gh-20887
pull/21936/head
Alexandre Dutra 5 years ago committed by Stephane Nicoll
parent 78a9cdcee1
commit dad9ec86d5

@ -31,6 +31,7 @@ dependencies {
optional(platform(project(":spring-boot-project:spring-boot-dependencies")))
optional("ch.qos.logback:logback-classic")
optional("com.datastax.oss:java-driver-core")
optional("com.fasterxml.jackson.dataformat:jackson-dataformat-xml")
optional("com.github.ben-manes.caffeine:caffeine")
optional("com.hazelcast:hazelcast")

@ -0,0 +1,59 @@
/*
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.actuate.autoconfigure.cassandra;
import java.util.Map;
import com.datastax.oss.driver.api.core.CqlSession;
import org.springframework.boot.actuate.autoconfigure.health.CompositeHealthContributorConfiguration;
import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
import org.springframework.boot.actuate.cassandra.CassandraDriverHealthIndicator;
import org.springframework.boot.actuate.health.HealthContributor;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.cassandra.CassandraAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* {@link EnableAutoConfiguration Auto-configuration} for
* {@link CassandraDriverHealthIndicator}.
*
* @author Alexandre Dutra
* @since 2.4.0
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(CqlSession.class)
@ConditionalOnBean(CqlSession.class)
@ConditionalOnEnabledHealthIndicator("cassandra")
@AutoConfigureAfter({ CassandraAutoConfiguration.class, CassandraReactiveHealthContributorAutoConfiguration.class,
CassandraHealthContributorAutoConfiguration.class,
CassandraDriverReactiveHealthContributorAutoConfiguration.class })
public class CassandraDriverHealthContributorAutoConfiguration
extends CompositeHealthContributorConfiguration<CassandraDriverHealthIndicator, CqlSession> {
@Bean
@ConditionalOnMissingBean(name = { "cassandraHealthIndicator", "cassandraHealthContributor" })
public HealthContributor cassandraHealthContributor(Map<String, CqlSession> sessions) {
return createContributor(sessions);
}
}

@ -0,0 +1,58 @@
/*
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.actuate.autoconfigure.cassandra;
import java.util.Map;
import com.datastax.oss.driver.api.core.CqlSession;
import reactor.core.publisher.Flux;
import org.springframework.boot.actuate.autoconfigure.health.CompositeReactiveHealthContributorConfiguration;
import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
import org.springframework.boot.actuate.cassandra.CassandraDriverReactiveHealthIndicator;
import org.springframework.boot.actuate.health.ReactiveHealthContributor;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.cassandra.CassandraAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* {@link EnableAutoConfiguration Auto-configuration} for
* {@link CassandraDriverReactiveHealthIndicator}.
*
* @author Alexandre Dutra
* @since 2.4.0
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ CqlSession.class, Flux.class })
@ConditionalOnBean(CqlSession.class)
@ConditionalOnEnabledHealthIndicator("cassandra")
@AutoConfigureAfter({ CassandraAutoConfiguration.class, CassandraReactiveHealthContributorAutoConfiguration.class,
CassandraHealthContributorAutoConfiguration.class })
public class CassandraDriverReactiveHealthContributorAutoConfiguration
extends CompositeReactiveHealthContributorConfiguration<CassandraDriverReactiveHealthIndicator, CqlSession> {
@Bean
@ConditionalOnMissingBean(name = { "cassandraHealthIndicator", "cassandraHealthContributor" })
public ReactiveHealthContributor cassandraHealthContributor(Map<String, CqlSession> sessions) {
return createContributor(sessions);
}
}

@ -7,6 +7,8 @@ org.springframework.boot.actuate.autoconfigure.beans.BeansEndpointAutoConfigurat
org.springframework.boot.actuate.autoconfigure.cache.CachesEndpointAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.cassandra.CassandraHealthContributorAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.cassandra.CassandraReactiveHealthContributorAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.cassandra.CassandraDriverHealthContributorAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.cassandra.CassandraDriverReactiveHealthContributorAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.cloudfoundry.servlet.CloudFoundryActuatorAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.cloudfoundry.reactive.ReactiveCloudFoundryActuatorAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.condition.ConditionsReportEndpointAutoConfiguration,\

@ -0,0 +1,89 @@
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.actuate.autoconfigure.cassandra;
import com.datastax.oss.driver.api.core.CqlSession;
import org.junit.jupiter.api.Test;
import org.springframework.boot.actuate.autoconfigure.health.HealthContributorAutoConfiguration;
import org.springframework.boot.actuate.cassandra.CassandraDriverHealthIndicator;
import org.springframework.boot.actuate.cassandra.CassandraDriverReactiveHealthIndicator;
import org.springframework.boot.actuate.cassandra.CassandraHealthIndicator;
import org.springframework.boot.actuate.cassandra.CassandraReactiveHealthIndicator;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.data.cassandra.core.CassandraOperations;
import org.springframework.data.cassandra.core.ReactiveCassandraOperations;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
/**
* Tests for {@link CassandraDriverHealthContributorAutoConfiguration}.
*
* @author Alexandre Dutra
* @since 2.4.0
*/
class CassandraDriverHealthContributorAutoConfigurationTests {
private ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withBean(CqlSession.class, () -> mock(CqlSession.class)).withConfiguration(AutoConfigurations.of(
CassandraDriverHealthContributorAutoConfiguration.class, HealthContributorAutoConfiguration.class));
@Test
void runShouldCreateDriverIndicator() {
this.contextRunner.run((context) -> assertThat(context).hasSingleBean(CassandraDriverHealthIndicator.class)
.hasBean("cassandraHealthContributor").doesNotHaveBean(CassandraHealthIndicator.class)
.doesNotHaveBean(CassandraReactiveHealthIndicator.class)
.doesNotHaveBean(CassandraDriverReactiveHealthIndicator.class));
}
@Test
void runWhenDisabledShouldNotCreateDriverIndicator() {
this.contextRunner.withPropertyValues("management.health.cassandra.enabled:false")
.run((context) -> assertThat(context).doesNotHaveBean(CassandraDriverHealthIndicator.class)
.doesNotHaveBean("cassandraHealthContributor"));
}
@Test
void runWhenSpringDataPresentShouldNotCreateDriverIndicator() {
this.contextRunner.withConfiguration(AutoConfigurations.of(CassandraHealthContributorAutoConfiguration.class))
.withBean(CassandraOperations.class, () -> mock(CassandraOperations.class))
.run((context) -> assertThat(context).doesNotHaveBean(CassandraDriverHealthIndicator.class)
.hasSingleBean(CassandraHealthIndicator.class).hasBean("cassandraHealthContributor"));
}
@Test
void runWhenReactorPresentShouldNotCreateDriverIndicator() {
this.contextRunner
.withConfiguration(
AutoConfigurations.of(CassandraDriverReactiveHealthContributorAutoConfiguration.class))
.run((context) -> assertThat(context).doesNotHaveBean(CassandraDriverHealthIndicator.class)
.hasSingleBean(CassandraDriverReactiveHealthIndicator.class)
.hasBean("cassandraHealthContributor"));
}
@Test
void runWhenSpringDataAndReactorPresentShouldNotCreateDriverIndicator() {
this.contextRunner
.withConfiguration(AutoConfigurations.of(CassandraReactiveHealthContributorAutoConfiguration.class))
.withBean(ReactiveCassandraOperations.class, () -> mock(ReactiveCassandraOperations.class))
.run((context) -> assertThat(context).doesNotHaveBean(CassandraDriverHealthIndicator.class)
.hasSingleBean(CassandraReactiveHealthIndicator.class).hasBean("cassandraHealthContributor"));
}
}

@ -0,0 +1,79 @@
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.actuate.autoconfigure.cassandra;
import com.datastax.oss.driver.api.core.CqlSession;
import org.junit.jupiter.api.Test;
import org.springframework.boot.actuate.autoconfigure.health.HealthContributorAutoConfiguration;
import org.springframework.boot.actuate.cassandra.CassandraDriverReactiveHealthIndicator;
import org.springframework.boot.actuate.cassandra.CassandraHealthIndicator;
import org.springframework.boot.actuate.cassandra.CassandraReactiveHealthIndicator;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.data.cassandra.core.CassandraOperations;
import org.springframework.data.cassandra.core.ReactiveCassandraOperations;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
/**
* Tests for {@link CassandraDriverReactiveHealthContributorAutoConfiguration}.
*
* @author Alexandre Dutra
* @since 2.4.0
*/
class CassandraDriverReactiveHealthContributorAutoConfigurationTests {
private ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withBean(CqlSession.class, () -> mock(CqlSession.class))
.withConfiguration(AutoConfigurations.of(CassandraDriverReactiveHealthContributorAutoConfiguration.class,
HealthContributorAutoConfiguration.class));
@Test
void runShouldCreateDriverReactiveIndicator() {
this.contextRunner
.run((context) -> assertThat(context).hasSingleBean(CassandraDriverReactiveHealthIndicator.class)
.hasBean("cassandraHealthContributor").doesNotHaveBean(CassandraHealthIndicator.class)
.doesNotHaveBean(CassandraReactiveHealthIndicator.class));
}
@Test
void runWhenDisabledShouldNotCreateDriverReactiveIndicator() {
this.contextRunner.withPropertyValues("management.health.cassandra.enabled:false")
.run((context) -> assertThat(context).doesNotHaveBean(CassandraDriverReactiveHealthIndicator.class)
.doesNotHaveBean("cassandraHealthContributor"));
}
@Test
void runWhenSpringDataPresentShouldNotCreateDriverReactiveIndicator() {
this.contextRunner.withConfiguration(AutoConfigurations.of(CassandraHealthContributorAutoConfiguration.class))
.withBean(CassandraOperations.class, () -> mock(CassandraOperations.class))
.run((context) -> assertThat(context).doesNotHaveBean(CassandraDriverReactiveHealthIndicator.class)
.hasSingleBean(CassandraHealthIndicator.class).hasBean("cassandraHealthContributor"));
}
@Test
void runWhenSpringDataAndReactorPresentShouldNotCreateDriverReactiveIndicator() {
this.contextRunner
.withConfiguration(AutoConfigurations.of(CassandraReactiveHealthContributorAutoConfiguration.class))
.withBean(ReactiveCassandraOperations.class, () -> mock(ReactiveCassandraOperations.class))
.run((context) -> assertThat(context).doesNotHaveBean(CassandraDriverReactiveHealthIndicator.class)
.hasSingleBean(CassandraReactiveHealthIndicator.class).hasBean("cassandraHealthContributor"));
}
}

@ -13,6 +13,7 @@ dependencies {
api(project(":spring-boot-project:spring-boot"))
optional(platform(project(":spring-boot-project:spring-boot-dependencies")))
optional("com.datastax.oss:java-driver-core")
optional("com.fasterxml.jackson.core:jackson-databind")
optional("com.github.ben-manes.caffeine:caffeine")
optional("com.hazelcast:hazelcast")

@ -0,0 +1,65 @@
/*
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.actuate.cassandra;
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.util.Assert;
/**
* Simple implementation of a {@link HealthIndicator} returning status information for
* Cassandra data stores.
*
* This health indicator is automatically used when Spring Data Cassandra is not present,
* but the Cassandra driver is.
*
* @author Alexandre Dutra
* @since 2.4.0
*/
public class CassandraDriverHealthIndicator extends AbstractHealthIndicator {
private static final SimpleStatement SELECT = SimpleStatement
.newInstance("SELECT release_version FROM system.local").setConsistencyLevel(ConsistencyLevel.LOCAL_ONE);
private final CqlSession session;
/**
* Create a new {@link CassandraDriverHealthIndicator} instance.
* @param session the {@link CqlSession}.
*/
public CassandraDriverHealthIndicator(CqlSession session) {
super("Cassandra health check failed");
Assert.notNull(session, "session must not be null");
this.session = session;
}
@Override
protected void doHealthCheck(Health.Builder builder) throws Exception {
Row row = this.session.execute(SELECT).one();
builder.up();
if (row != null && !row.isNull(0)) {
builder.withDetail("version", row.getString(0));
}
}
}

@ -0,0 +1,61 @@
/*
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.actuate.cassandra;
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import reactor.core.publisher.Mono;
import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.ReactiveHealthIndicator;
import org.springframework.util.Assert;
/**
* Simple implementation of a {@link ReactiveHealthIndicator} returning status information
* for Cassandra data stores.
*
* This health indicator is automatically used when Spring Data Cassandra is not present,
* but the Cassandra driver is.
*
* @author Alexandre Dutra
* @since 2.4.0
*/
public class CassandraDriverReactiveHealthIndicator extends AbstractReactiveHealthIndicator {
private static final SimpleStatement SELECT = SimpleStatement
.newInstance("SELECT release_version FROM system.local").setConsistencyLevel(ConsistencyLevel.LOCAL_ONE);
private final CqlSession session;
/**
* Create a new {@link CassandraHealthIndicator} instance.
* @param session the {@link CqlSession}.
*/
public CassandraDriverReactiveHealthIndicator(CqlSession session) {
super("Cassandra health check failed");
Assert.notNull(session, "session must not be null");
this.session = session;
}
@Override
protected Mono<Health> doHealthCheck(Health.Builder builder) {
return Mono.from(this.session.executeReactive(SELECT))
.map((row) -> builder.up().withDetail("version", row.getString(0)).build());
}
}

@ -0,0 +1,74 @@
/*
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.actuate.cassandra;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.DriverTimeoutException;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import org.junit.jupiter.api.Test;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
/**
* Tests for {@link CassandraDriverHealthIndicator}.
*
* @author Alexandre Dutra
* @since 2.4.0
*/
class CassandraDriverHealthIndicatorTests {
@Test
void createWhenCqlSessionIsNullShouldThrowException() {
assertThatIllegalArgumentException().isThrownBy(() -> new CassandraDriverHealthIndicator(null));
}
@Test
void healthWithCassandraUp() {
CqlSession session = mock(CqlSession.class);
ResultSet resultSet = mock(ResultSet.class);
Row row = mock(Row.class);
given(session.execute(any(SimpleStatement.class))).willReturn(resultSet);
given(resultSet.one()).willReturn(row);
given(row.isNull(0)).willReturn(false);
given(row.getString(0)).willReturn("1.0.0");
CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session);
Health health = healthIndicator.health();
assertThat(health.getStatus()).isEqualTo(Status.UP);
assertThat(health.getDetails().get("version")).isEqualTo("1.0.0");
}
@Test
void healthWithCassandraDown() {
CqlSession session = mock(CqlSession.class);
given(session.execute(any(SimpleStatement.class))).willThrow(new DriverTimeoutException("Test Exception"));
CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session);
Health health = healthIndicator.health();
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
assertThat(health.getDetails().get("error"))
.isEqualTo(DriverTimeoutException.class.getName() + ": Test Exception");
}
}

@ -0,0 +1,108 @@
/*
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.actuate.cassandra;
import com.datastax.dse.driver.api.core.cql.reactive.ReactiveResultSet;
import com.datastax.dse.driver.api.core.cql.reactive.ReactiveRow;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.DriverTimeoutException;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import org.junit.jupiter.api.Test;
import org.mockito.stubbing.Answer;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.doAnswer;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.mock;
/**
* Tests for {@link CassandraDriverReactiveHealthIndicator}.
*
* @author Alexandre Dutra
* @since 2.4.0
*/
class CassandraDriverReactiveHealthIndicatorTests {
@Test
void createWhenCqlSessionIsNullShouldThrowException() {
assertThatIllegalArgumentException().isThrownBy(() -> new CassandraDriverReactiveHealthIndicator(null));
}
@Test
void testCassandraIsUp() {
CqlSession session = mock(CqlSession.class);
ReactiveResultSet results = mock(ReactiveResultSet.class);
ReactiveRow row = mock(ReactiveRow.class);
given(session.executeReactive(any(SimpleStatement.class))).willReturn(results);
doAnswer(mockReactiveResultSetBehavior(row)).when(results).subscribe(any());
given(row.getString(0)).willReturn("6.0.0");
CassandraDriverReactiveHealthIndicator cassandraReactiveHealthIndicator = new CassandraDriverReactiveHealthIndicator(
session);
Mono<Health> health = cassandraReactiveHealthIndicator.health();
StepVerifier.create(health).consumeNextWith((h) -> {
assertThat(h.getStatus()).isEqualTo(Status.UP);
assertThat(h.getDetails()).containsOnlyKeys("version");
assertThat(h.getDetails().get("version")).isEqualTo("6.0.0");
}).verifyComplete();
}
@Test
void testCassandraIsDown() {
CqlSession session = mock(CqlSession.class);
given(session.executeReactive(any(SimpleStatement.class)))
.willThrow(new DriverTimeoutException("Test Exception"));
CassandraDriverReactiveHealthIndicator cassandraReactiveHealthIndicator = new CassandraDriverReactiveHealthIndicator(
session);
Mono<Health> health = cassandraReactiveHealthIndicator.health();
StepVerifier.create(health).consumeNextWith((h) -> {
assertThat(h.getStatus()).isEqualTo(Status.DOWN);
assertThat(h.getDetails()).containsOnlyKeys("error");
assertThat(h.getDetails().get("error"))
.isEqualTo(DriverTimeoutException.class.getName() + ": Test Exception");
}).verifyComplete();
}
private Answer<Void> mockReactiveResultSetBehavior(ReactiveRow row) {
return (invocation) -> {
Subscriber<ReactiveRow> subscriber = invocation.getArgument(0);
Subscription s = new Subscription() {
@Override
public void request(long n) {
subscriber.onNext(row);
subscriber.onComplete();
}
@Override
public void cancel() {
}
};
subscriber.onSubscribe(s);
return null;
};
}
}
Loading…
Cancel
Save