Add support for ConnectionNameStrategy

This commit detects if a `ConnectionNameStrategy` bean exists in the
context and associates it with the auto-configured RabbitMQ's
`ConnectionFactory` when that is the case.

Closes gh-12367
pull/12428/head
Stephane Nicoll 7 years ago
parent bc47b715c3
commit 42629cb8ae

@ -23,6 +23,7 @@ import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionNameStrategy;
import org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean; import org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean;
import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate; import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
@ -93,7 +94,8 @@ public class RabbitAutoConfiguration {
@Bean @Bean
public CachingConnectionFactory rabbitConnectionFactory( public CachingConnectionFactory rabbitConnectionFactory(
RabbitProperties properties) throws Exception { RabbitProperties properties,
ObjectProvider<ConnectionNameStrategy> connectionNameStrategy) throws Exception {
PropertyMapper map = PropertyMapper.get(); PropertyMapper map = PropertyMapper.get();
CachingConnectionFactory factory = new CachingConnectionFactory( CachingConnectionFactory factory = new CachingConnectionFactory(
getRabbitConnectionFactoryBean(properties).getObject()); getRabbitConnectionFactoryBean(properties).getObject());
@ -109,6 +111,8 @@ public class RabbitAutoConfiguration {
map.from(connection::getMode).whenNonNull().to(factory::setCacheMode); map.from(connection::getMode).whenNonNull().to(factory::setCacheMode);
map.from(connection::getSize).whenNonNull() map.from(connection::getSize).whenNonNull()
.to(factory::setConnectionCacheSize); .to(factory::setConnectionCacheSize);
map.from(connectionNameStrategy::getIfUnique).whenNonNull()
.to(factory::setConnectionNameStrategy);
return factory; return factory;
} }

@ -17,10 +17,12 @@
package org.springframework.boot.autoconfigure.amqp; package org.springframework.boot.autoconfigure.amqp;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.SSLSocketFactory;
import com.rabbitmq.client.Address; import com.rabbitmq.client.Address;
import com.rabbitmq.client.Connection;
import org.aopalliance.aop.Advice; import org.aopalliance.aop.Advice;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
@ -36,6 +38,7 @@ import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFacto
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.CacheMode; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.CacheMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionNameStrategy;
import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate; import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate;
@ -57,6 +60,10 @@ import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate; import org.springframework.retry.support.RetryTemplate;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -151,6 +158,28 @@ public class RabbitAutoConfigurationTests {
}); });
} }
@Test
public void testConnectionFactoryWithCustomConnectionNameStrategy() {
this.contextRunner
.withUserConfiguration(ConnectionNameStrategyConfiguration.class)
.run((context) -> {
CachingConnectionFactory connectionFactory = context
.getBean(CachingConnectionFactory.class);
DirectFieldAccessor dfa = new DirectFieldAccessor(connectionFactory);
Address[] addresses = (Address[]) dfa.getPropertyValue("addresses");
assertThat(addresses).hasSize(1);
com.rabbitmq.client.ConnectionFactory rcf = mock(com.rabbitmq.client.ConnectionFactory.class);
given(rcf.newConnection(isNull(), eq(addresses), anyString()))
.willReturn(mock(Connection.class));
dfa.setPropertyValue("rabbitConnectionFactory", rcf);
connectionFactory.createConnection();
verify(rcf).newConnection(isNull(), eq(addresses), eq("test#0"));
connectionFactory.resetConnection();
connectionFactory.createConnection();
verify(rcf).newConnection(isNull(), eq(addresses), eq("test#1"));
});
}
@Test @Test
public void testConnectionFactoryEmptyVirtualHost() { public void testConnectionFactoryEmptyVirtualHost() {
this.contextRunner.withUserConfiguration(TestConfiguration.class) this.contextRunner.withUserConfiguration(TestConfiguration.class)
@ -751,6 +780,18 @@ public class RabbitAutoConfigurationTests {
} }
@Configuration
protected static class ConnectionNameStrategyConfiguration {
private final AtomicInteger counter = new AtomicInteger();
@Bean
public ConnectionNameStrategy myConnectionNameStrategy() {
return c -> "test#" + this.counter.getAndIncrement();
}
}
@Configuration @Configuration
@EnableRabbit @EnableRabbit
protected static class EnableRabbitConfiguration { protected static class EnableRabbitConfiguration {

@ -5187,8 +5187,10 @@ RabbitMQ configuration is controlled by external configuration properties in
spring.rabbitmq.password=secret spring.rabbitmq.password=secret
---- ----
See {sc-spring-boot-autoconfigure}/amqp/RabbitProperties.{sc-ext}[`RabbitProperties`] If a `ConnectionNameStrategy` bean exists in the context, it will be automatically used to
for more of the supported options. name connections created by the auto-configured `ConnectionFactory`. See
{sc-spring-boot-autoconfigure}/amqp/RabbitProperties.{sc-ext}[`RabbitProperties`] for more
of the supported options.
TIP: See TIP: See
https://spring.io/blog/2010/06/14/understanding-amqp-the-protocol-used-by-rabbitmq/[Understanding https://spring.io/blog/2010/06/14/understanding-amqp-the-protocol-used-by-rabbitmq/[Understanding

Loading…
Cancel
Save