|
|
|
@ -1,5 +1,5 @@
|
|
|
|
|
/*
|
|
|
|
|
* Copyright 2012-2019 the original author or authors.
|
|
|
|
|
* Copyright 2012-2020 the original author or authors.
|
|
|
|
|
*
|
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
|
* you may not use this file except in compliance with the License.
|
|
|
|
@ -16,12 +16,24 @@
|
|
|
|
|
|
|
|
|
|
package org.springframework.boot.autoconfigure.cassandra;
|
|
|
|
|
|
|
|
|
|
import java.security.NoSuchAlgorithmException;
|
|
|
|
|
import java.time.Duration;
|
|
|
|
|
import java.util.LinkedHashMap;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.Map;
|
|
|
|
|
|
|
|
|
|
import com.datastax.driver.core.Cluster;
|
|
|
|
|
import com.datastax.driver.core.PoolingOptions;
|
|
|
|
|
import com.datastax.driver.core.QueryOptions;
|
|
|
|
|
import com.datastax.driver.core.SocketOptions;
|
|
|
|
|
import javax.net.ssl.SSLContext;
|
|
|
|
|
|
|
|
|
|
import com.datastax.oss.driver.api.core.CqlSession;
|
|
|
|
|
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
|
|
|
|
|
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
|
|
|
|
|
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
|
|
|
|
|
import com.datastax.oss.driver.api.core.config.DriverOption;
|
|
|
|
|
import com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder;
|
|
|
|
|
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultDriverConfigLoader;
|
|
|
|
|
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultProgrammaticDriverConfigLoaderBuilder;
|
|
|
|
|
import com.typesafe.config.Config;
|
|
|
|
|
import com.typesafe.config.ConfigFactory;
|
|
|
|
|
|
|
|
|
|
import org.springframework.beans.factory.ObjectProvider;
|
|
|
|
|
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
|
|
|
@ -31,7 +43,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
|
|
|
|
|
import org.springframework.boot.context.properties.PropertyMapper;
|
|
|
|
|
import org.springframework.context.annotation.Bean;
|
|
|
|
|
import org.springframework.context.annotation.Configuration;
|
|
|
|
|
import org.springframework.util.StringUtils;
|
|
|
|
|
import org.springframework.context.annotation.Lazy;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* {@link EnableAutoConfiguration Auto-configuration} for Cassandra.
|
|
|
|
@ -44,64 +56,131 @@ import org.springframework.util.StringUtils;
|
|
|
|
|
* @since 1.3.0
|
|
|
|
|
*/
|
|
|
|
|
@Configuration(proxyBeanMethods = false)
|
|
|
|
|
@ConditionalOnClass({ Cluster.class })
|
|
|
|
|
@ConditionalOnClass({ CqlSession.class })
|
|
|
|
|
@EnableConfigurationProperties(CassandraProperties.class)
|
|
|
|
|
public class CassandraAutoConfiguration {
|
|
|
|
|
|
|
|
|
|
@Bean
|
|
|
|
|
@ConditionalOnMissingBean
|
|
|
|
|
public Cluster cassandraCluster(CassandraProperties properties,
|
|
|
|
|
ObjectProvider<ClusterBuilderCustomizer> builderCustomizers,
|
|
|
|
|
ObjectProvider<ClusterFactory> clusterFactory) {
|
|
|
|
|
@Lazy
|
|
|
|
|
public CqlSession cqlSession(CqlSessionBuilder cqlSessionBuilder) {
|
|
|
|
|
return cqlSessionBuilder.build();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Bean
|
|
|
|
|
@ConditionalOnMissingBean
|
|
|
|
|
public CqlSessionBuilder cqlSessionBuilder(CassandraProperties properties, DriverConfigLoader driverConfigLoader,
|
|
|
|
|
ObjectProvider<CqlSessionBuilderCustomizer> builderCustomizers) {
|
|
|
|
|
CqlSessionBuilder builder = CqlSession.builder().withConfigLoader(driverConfigLoader);
|
|
|
|
|
configureSsl(properties, builder);
|
|
|
|
|
builder.withKeyspace(properties.getKeyspaceName());
|
|
|
|
|
builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(builder));
|
|
|
|
|
return builder;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void configureSsl(CassandraProperties properties, CqlSessionBuilder builder) {
|
|
|
|
|
if (properties.isSsl()) {
|
|
|
|
|
try {
|
|
|
|
|
builder.withSslContext(SSLContext.getDefault());
|
|
|
|
|
}
|
|
|
|
|
catch (NoSuchAlgorithmException ex) {
|
|
|
|
|
throw new IllegalStateException("Could not setup SSL default context for Cassandra", ex);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Bean
|
|
|
|
|
@ConditionalOnMissingBean
|
|
|
|
|
public DriverConfigLoader driverConfigLoader(CassandraProperties properties,
|
|
|
|
|
ObjectProvider<DriverConfigLoaderBuilderCustomizer> builderCustomizers) {
|
|
|
|
|
ProgrammaticDriverConfigLoaderBuilder builder = new DefaultProgrammaticDriverConfigLoaderBuilder(
|
|
|
|
|
() -> cassandraConfiguration(properties), DefaultDriverConfigLoader.DEFAULT_ROOT_PATH);
|
|
|
|
|
builderCustomizers.orderedStream().forEach((customizer) -> customizer.customizer(builder));
|
|
|
|
|
return builder.build();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Config cassandraConfiguration(CassandraProperties properties) {
|
|
|
|
|
CassandraDriverOptions options = new CassandraDriverOptions();
|
|
|
|
|
PropertyMapper map = PropertyMapper.get();
|
|
|
|
|
Cluster.Builder builder = Cluster.builder().withClusterName(properties.getClusterName())
|
|
|
|
|
.withPort(properties.getPort());
|
|
|
|
|
map.from(properties.getSessionName()).whenHasText()
|
|
|
|
|
.to((sessionName) -> options.add(DefaultDriverOption.SESSION_NAME, sessionName));
|
|
|
|
|
map.from(properties::getUsername).whenNonNull()
|
|
|
|
|
.to((username) -> builder.withCredentials(username, properties.getPassword()));
|
|
|
|
|
map.from(properties::getCompression).whenNonNull().to(builder::withCompression);
|
|
|
|
|
QueryOptions queryOptions = getQueryOptions(properties);
|
|
|
|
|
map.from(queryOptions).to(builder::withQueryOptions);
|
|
|
|
|
SocketOptions socketOptions = getSocketOptions(properties);
|
|
|
|
|
map.from(socketOptions).to(builder::withSocketOptions);
|
|
|
|
|
map.from(properties::isSsl).whenTrue().toCall(builder::withSSL);
|
|
|
|
|
PoolingOptions poolingOptions = getPoolingOptions(properties);
|
|
|
|
|
map.from(poolingOptions).to(builder::withPoolingOptions);
|
|
|
|
|
map.from(properties::getContactPoints).as(StringUtils::toStringArray).to(builder::addContactPoints);
|
|
|
|
|
map.from(properties::isJmxEnabled).whenFalse().toCall(builder::withoutJMXReporting);
|
|
|
|
|
builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(builder));
|
|
|
|
|
return clusterFactory.getIfAvailable(() -> Cluster::buildFrom).create(builder);
|
|
|
|
|
.to((username) -> options.add(DefaultDriverOption.AUTH_PROVIDER_USER_NAME, username)
|
|
|
|
|
.add(DefaultDriverOption.AUTH_PROVIDER_PASSWORD, properties.getPassword()));
|
|
|
|
|
map.from(properties::getCompression).whenNonNull()
|
|
|
|
|
.to((compression) -> options.add(DefaultDriverOption.PROTOCOL_COMPRESSION, compression));
|
|
|
|
|
mapQueryOptions(properties, options);
|
|
|
|
|
mapSocketOptions(properties, options);
|
|
|
|
|
mapPoolingOptions(properties, options);
|
|
|
|
|
map.from(properties::getContactPoints)
|
|
|
|
|
.to((contactPoints) -> options.add(DefaultDriverOption.CONTACT_POINTS, contactPoints));
|
|
|
|
|
ConfigFactory.invalidateCaches();
|
|
|
|
|
return ConfigFactory.defaultOverrides().withFallback(options.build())
|
|
|
|
|
.withFallback(ConfigFactory.defaultReference()).resolve();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private QueryOptions getQueryOptions(CassandraProperties properties) {
|
|
|
|
|
private void mapQueryOptions(CassandraProperties properties, CassandraDriverOptions options) {
|
|
|
|
|
PropertyMapper map = PropertyMapper.get();
|
|
|
|
|
QueryOptions options = new QueryOptions();
|
|
|
|
|
map.from(properties::getConsistencyLevel).whenNonNull().to(options::setConsistencyLevel);
|
|
|
|
|
map.from(properties::getSerialConsistencyLevel).whenNonNull().to(options::setSerialConsistencyLevel);
|
|
|
|
|
map.from(properties::getFetchSize).to(options::setFetchSize);
|
|
|
|
|
return options;
|
|
|
|
|
map.from(properties::getConsistencyLevel).whenNonNull()
|
|
|
|
|
.to(((consistency) -> options.add(DefaultDriverOption.REQUEST_CONSISTENCY, consistency)));
|
|
|
|
|
map.from(properties::getSerialConsistencyLevel).whenNonNull().to(
|
|
|
|
|
(serialConsistency) -> options.add(DefaultDriverOption.REQUEST_SERIAL_CONSISTENCY, serialConsistency));
|
|
|
|
|
map.from(properties::getPageSize)
|
|
|
|
|
.to((pageSize) -> options.add(DefaultDriverOption.REQUEST_PAGE_SIZE, pageSize));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private SocketOptions getSocketOptions(CassandraProperties properties) {
|
|
|
|
|
private void mapSocketOptions(CassandraProperties properties, CassandraDriverOptions options) {
|
|
|
|
|
PropertyMapper map = PropertyMapper.get();
|
|
|
|
|
SocketOptions options = new SocketOptions();
|
|
|
|
|
map.from(properties::getConnectTimeout).whenNonNull().asInt(Duration::toMillis)
|
|
|
|
|
.to(options::setConnectTimeoutMillis);
|
|
|
|
|
map.from(properties::getReadTimeout).whenNonNull().asInt(Duration::toMillis).to(options::setReadTimeoutMillis);
|
|
|
|
|
return options;
|
|
|
|
|
.to((connectTimeout) -> options.add(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, connectTimeout));
|
|
|
|
|
map.from(properties::getReadTimeout).whenNonNull().asInt(Duration::toMillis)
|
|
|
|
|
.to((readTimeout) -> options.add(DefaultDriverOption.REQUEST_TIMEOUT, readTimeout));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private PoolingOptions getPoolingOptions(CassandraProperties properties) {
|
|
|
|
|
private void mapPoolingOptions(CassandraProperties properties, CassandraDriverOptions options) {
|
|
|
|
|
PropertyMapper map = PropertyMapper.get();
|
|
|
|
|
CassandraProperties.Pool poolProperties = properties.getPool();
|
|
|
|
|
PoolingOptions options = new PoolingOptions();
|
|
|
|
|
map.from(poolProperties::getIdleTimeout).whenNonNull().asInt(Duration::getSeconds)
|
|
|
|
|
.to(options::setIdleTimeoutSeconds);
|
|
|
|
|
map.from(poolProperties::getPoolTimeout).whenNonNull().asInt(Duration::toMillis)
|
|
|
|
|
.to(options::setPoolTimeoutMillis);
|
|
|
|
|
.to((idleTimeout) -> options.add(DefaultDriverOption.HEARTBEAT_TIMEOUT, idleTimeout));
|
|
|
|
|
map.from(poolProperties::getHeartbeatInterval).whenNonNull().asInt(Duration::getSeconds)
|
|
|
|
|
.to(options::setHeartbeatIntervalSeconds);
|
|
|
|
|
map.from(poolProperties::getMaxQueueSize).to(options::setMaxQueueSize);
|
|
|
|
|
return options;
|
|
|
|
|
.to((heartBeatInterval) -> options.add(DefaultDriverOption.HEARTBEAT_INTERVAL, heartBeatInterval));
|
|
|
|
|
map.from(poolProperties::getMaxQueueSize)
|
|
|
|
|
.to((maxQueueSize) -> options.add(DefaultDriverOption.REQUEST_THROTTLER_MAX_QUEUE_SIZE, maxQueueSize));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static class CassandraDriverOptions {
|
|
|
|
|
|
|
|
|
|
private final Map<String, String> options = new LinkedHashMap<>();
|
|
|
|
|
|
|
|
|
|
private CassandraDriverOptions add(DriverOption option, String value) {
|
|
|
|
|
String key = createKeyFor(option);
|
|
|
|
|
this.options.put(key, value);
|
|
|
|
|
return this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private CassandraDriverOptions add(DriverOption option, int value) {
|
|
|
|
|
return add(option, String.valueOf(value));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private CassandraDriverOptions add(DriverOption option, Enum<?> value) {
|
|
|
|
|
return add(option, value.name());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private CassandraDriverOptions add(DriverOption option, List<String> values) {
|
|
|
|
|
for (int i = 0; i < values.size(); i++) {
|
|
|
|
|
this.options.put(String.format("%s.%s", createKeyFor(option), i), values.get(i));
|
|
|
|
|
}
|
|
|
|
|
return this;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Config build() {
|
|
|
|
|
return ConfigFactory.parseMap(this.options, "Environment");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static String createKeyFor(DriverOption option) {
|
|
|
|
|
return String.format("%s.%s", DefaultDriverConfigLoader.DEFAULT_ROOT_PATH, option.getPath());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|