From 3aa247f1ca0d7688df0979c5996394416e40e5ae Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Tue, 20 Oct 2020 16:31:11 -0400 Subject: [PATCH 1/2] Add configuration options for RabbitMQ's batch listener config See gh-23766 --- ...bitListenerContainerFactoryConfigurer.java | 3 +- .../autoconfigure/amqp/RabbitProperties.java | 28 +++++++++++++++++++ ...bitListenerContainerFactoryConfigurer.java | 6 +++- .../amqp/RabbitAutoConfigurationTests.java | 16 ++++++++--- .../amqp/RabbitPropertiesTests.java | 3 ++ 5 files changed, 50 insertions(+), 6 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java index 3b7f668dbe..765f5ca279 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2019 the original author or authors. + * Copyright 2012-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -116,6 +116,7 @@ public abstract class AbstractRabbitListenerContainerFactoryConfigurer builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless() diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java index 0add22bf46..07b861d21d 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java @@ -662,6 +662,11 @@ public class RabbitProperties { */ private Duration idleEventInterval; + /** + * Whether to present batched messages (created by a BatchingRabbitTemplate) as discrete messages. + */ + private boolean deBatchingEnabled = true; + /** * Optional properties for a retry interceptor. */ @@ -709,6 +714,14 @@ public class RabbitProperties { public abstract boolean isMissingQueuesFatal(); + public boolean isDeBatchingEnabled() { + return deBatchingEnabled; + } + + public void setDeBatchingEnabled(boolean deBatchingEnabled) { + this.deBatchingEnabled = deBatchingEnabled; + } + public ListenerRetry getRetry() { return this.retry; } @@ -743,6 +756,13 @@ public class RabbitProperties { */ private boolean missingQueuesFatal = true; + /** + * When true, the container will create a batch of messages based on the 'receiveTimeout' and 'batchSize'. + * Coerces 'deBatchingEnabled' to true to include the contents of a producer created batch in the batch as + * discrete records. + */ + private boolean consumerBatchEnabled; + public Integer getConcurrency() { return this.concurrency; } @@ -776,6 +796,14 @@ public class RabbitProperties { this.missingQueuesFatal = missingQueuesFatal; } + public boolean isConsumerBatchEnabled() { + return consumerBatchEnabled; + } + + public void setConsumerBatchEnabled(boolean consumerBatchEnabled) { + this.consumerBatchEnabled = consumerBatchEnabled; + } + } /** diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/SimpleRabbitListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/SimpleRabbitListenerContainerFactoryConfigurer.java index cb9c86cc71..146a5a7aed 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/SimpleRabbitListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/SimpleRabbitListenerContainerFactoryConfigurer.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2019 the original author or authors. + * Copyright 2012-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,6 +39,10 @@ public final class SimpleRabbitListenerContainerFactoryConfigurer map.from(config::getConcurrency).whenNonNull().to(factory::setConcurrentConsumers); map.from(config::getMaxConcurrency).whenNonNull().to(factory::setMaxConcurrentConsumers); map.from(config::getBatchSize).whenNonNull().to(factory::setBatchSize); + map.from(config::isConsumerBatchEnabled).to(factory::setConsumerBatchEnabled); + if (config.isConsumerBatchEnabled()) { + factory.setDeBatchingEnabled(true); + } } } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java index 811c9111be..efca9ee97a 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java @@ -34,6 +34,7 @@ import com.rabbitmq.client.impl.DefaultCredentialsProvider; import org.aopalliance.aop.Advice; import org.junit.jupiter.api.Test; +import org.mockito.InOrder; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Message; @@ -75,8 +76,7 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.BDDMockito.given; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.*; /** * Tests for {@link RabbitAutoConfiguration}. @@ -538,15 +538,21 @@ class RabbitAutoConfigurationTests { .withPropertyValues("spring.rabbitmq.listener.type:direct", "spring.rabbitmq.listener.simple.concurrency:5", "spring.rabbitmq.listener.simple.maxConcurrency:10", - "spring.rabbitmq.listener.simple.prefetch:40") + "spring.rabbitmq.listener.simple.prefetch:40", + "spring.rabbitmq.listener.simple.consumer-batch-enabled:true", + "spring.rabbitmq.listener.simple.de-batching-enabled:false") .run((context) -> { SimpleRabbitListenerContainerFactoryConfigurer configurer = context .getBean(SimpleRabbitListenerContainerFactoryConfigurer.class); SimpleRabbitListenerContainerFactory factory = mock(SimpleRabbitListenerContainerFactory.class); configurer.configure(factory, mock(ConnectionFactory.class)); + InOrder inOrder = inOrder(factory); verify(factory).setConcurrentConsumers(5); verify(factory).setMaxConcurrentConsumers(10); verify(factory).setPrefetchCount(40); + verify(factory).setConsumerBatchEnabled(true); + inOrder.verify(factory).setDeBatchingEnabled(false); + inOrder.verify(factory).setDeBatchingEnabled(true); }); } @@ -555,7 +561,8 @@ class RabbitAutoConfigurationTests { this.contextRunner.withUserConfiguration(TestConfiguration.class) .withPropertyValues("spring.rabbitmq.listener.type:simple", "spring.rabbitmq.listener.direct.consumers-per-queue:5", - "spring.rabbitmq.listener.direct.prefetch:40") + "spring.rabbitmq.listener.direct.prefetch:40", + "spring.rabbitmq.listener.direct.de-batching-enabled:false") .run((context) -> { DirectRabbitListenerContainerFactoryConfigurer configurer = context .getBean(DirectRabbitListenerContainerFactoryConfigurer.class); @@ -563,6 +570,7 @@ class RabbitAutoConfigurationTests { configurer.configure(factory, mock(ConnectionFactory.class)); verify(factory).setConsumersPerQueue(5); verify(factory).setPrefetchCount(40); + verify(factory).setDeBatchingEnabled(false); }); } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitPropertiesTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitPropertiesTests.java index 7224ef5e17..7eded032ec 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitPropertiesTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitPropertiesTests.java @@ -303,6 +303,8 @@ class RabbitPropertiesTests { RabbitProperties.SimpleContainer simple = this.properties.getListener().getSimple(); assertThat(simple.isAutoStartup()).isEqualTo(container.isAutoStartup()); assertThat(container).hasFieldOrPropertyWithValue("missingQueuesFatal", simple.isMissingQueuesFatal()); + assertThat(container).hasFieldOrPropertyWithValue("deBatchingEnabled", simple.isDeBatchingEnabled()); + assertThat(container).hasFieldOrPropertyWithValue("consumerBatchEnabled", simple.isConsumerBatchEnabled()); } @Test @@ -312,6 +314,7 @@ class RabbitPropertiesTests { RabbitProperties.DirectContainer direct = this.properties.getListener().getDirect(); assertThat(direct.isAutoStartup()).isEqualTo(container.isAutoStartup()); assertThat(container).hasFieldOrPropertyWithValue("missingQueuesFatal", direct.isMissingQueuesFatal()); + assertThat(container).hasFieldOrPropertyWithValue("deBatchingEnabled", direct.isDeBatchingEnabled()); } } From 17e12ea025a6d2994d6038e2adf2a76780b87260 Mon Sep 17 00:00:00 2001 From: Stephane Nicoll Date: Wed, 21 Oct 2020 08:49:34 +0200 Subject: [PATCH 2/2] Polish "Add configuration options for RabbitMQ's batch listener config" See gh-23766 --- .../autoconfigure/amqp/RabbitProperties.java | 14 +++++----- .../amqp/RabbitAutoConfigurationTests.java | 26 ++++++++++++++----- 2 files changed, 28 insertions(+), 12 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java index 07b861d21d..7511b37240 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java @@ -663,7 +663,8 @@ public class RabbitProperties { private Duration idleEventInterval; /** - * Whether to present batched messages (created by a BatchingRabbitTemplate) as discrete messages. + * Whether the container should present batched messages as discrete messages or + * call the listener with the batch. */ private boolean deBatchingEnabled = true; @@ -715,7 +716,7 @@ public class RabbitProperties { public abstract boolean isMissingQueuesFatal(); public boolean isDeBatchingEnabled() { - return deBatchingEnabled; + return this.deBatchingEnabled; } public void setDeBatchingEnabled(boolean deBatchingEnabled) { @@ -757,9 +758,10 @@ public class RabbitProperties { private boolean missingQueuesFatal = true; /** - * When true, the container will create a batch of messages based on the 'receiveTimeout' and 'batchSize'. - * Coerces 'deBatchingEnabled' to true to include the contents of a producer created batch in the batch as - * discrete records. + * Whether the container creates a batch of messages based on the + * 'receive-timeout' and 'batch-size'. Coerces 'de-batching-enabled' to true to + * include the contents of a producer created batch in the batch as discrete + * records. */ private boolean consumerBatchEnabled; @@ -797,7 +799,7 @@ public class RabbitProperties { } public boolean isConsumerBatchEnabled() { - return consumerBatchEnabled; + return this.consumerBatchEnabled; } public void setConsumerBatchEnabled(boolean consumerBatchEnabled) { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java index efca9ee97a..0904cf507d 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java @@ -33,8 +33,9 @@ import com.rabbitmq.client.impl.CredentialsRefreshService; import com.rabbitmq.client.impl.DefaultCredentialsProvider; import org.aopalliance.aop.Advice; import org.junit.jupiter.api.Test; - import org.mockito.InOrder; +import org.mockito.Mockito; + import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Message; @@ -76,7 +77,8 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.BDDMockito.given; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; /** * Tests for {@link RabbitAutoConfiguration}. @@ -538,18 +540,30 @@ class RabbitAutoConfigurationTests { .withPropertyValues("spring.rabbitmq.listener.type:direct", "spring.rabbitmq.listener.simple.concurrency:5", "spring.rabbitmq.listener.simple.maxConcurrency:10", - "spring.rabbitmq.listener.simple.prefetch:40", - "spring.rabbitmq.listener.simple.consumer-batch-enabled:true", - "spring.rabbitmq.listener.simple.de-batching-enabled:false") + "spring.rabbitmq.listener.simple.prefetch:40") .run((context) -> { SimpleRabbitListenerContainerFactoryConfigurer configurer = context .getBean(SimpleRabbitListenerContainerFactoryConfigurer.class); SimpleRabbitListenerContainerFactory factory = mock(SimpleRabbitListenerContainerFactory.class); configurer.configure(factory, mock(ConnectionFactory.class)); - InOrder inOrder = inOrder(factory); verify(factory).setConcurrentConsumers(5); verify(factory).setMaxConcurrentConsumers(10); verify(factory).setPrefetchCount(40); + }); + } + + @Test + void testSimpleRabbitListenerContainerFactoryConfigurerEnableDeBatchingWithConsumerBatchEnabled() { + this.contextRunner.withUserConfiguration(TestConfiguration.class) + .withPropertyValues("spring.rabbitmq.listener.type:direct", + "spring.rabbitmq.listener.simple.consumer-batch-enabled:true", + "spring.rabbitmq.listener.simple.de-batching-enabled:false") + .run((context) -> { + SimpleRabbitListenerContainerFactoryConfigurer configurer = context + .getBean(SimpleRabbitListenerContainerFactoryConfigurer.class); + SimpleRabbitListenerContainerFactory factory = mock(SimpleRabbitListenerContainerFactory.class); + configurer.configure(factory, mock(ConnectionFactory.class)); + InOrder inOrder = Mockito.inOrder(factory); verify(factory).setConsumerBatchEnabled(true); inOrder.verify(factory).setDeBatchingEnabled(false); inOrder.verify(factory).setDeBatchingEnabled(true);