diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java index 3b7f668dbe..765f5ca279 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2019 the original author or authors. + * Copyright 2012-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -116,6 +116,7 @@ public abstract class AbstractRabbitListenerContainerFactoryConfigurer builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless() diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java index 0add22bf46..7511b37240 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java @@ -662,6 +662,12 @@ public class RabbitProperties { */ private Duration idleEventInterval; + /** + * Whether the container should present batched messages as discrete messages or + * call the listener with the batch. + */ + private boolean deBatchingEnabled = true; + /** * Optional properties for a retry interceptor. */ @@ -709,6 +715,14 @@ public class RabbitProperties { public abstract boolean isMissingQueuesFatal(); + public boolean isDeBatchingEnabled() { + return this.deBatchingEnabled; + } + + public void setDeBatchingEnabled(boolean deBatchingEnabled) { + this.deBatchingEnabled = deBatchingEnabled; + } + public ListenerRetry getRetry() { return this.retry; } @@ -743,6 +757,14 @@ public class RabbitProperties { */ private boolean missingQueuesFatal = true; + /** + * Whether the container creates a batch of messages based on the + * 'receive-timeout' and 'batch-size'. Coerces 'de-batching-enabled' to true to + * include the contents of a producer created batch in the batch as discrete + * records. + */ + private boolean consumerBatchEnabled; + public Integer getConcurrency() { return this.concurrency; } @@ -776,6 +798,14 @@ public class RabbitProperties { this.missingQueuesFatal = missingQueuesFatal; } + public boolean isConsumerBatchEnabled() { + return this.consumerBatchEnabled; + } + + public void setConsumerBatchEnabled(boolean consumerBatchEnabled) { + this.consumerBatchEnabled = consumerBatchEnabled; + } + } /** diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/SimpleRabbitListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/SimpleRabbitListenerContainerFactoryConfigurer.java index cb9c86cc71..146a5a7aed 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/SimpleRabbitListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/SimpleRabbitListenerContainerFactoryConfigurer.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2019 the original author or authors. + * Copyright 2012-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,6 +39,10 @@ public final class SimpleRabbitListenerContainerFactoryConfigurer map.from(config::getConcurrency).whenNonNull().to(factory::setConcurrentConsumers); map.from(config::getMaxConcurrency).whenNonNull().to(factory::setMaxConcurrentConsumers); map.from(config::getBatchSize).whenNonNull().to(factory::setBatchSize); + map.from(config::isConsumerBatchEnabled).to(factory::setConsumerBatchEnabled); + if (config.isConsumerBatchEnabled()) { + factory.setDeBatchingEnabled(true); + } } } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java index 811c9111be..0904cf507d 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java @@ -33,6 +33,8 @@ import com.rabbitmq.client.impl.CredentialsRefreshService; import com.rabbitmq.client.impl.DefaultCredentialsProvider; import org.aopalliance.aop.Advice; import org.junit.jupiter.api.Test; +import org.mockito.InOrder; +import org.mockito.Mockito; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.AmqpAdmin; @@ -550,12 +552,31 @@ class RabbitAutoConfigurationTests { }); } + @Test + void testSimpleRabbitListenerContainerFactoryConfigurerEnableDeBatchingWithConsumerBatchEnabled() { + this.contextRunner.withUserConfiguration(TestConfiguration.class) + .withPropertyValues("spring.rabbitmq.listener.type:direct", + "spring.rabbitmq.listener.simple.consumer-batch-enabled:true", + "spring.rabbitmq.listener.simple.de-batching-enabled:false") + .run((context) -> { + SimpleRabbitListenerContainerFactoryConfigurer configurer = context + .getBean(SimpleRabbitListenerContainerFactoryConfigurer.class); + SimpleRabbitListenerContainerFactory factory = mock(SimpleRabbitListenerContainerFactory.class); + configurer.configure(factory, mock(ConnectionFactory.class)); + InOrder inOrder = Mockito.inOrder(factory); + verify(factory).setConsumerBatchEnabled(true); + inOrder.verify(factory).setDeBatchingEnabled(false); + inOrder.verify(factory).setDeBatchingEnabled(true); + }); + } + @Test void testDirectRabbitListenerContainerFactoryConfigurerUsesConfig() { this.contextRunner.withUserConfiguration(TestConfiguration.class) .withPropertyValues("spring.rabbitmq.listener.type:simple", "spring.rabbitmq.listener.direct.consumers-per-queue:5", - "spring.rabbitmq.listener.direct.prefetch:40") + "spring.rabbitmq.listener.direct.prefetch:40", + "spring.rabbitmq.listener.direct.de-batching-enabled:false") .run((context) -> { DirectRabbitListenerContainerFactoryConfigurer configurer = context .getBean(DirectRabbitListenerContainerFactoryConfigurer.class); @@ -563,6 +584,7 @@ class RabbitAutoConfigurationTests { configurer.configure(factory, mock(ConnectionFactory.class)); verify(factory).setConsumersPerQueue(5); verify(factory).setPrefetchCount(40); + verify(factory).setDeBatchingEnabled(false); }); } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitPropertiesTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitPropertiesTests.java index 7224ef5e17..7eded032ec 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitPropertiesTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitPropertiesTests.java @@ -303,6 +303,8 @@ class RabbitPropertiesTests { RabbitProperties.SimpleContainer simple = this.properties.getListener().getSimple(); assertThat(simple.isAutoStartup()).isEqualTo(container.isAutoStartup()); assertThat(container).hasFieldOrPropertyWithValue("missingQueuesFatal", simple.isMissingQueuesFatal()); + assertThat(container).hasFieldOrPropertyWithValue("deBatchingEnabled", simple.isDeBatchingEnabled()); + assertThat(container).hasFieldOrPropertyWithValue("consumerBatchEnabled", simple.isConsumerBatchEnabled()); } @Test @@ -312,6 +314,7 @@ class RabbitPropertiesTests { RabbitProperties.DirectContainer direct = this.properties.getListener().getDirect(); assertThat(direct.isAutoStartup()).isEqualTo(container.isAutoStartup()); assertThat(container).hasFieldOrPropertyWithValue("missingQueuesFatal", direct.isMissingQueuesFatal()); + assertThat(container).hasFieldOrPropertyWithValue("deBatchingEnabled", direct.isDeBatchingEnabled()); } }