|
|
|
@ -16,6 +16,8 @@
|
|
|
|
|
|
|
|
|
|
package org.springframework.boot.autoconfigure.amqp;
|
|
|
|
|
|
|
|
|
|
import java.time.Duration;
|
|
|
|
|
|
|
|
|
|
import com.rabbitmq.client.Channel;
|
|
|
|
|
|
|
|
|
|
import org.springframework.amqp.core.AmqpAdmin;
|
|
|
|
@ -33,6 +35,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
|
|
|
|
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
|
|
|
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate;
|
|
|
|
|
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
|
|
|
|
import org.springframework.boot.context.properties.PropertyMapper;
|
|
|
|
|
import org.springframework.context.annotation.Bean;
|
|
|
|
|
import org.springframework.context.annotation.Configuration;
|
|
|
|
|
import org.springframework.context.annotation.Import;
|
|
|
|
@ -76,6 +79,7 @@ import org.springframework.retry.support.RetryTemplate;
|
|
|
|
|
* @author Josh Long
|
|
|
|
|
* @author Stephane Nicoll
|
|
|
|
|
* @author Gary Russell
|
|
|
|
|
* @author Phillip Webb
|
|
|
|
|
*/
|
|
|
|
|
@Configuration
|
|
|
|
|
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
|
|
|
|
@ -88,66 +92,55 @@ public class RabbitAutoConfiguration {
|
|
|
|
|
protected static class RabbitConnectionFactoryCreator {
|
|
|
|
|
|
|
|
|
|
@Bean
|
|
|
|
|
public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties config)
|
|
|
|
|
throws Exception {
|
|
|
|
|
public CachingConnectionFactory rabbitConnectionFactory(
|
|
|
|
|
RabbitProperties properties) throws Exception {
|
|
|
|
|
PropertyMapper map = PropertyMapper.get();
|
|
|
|
|
CachingConnectionFactory factory = new CachingConnectionFactory(
|
|
|
|
|
getRabbitConnectionFactoryBean(properties).getObject());
|
|
|
|
|
map.from(properties::determineAddresses).to(factory::setAddresses);
|
|
|
|
|
map.from(properties::isPublisherConfirms).to(factory::setPublisherConfirms);
|
|
|
|
|
map.from(properties::isPublisherReturns).to(factory::setPublisherReturns);
|
|
|
|
|
RabbitProperties.Cache.Channel channel = properties.getCache().getChannel();
|
|
|
|
|
map.from(channel::getSize).whenNonNull().to(factory::setChannelCacheSize);
|
|
|
|
|
map.from(channel::getCheckoutTimeout).whenNonNull()
|
|
|
|
|
.to(factory::setChannelCheckoutTimeout);
|
|
|
|
|
RabbitProperties.Cache.Connection connection = properties.getCache()
|
|
|
|
|
.getConnection();
|
|
|
|
|
map.from(connection::getMode).whenNonNull().to(factory::setCacheMode);
|
|
|
|
|
map.from(connection::getSize).whenNonNull()
|
|
|
|
|
.to(factory::setConnectionCacheSize);
|
|
|
|
|
return factory;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private RabbitConnectionFactoryBean getRabbitConnectionFactoryBean(
|
|
|
|
|
RabbitProperties properties) throws Exception {
|
|
|
|
|
PropertyMapper map = PropertyMapper.get();
|
|
|
|
|
RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();
|
|
|
|
|
if (config.determineHost() != null) {
|
|
|
|
|
factory.setHost(config.determineHost());
|
|
|
|
|
}
|
|
|
|
|
factory.setPort(config.determinePort());
|
|
|
|
|
if (config.determineUsername() != null) {
|
|
|
|
|
factory.setUsername(config.determineUsername());
|
|
|
|
|
}
|
|
|
|
|
if (config.determinePassword() != null) {
|
|
|
|
|
factory.setPassword(config.determinePassword());
|
|
|
|
|
}
|
|
|
|
|
if (config.determineVirtualHost() != null) {
|
|
|
|
|
factory.setVirtualHost(config.determineVirtualHost());
|
|
|
|
|
}
|
|
|
|
|
if (config.getRequestedHeartbeat() != null) {
|
|
|
|
|
factory.setRequestedHeartbeat(
|
|
|
|
|
(int) config.getRequestedHeartbeat().getSeconds());
|
|
|
|
|
}
|
|
|
|
|
RabbitProperties.Ssl ssl = config.getSsl();
|
|
|
|
|
map.from(properties::determineHost).whenNonNull().to(factory::setHost);
|
|
|
|
|
map.from(properties::determinePort).to(factory::setPort);
|
|
|
|
|
map.from(properties::determineUsername).whenNonNull()
|
|
|
|
|
.to(factory::setUsername);
|
|
|
|
|
map.from(properties::determinePassword).whenNonNull()
|
|
|
|
|
.to(factory::setPassword);
|
|
|
|
|
map.from(properties::determineVirtualHost).whenNonNull()
|
|
|
|
|
.to(factory::setVirtualHost);
|
|
|
|
|
map.from(properties::getRequestedHeartbeat).whenNonNull()
|
|
|
|
|
.asInt(Duration::getSeconds).to(factory::setRequestedHeartbeat);
|
|
|
|
|
RabbitProperties.Ssl ssl = properties.getSsl();
|
|
|
|
|
if (ssl.isEnabled()) {
|
|
|
|
|
factory.setUseSSL(true);
|
|
|
|
|
if (ssl.getAlgorithm() != null) {
|
|
|
|
|
factory.setSslAlgorithm(ssl.getAlgorithm());
|
|
|
|
|
}
|
|
|
|
|
factory.setKeyStoreType(ssl.getKeyStoreType());
|
|
|
|
|
factory.setKeyStore(ssl.getKeyStore());
|
|
|
|
|
factory.setKeyStorePassphrase(ssl.getKeyStorePassword());
|
|
|
|
|
factory.setTrustStoreType(ssl.getTrustStoreType());
|
|
|
|
|
factory.setTrustStore(ssl.getTrustStore());
|
|
|
|
|
factory.setTrustStorePassphrase(ssl.getTrustStorePassword());
|
|
|
|
|
}
|
|
|
|
|
if (config.getConnectionTimeout() != null) {
|
|
|
|
|
factory.setConnectionTimeout(
|
|
|
|
|
(int) config.getConnectionTimeout().toMillis());
|
|
|
|
|
}
|
|
|
|
|
map.from(ssl::getAlgorithm).whenNonNull().to(factory::setSslAlgorithm);
|
|
|
|
|
map.from(ssl::getKeyStoreType).to(factory::setKeyStoreType);
|
|
|
|
|
map.from(ssl::getKeyStore).to(factory::setKeyStore);
|
|
|
|
|
map.from(ssl::getKeyStorePassword).to(factory::setKeyStorePassphrase);
|
|
|
|
|
map.from(ssl::getTrustStoreType).to(factory::setTrustStoreType);
|
|
|
|
|
map.from(ssl::getTrustStore).to(factory::setTrustStore);
|
|
|
|
|
map.from(ssl::getTrustStorePassword).to(factory::setTrustStorePassphrase);
|
|
|
|
|
}
|
|
|
|
|
map.from(properties::getConnectionTimeout).whenNonNull()
|
|
|
|
|
.asInt(Duration::toMillis).to(factory::setConnectionTimeout);
|
|
|
|
|
factory.afterPropertiesSet();
|
|
|
|
|
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
|
|
|
|
|
factory.getObject());
|
|
|
|
|
connectionFactory.setAddresses(config.determineAddresses());
|
|
|
|
|
connectionFactory.setPublisherConfirms(config.isPublisherConfirms());
|
|
|
|
|
connectionFactory.setPublisherReturns(config.isPublisherReturns());
|
|
|
|
|
if (config.getCache().getChannel().getSize() != null) {
|
|
|
|
|
connectionFactory
|
|
|
|
|
.setChannelCacheSize(config.getCache().getChannel().getSize());
|
|
|
|
|
}
|
|
|
|
|
if (config.getCache().getConnection().getMode() != null) {
|
|
|
|
|
connectionFactory
|
|
|
|
|
.setCacheMode(config.getCache().getConnection().getMode());
|
|
|
|
|
}
|
|
|
|
|
if (config.getCache().getConnection().getSize() != null) {
|
|
|
|
|
connectionFactory.setConnectionCacheSize(
|
|
|
|
|
config.getCache().getConnection().getSize());
|
|
|
|
|
}
|
|
|
|
|
if (config.getCache().getChannel().getCheckoutTimeout() != null) {
|
|
|
|
|
connectionFactory.setChannelCheckoutTimeout(
|
|
|
|
|
config.getCache().getChannel().getCheckoutTimeout());
|
|
|
|
|
}
|
|
|
|
|
return connectionFactory;
|
|
|
|
|
return factory;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
@ -171,26 +164,24 @@ public class RabbitAutoConfiguration {
|
|
|
|
|
@ConditionalOnSingleCandidate(ConnectionFactory.class)
|
|
|
|
|
@ConditionalOnMissingBean(RabbitTemplate.class)
|
|
|
|
|
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
|
|
|
|
|
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
|
|
|
|
|
PropertyMapper map = PropertyMapper.get();
|
|
|
|
|
RabbitTemplate template = new RabbitTemplate(connectionFactory);
|
|
|
|
|
MessageConverter messageConverter = this.messageConverter.getIfUnique();
|
|
|
|
|
if (messageConverter != null) {
|
|
|
|
|
rabbitTemplate.setMessageConverter(messageConverter);
|
|
|
|
|
}
|
|
|
|
|
rabbitTemplate.setMandatory(determineMandatoryFlag());
|
|
|
|
|
RabbitProperties.Template templateProperties = this.properties.getTemplate();
|
|
|
|
|
RabbitProperties.Retry retryProperties = templateProperties.getRetry();
|
|
|
|
|
if (retryProperties.isEnabled()) {
|
|
|
|
|
rabbitTemplate.setRetryTemplate(createRetryTemplate(retryProperties));
|
|
|
|
|
}
|
|
|
|
|
if (templateProperties.getReceiveTimeout() != null) {
|
|
|
|
|
rabbitTemplate.setReceiveTimeout(templateProperties.getReceiveTimeout());
|
|
|
|
|
}
|
|
|
|
|
if (templateProperties.getReplyTimeout() != null) {
|
|
|
|
|
rabbitTemplate.setReplyTimeout(templateProperties.getReplyTimeout());
|
|
|
|
|
}
|
|
|
|
|
rabbitTemplate.setExchange(templateProperties.getExchange());
|
|
|
|
|
rabbitTemplate.setRoutingKey(templateProperties.getRoutingKey());
|
|
|
|
|
return rabbitTemplate;
|
|
|
|
|
template.setMessageConverter(messageConverter);
|
|
|
|
|
}
|
|
|
|
|
template.setMandatory(determineMandatoryFlag());
|
|
|
|
|
RabbitProperties.Template properties = this.properties.getTemplate();
|
|
|
|
|
if (properties.getRetry().isEnabled()) {
|
|
|
|
|
template.setRetryTemplate(createRetryTemplate(properties.getRetry()));
|
|
|
|
|
}
|
|
|
|
|
map.from(properties::getReceiveTimeout).whenNonNull()
|
|
|
|
|
.to(template::setReceiveTimeout);
|
|
|
|
|
map.from(properties::getReplyTimeout).whenNonNull()
|
|
|
|
|
.to(template::setReplyTimeout);
|
|
|
|
|
map.from(properties::getExchange).to(template::setExchange);
|
|
|
|
|
map.from(properties::getRoutingKey).to(template::setRoutingKey);
|
|
|
|
|
return template;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private boolean determineMandatoryFlag() {
|
|
|
|
@ -199,14 +190,16 @@ public class RabbitAutoConfiguration {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private RetryTemplate createRetryTemplate(RabbitProperties.Retry properties) {
|
|
|
|
|
PropertyMapper map = PropertyMapper.get();
|
|
|
|
|
RetryTemplate template = new RetryTemplate();
|
|
|
|
|
SimpleRetryPolicy policy = new SimpleRetryPolicy();
|
|
|
|
|
policy.setMaxAttempts(properties.getMaxAttempts());
|
|
|
|
|
map.from(properties::getMaxAttempts).to(policy::setMaxAttempts);
|
|
|
|
|
template.setRetryPolicy(policy);
|
|
|
|
|
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
|
|
|
|
|
backOffPolicy.setInitialInterval(properties.getInitialInterval());
|
|
|
|
|
backOffPolicy.setMultiplier(properties.getMultiplier());
|
|
|
|
|
backOffPolicy.setMaxInterval(properties.getMaxInterval());
|
|
|
|
|
map.from(properties::getInitialInterval)
|
|
|
|
|
.to(backOffPolicy::setInitialInterval);
|
|
|
|
|
map.from(properties::getMultiplier).to(backOffPolicy::setMultiplier);
|
|
|
|
|
map.from(properties::getMaxInterval).to(backOffPolicy::setMaxInterval);
|
|
|
|
|
template.setBackOffPolicy(backOffPolicy);
|
|
|
|
|
return template;
|
|
|
|
|
}
|
|
|
|
|