Add support for Elasticsearch sniffer

Closes gh-24174
pull/24610/head
Stephane Nicoll 4 years ago
parent 1e8d5c37ed
commit 1fd17cf641

@ -76,6 +76,7 @@ dependencies {
optional("org.eclipse.jetty.websocket:javax-websocket-server-impl")
optional("org.ehcache:ehcache")
optional("org.elasticsearch.client:elasticsearch-rest-client")
optional("org.elasticsearch.client:elasticsearch-rest-client-sniffer")
optional("org.elasticsearch.client:elasticsearch-rest-high-level-client")
optional("org.flywaydb:flyway-core")
optional("org.freemarker:freemarker")

@ -16,30 +16,18 @@
package org.springframework.boot.autoconfigure.elasticsearch;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.Credentials;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientConfigurations.RestClientBuilderConfiguration;
import org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientConfigurations.RestClientSnifferConfiguration;
import org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientConfigurations.RestHighLevelClientConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
import org.springframework.context.annotation.Import;
/**
* {@link EnableAutoConfiguration Auto-configuration} for Elasticsearch REST clients.
@ -52,139 +40,8 @@ import org.springframework.util.StringUtils;
@ConditionalOnClass(RestHighLevelClient.class)
@ConditionalOnMissingBean(RestClient.class)
@EnableConfigurationProperties(ElasticsearchRestClientProperties.class)
@Import({ RestClientBuilderConfiguration.class, RestHighLevelClientConfiguration.class,
RestClientSnifferConfiguration.class })
public class ElasticsearchRestClientAutoConfiguration {
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(RestClientBuilder.class)
static class RestClientBuilderConfiguration {
@Bean
RestClientBuilderCustomizer defaultRestClientBuilderCustomizer(ElasticsearchRestClientProperties properties) {
return new DefaultRestClientBuilderCustomizer(properties);
}
@Bean
RestClientBuilder elasticsearchRestClientBuilder(ElasticsearchRestClientProperties properties,
ObjectProvider<RestClientBuilderCustomizer> builderCustomizers) {
HttpHost[] hosts = properties.getUris().stream().map(this::createHttpHost).toArray(HttpHost[]::new);
RestClientBuilder builder = RestClient.builder(hosts);
builder.setHttpClientConfigCallback((httpClientBuilder) -> {
builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(httpClientBuilder));
return httpClientBuilder;
});
builder.setRequestConfigCallback((requestConfigBuilder) -> {
builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(requestConfigBuilder));
return requestConfigBuilder;
});
builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(builder));
return builder;
}
private HttpHost createHttpHost(String uri) {
try {
return createHttpHost(URI.create(uri));
}
catch (IllegalArgumentException ex) {
return HttpHost.create(uri);
}
}
private HttpHost createHttpHost(URI uri) {
if (!StringUtils.hasLength(uri.getUserInfo())) {
return HttpHost.create(uri.toString());
}
try {
return HttpHost.create(new URI(uri.getScheme(), null, uri.getHost(), uri.getPort(), uri.getPath(),
uri.getQuery(), uri.getFragment()).toString());
}
catch (URISyntaxException ex) {
throw new IllegalStateException(ex);
}
}
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(RestHighLevelClient.class)
static class RestHighLevelClientConfiguration {
@Bean
RestHighLevelClient elasticsearchRestHighLevelClient(RestClientBuilder restClientBuilder) {
return new RestHighLevelClient(restClientBuilder);
}
}
static class DefaultRestClientBuilderCustomizer implements RestClientBuilderCustomizer {
private static final PropertyMapper map = PropertyMapper.get();
private final ElasticsearchRestClientProperties properties;
DefaultRestClientBuilderCustomizer(ElasticsearchRestClientProperties properties) {
this.properties = properties;
}
@Override
public void customize(RestClientBuilder builder) {
}
@Override
public void customize(HttpAsyncClientBuilder builder) {
builder.setDefaultCredentialsProvider(new PropertiesCredentialsProvider(this.properties));
}
@Override
public void customize(RequestConfig.Builder builder) {
map.from(this.properties::getConnectionTimeout).whenNonNull().asInt(Duration::toMillis)
.to(builder::setConnectTimeout);
map.from(this.properties::getReadTimeout).whenNonNull().asInt(Duration::toMillis)
.to(builder::setSocketTimeout);
}
}
private static class PropertiesCredentialsProvider extends BasicCredentialsProvider {
PropertiesCredentialsProvider(ElasticsearchRestClientProperties properties) {
if (StringUtils.hasText(properties.getUsername())) {
Credentials credentials = new UsernamePasswordCredentials(properties.getUsername(),
properties.getPassword());
setCredentials(AuthScope.ANY, credentials);
}
properties.getUris().stream().map(this::toUri).filter(this::hasUserInfo)
.forEach(this::addUserInfoCredentials);
}
private URI toUri(String uri) {
try {
return URI.create(uri);
}
catch (IllegalArgumentException ex) {
return null;
}
}
private boolean hasUserInfo(URI uri) {
return uri != null && StringUtils.hasLength(uri.getUserInfo());
}
private void addUserInfoCredentials(URI uri) {
AuthScope authScope = new AuthScope(uri.getHost(), uri.getPort());
Credentials credentials = createUserInfoCredentials(uri.getUserInfo());
setCredentials(authScope, credentials);
}
private Credentials createUserInfoCredentials(String userInfo) {
int delimiter = userInfo.indexOf(":");
if (delimiter == -1) {
return new UsernamePasswordCredentials(userInfo, null);
}
String username = userInfo.substring(0, delimiter);
String password = userInfo.substring(delimiter + 1);
return new UsernamePasswordCredentials(username, password);
}
}
}

@ -0,0 +1,204 @@
/*
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.elasticsearch;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.Credentials;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.sniff.Sniffer;
import org.elasticsearch.client.sniff.SnifferBuilder;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
/**
* Elasticsearch rest client configurations.
*
* @author Stephane Nicoll
*/
class ElasticsearchRestClientConfigurations {
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(RestClientBuilder.class)
static class RestClientBuilderConfiguration {
@Bean
RestClientBuilderCustomizer defaultRestClientBuilderCustomizer(ElasticsearchRestClientProperties properties) {
return new DefaultRestClientBuilderCustomizer(properties);
}
@Bean
RestClientBuilder elasticsearchRestClientBuilder(ElasticsearchRestClientProperties properties,
ObjectProvider<RestClientBuilderCustomizer> builderCustomizers) {
HttpHost[] hosts = properties.getUris().stream().map(this::createHttpHost).toArray(HttpHost[]::new);
RestClientBuilder builder = RestClient.builder(hosts);
builder.setHttpClientConfigCallback((httpClientBuilder) -> {
builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(httpClientBuilder));
return httpClientBuilder;
});
builder.setRequestConfigCallback((requestConfigBuilder) -> {
builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(requestConfigBuilder));
return requestConfigBuilder;
});
builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(builder));
return builder;
}
private HttpHost createHttpHost(String uri) {
try {
return createHttpHost(URI.create(uri));
}
catch (IllegalArgumentException ex) {
return HttpHost.create(uri);
}
}
private HttpHost createHttpHost(URI uri) {
if (!StringUtils.hasLength(uri.getUserInfo())) {
return HttpHost.create(uri.toString());
}
try {
return HttpHost.create(new URI(uri.getScheme(), null, uri.getHost(), uri.getPort(), uri.getPath(),
uri.getQuery(), uri.getFragment()).toString());
}
catch (URISyntaxException ex) {
throw new IllegalStateException(ex);
}
}
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(RestHighLevelClient.class)
static class RestHighLevelClientConfiguration {
@Bean
RestHighLevelClient elasticsearchRestHighLevelClient(RestClientBuilder restClientBuilder) {
return new RestHighLevelClient(restClientBuilder);
}
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(Sniffer.class)
@ConditionalOnSingleCandidate(RestHighLevelClient.class)
static class RestClientSnifferConfiguration {
@Bean
@ConditionalOnMissingBean
Sniffer elasticsearchSniffer(RestHighLevelClient client, ElasticsearchRestClientProperties properties) {
SnifferBuilder builder = Sniffer.builder(client.getLowLevelClient());
PropertyMapper get = PropertyMapper.get().alwaysApplyingWhenNonNull();
get.from(properties.getSniffer().getInterval()).asInt(Duration::toMillis)
.to(builder::setSniffIntervalMillis);
get.from(properties.getSniffer().getDelayAfterFailure()).asInt(Duration::toMillis)
.to(builder::setSniffAfterFailureDelayMillis);
return builder.build();
}
}
static class DefaultRestClientBuilderCustomizer implements RestClientBuilderCustomizer {
private static final PropertyMapper map = PropertyMapper.get();
private final ElasticsearchRestClientProperties properties;
DefaultRestClientBuilderCustomizer(ElasticsearchRestClientProperties properties) {
this.properties = properties;
}
@Override
public void customize(RestClientBuilder builder) {
}
@Override
public void customize(HttpAsyncClientBuilder builder) {
builder.setDefaultCredentialsProvider(new PropertiesCredentialsProvider(this.properties));
}
@Override
public void customize(RequestConfig.Builder builder) {
map.from(this.properties::getConnectionTimeout).whenNonNull().asInt(Duration::toMillis)
.to(builder::setConnectTimeout);
map.from(this.properties::getReadTimeout).whenNonNull().asInt(Duration::toMillis)
.to(builder::setSocketTimeout);
}
}
private static class PropertiesCredentialsProvider extends BasicCredentialsProvider {
PropertiesCredentialsProvider(ElasticsearchRestClientProperties properties) {
if (StringUtils.hasText(properties.getUsername())) {
Credentials credentials = new UsernamePasswordCredentials(properties.getUsername(),
properties.getPassword());
setCredentials(AuthScope.ANY, credentials);
}
properties.getUris().stream().map(this::toUri).filter(this::hasUserInfo)
.forEach(this::addUserInfoCredentials);
}
private URI toUri(String uri) {
try {
return URI.create(uri);
}
catch (IllegalArgumentException ex) {
return null;
}
}
private boolean hasUserInfo(URI uri) {
return uri != null && StringUtils.hasLength(uri.getUserInfo());
}
private void addUserInfoCredentials(URI uri) {
AuthScope authScope = new AuthScope(uri.getHost(), uri.getPort());
Credentials credentials = createUserInfoCredentials(uri.getUserInfo());
setCredentials(authScope, credentials);
}
private Credentials createUserInfoCredentials(String userInfo) {
int delimiter = userInfo.indexOf(":");
if (delimiter == -1) {
return new UsernamePasswordCredentials(userInfo, null);
}
String username = userInfo.substring(0, delimiter);
String password = userInfo.substring(delimiter + 1);
return new UsernamePasswordCredentials(username, password);
}
}
}

@ -57,6 +57,8 @@ public class ElasticsearchRestClientProperties {
*/
private Duration readTimeout = Duration.ofSeconds(30);
private final Sniffer sniffer = new Sniffer();
public List<String> getUris() {
return this.uris;
}
@ -97,4 +99,38 @@ public class ElasticsearchRestClientProperties {
this.readTimeout = readTimeout;
}
public Sniffer getSniffer() {
return this.sniffer;
}
public static class Sniffer {
/**
* Interval between consecutive ordinary sniff executions.
*/
private Duration interval = Duration.ofMinutes(5);
/**
* Delay of a sniff execution scheduled after a failure.
*/
private Duration delayAfterFailure = Duration.ofMinutes(1);
public Duration getInterval() {
return this.interval;
}
public void setInterval(Duration interval) {
this.interval = interval;
}
public Duration getDelayAfterFailure() {
return this.delayAfterFailure;
}
public void setDelayAfterFailure(Duration delayAfterFailure) {
this.delayAfterFailure = delayAfterFailure;
}
}
}

@ -34,12 +34,14 @@ import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.sniff.Sniffer;
import org.junit.jupiter.api.Test;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.FilteredClassLoader;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.boot.testsupport.testcontainers.DockerImageNames;
import org.springframework.context.annotation.Bean;
@ -47,6 +49,7 @@ import org.springframework.context.annotation.Configuration;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyNoInteractions;
/**
* Tests for {@link ElasticsearchRestClientAutoConfiguration}.
@ -220,6 +223,52 @@ class ElasticsearchRestClientAutoConfigurationTests {
});
}
@Test
void configureWithoutSnifferLibraryShouldNotCreateSniffer() {
this.contextRunner.withClassLoader(new FilteredClassLoader("org.elasticsearch.client.sniff"))
.run((context) -> assertThat(context).hasSingleBean(RestHighLevelClient.class)
.doesNotHaveBean(Sniffer.class));
}
@Test
void configureShouldCreateSnifferUsingRestHighLevelClient() {
this.contextRunner.run((context) -> {
assertThat(context).hasSingleBean(Sniffer.class);
assertThat(context.getBean(Sniffer.class)).hasFieldOrPropertyWithValue("restClient",
context.getBean(RestHighLevelClient.class).getLowLevelClient());
// Validate shutdown order as the sniffer must be shutdown before the client
assertThat(context.getBeanFactory().getDependentBeans("elasticsearchRestHighLevelClient"))
.contains("elasticsearchSniffer");
});
}
@Test
void configureWithCustomSnifferSettings() {
this.contextRunner.withPropertyValues("spring.elasticsearch.rest.sniffer.interval=180s",
"spring.elasticsearch.rest.sniffer.delay-after-failure=30s").run((context) -> {
assertThat(context).hasSingleBean(Sniffer.class);
Sniffer sniffer = context.getBean(Sniffer.class);
assertThat(sniffer).hasFieldOrPropertyWithValue("sniffIntervalMillis",
Duration.ofMinutes(3).toMillis());
assertThat(sniffer).hasFieldOrPropertyWithValue("sniffAfterFailureDelayMillis",
Duration.ofSeconds(30).toMillis());
});
}
@Test
void configureWhenCustomSnifferShouldBackOff() {
Sniffer customSniffer = mock(Sniffer.class);
this.contextRunner.withBean(Sniffer.class, () -> customSniffer)
.withPropertyValues("spring.elasticsearch.rest.sniffer.interval=180s",
"spring.elasticsearch.rest.sniffer.delay-after-failure=30s")
.run((context) -> {
assertThat(context).hasSingleBean(Sniffer.class);
Sniffer sniffer = context.getBean(Sniffer.class);
assertThat(sniffer).isSameAs(customSniffer);
verifyNoInteractions(customSniffer);
});
}
@Configuration(proxyBeanMethods = false)
static class BuilderCustomizerConfiguration {

@ -4983,6 +4983,19 @@ To take full control over the registration, define a `RestClientBuilder` bean.
TIP: If your application needs access to a "Low Level" `RestClient`, you can get it by calling `client.getLowLevelClient()` on the auto-configured `RestHighLevelClient`.
Additionally, if `elasticsearch-rest-client-sniffer` is on the classpath, a `Sniffer` is auto-configured to automatically discover nodes from a running Elasticsearch cluster and set them to the `RestHighLevelClient` bean.
You can further tune how `Sniffer` is configured, as shown in the following example:
[source,yaml,indent=0,configprops,configblocks]
----
spring:
elasticsearch:
rest:
sniffer:
interval: 10m
delay-after-failure: 30s
----
[[boot-features-connecting-to-elasticsearch-reactive-rest]]

Loading…
Cancel
Save