diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java index 2bff6719dd..4a1ffa780e 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java @@ -20,6 +20,7 @@ import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.boot.autoconfigure.amqp.RabbitProperties.Listener; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; @@ -29,6 +30,7 @@ import org.springframework.context.annotation.Configuration; * Configuration for Spring AMQP annotation driven endpoints. * * @author Stephane Nicoll + * @author Josh Thornhill * @since 1.2.0 */ @Configuration @@ -41,6 +43,22 @@ class RabbitAnnotationDrivenConfiguration { ConnectionFactory connectionFactory, RabbitProperties config) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); + Listener listenerConfig = config.getListener(); + if (listenerConfig.getAcknowledgeMode() != null) { + factory.setAcknowledgeMode(listenerConfig.getAcknowledgeMode()); + } + if (listenerConfig.getConcurrency() != null) { + factory.setConcurrentConsumers(listenerConfig.getConcurrency()); + } + if (listenerConfig.getMaxConcurrency() != null) { + factory.setMaxConcurrentConsumers(listenerConfig.getMaxConcurrency()); + } + if (listenerConfig.getPrefetch() != null) { + factory.setPrefetchCount(listenerConfig.getPrefetch()); + } + if (listenerConfig.getTransactionSize() != null) { + factory.setTxSize(listenerConfig.getTransactionSize()); + } return factory; } diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java index 8b29f1c031..c9f4b6694f 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java @@ -20,6 +20,7 @@ import java.util.LinkedHashSet; import java.util.Properties; import java.util.Set; +import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.util.StringUtils; @@ -30,6 +31,7 @@ import org.springframework.util.StringUtils; * @author Dave Syer * @author Stephane Nicoll * @author Andy Wilkinson + * @author Josh Thornhill */ @ConfigurationProperties(prefix = "spring.rabbitmq") public class RabbitProperties { @@ -74,6 +76,11 @@ public class RabbitProperties { */ private Integer requestedHeartbeat; + /** + * Listener container configuration. + */ + private final Listener listener = new Listener(); + public String getHost() { if (this.addresses == null) { return this.host; @@ -180,6 +187,10 @@ public class RabbitProperties { this.requestedHeartbeat = requestedHeartbeat; } + public Listener getListener() { + return this.listener; + } + public static class Ssl { /** @@ -271,4 +282,75 @@ public class RabbitProperties { } } + + public static class Listener { + + /** + * Acknowledge mode of container. + */ + private AcknowledgeMode acknowledgeMode; + + /** + * Minimum number of consumers. + */ + private Integer concurrency; + + /** + * Maximum number of consumers. + */ + private Integer maxConcurrency; + + /** + * Number of messages to be handled in a single request. It should be greater than + * or equal to the transaction size (if used). + */ + private Integer prefetch; + + /** + * Number of messages to be processed in a transaction. For best results it should + * be less than or equal to the prefetch count. + */ + private Integer transactionSize; + + public AcknowledgeMode getAcknowledgeMode() { + return this.acknowledgeMode; + } + + public void setAcknowledgeMode(AcknowledgeMode acknowledgeMode) { + this.acknowledgeMode = acknowledgeMode; + } + + public Integer getConcurrency() { + return this.concurrency; + } + + public void setConcurrency(Integer concurrency) { + this.concurrency = concurrency; + } + + public Integer getMaxConcurrency() { + return this.maxConcurrency; + } + + public void setMaxConcurrency(Integer maxConcurrency) { + this.maxConcurrency = maxConcurrency; + } + + public Integer getPrefetch() { + return this.prefetch; + } + + public void setPrefetch(Integer prefetch) { + this.prefetch = prefetch; + } + + public Integer getTransactionSize() { + return this.transactionSize; + } + + public void setTransactionSize(Integer transactionSize) { + this.transactionSize = transactionSize; + } + } + } diff --git a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java index 532107b715..1d03612c28 100644 --- a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java +++ b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java @@ -23,6 +23,8 @@ import org.junit.After; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; + +import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils; @@ -185,6 +187,26 @@ public class RabbitAutoConfigurationTests { verify(rabbitListenerContainerFactory).setTxSize(10); } + @Test + public void testRabbitListenerContainerFactoryWithCustomSettings() { + load(TestConfiguration.class, + "spring.rabbitmq.listener.acknowledgeMode:manual", + "spring.rabbitmq.listener.concurrency:5", + "spring.rabbitmq.listener.maxConcurrency:10", + "spring.rabbitmq.listener.prefetch=40", + "spring.rabbitmq.listener.transactionSize:20"); + SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = this.context + .getBean("rabbitListenerContainerFactory", + SimpleRabbitListenerContainerFactory.class); + DirectFieldAccessor dfa = new DirectFieldAccessor(rabbitListenerContainerFactory); + assertEquals(AcknowledgeMode.MANUAL, + dfa.getPropertyValue("acknowledgeMode")); + assertEquals(5, dfa.getPropertyValue("concurrentConsumers")); + assertEquals(10, dfa.getPropertyValue("maxConcurrentConsumers")); + assertEquals(40, dfa.getPropertyValue("prefetchCount")); + assertEquals(20, dfa.getPropertyValue("txSize")); + } + @Test public void enableRabbitAutomatically() throws Exception { load(NoEnableRabbitConfiguration.class); diff --git a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc index 4a94c77bed..d9163e1778 100644 --- a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc +++ b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc @@ -469,6 +469,11 @@ content into your application; rather pick only the properties that you need. spring.rabbitmq.port= # connection port spring.rabbitmq.password= # login password spring.rabbitmq.requested-heartbeat= # requested heartbeat timeout, in seconds; zero for none + spring.rabbitmq.listener.acknowledge-mode= # acknowledge mode of container + spring.rabbitmq.listener.concurrency= # minimum number of consumers + spring.rabbitmq.listener.max-concurrency= # maximum number of consumers + spring.rabbitmq.listener.prefetch= # number of messages to be handled in a single request + spring.rabbitmq.listener.transaction-size= # number of messages to be processed in a transaction spring.rabbitmq.ssl.enabled=false # enable SSL support spring.rabbitmq.ssl.key-store= # path to the key store that holds the SSL certificate spring.rabbitmq.ssl.key-store-password= # password used to access the key store