Merge pull request #23766 from garyrussell

* pr/23766:
  Polish "Add configuration options for RabbitMQ's batch listener config"
  Add configuration options for RabbitMQ's batch listener config

Closes gh-23766
pull/23799/head
Stephane Nicoll 4 years ago
commit faa01c7619

@ -1,5 +1,5 @@
/* /*
* Copyright 2012-2019 the original author or authors. * Copyright 2012-2020 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -116,6 +116,7 @@ public abstract class AbstractRabbitListenerContainerFactoryConfigurer<T extends
factory.setIdleEventInterval(configuration.getIdleEventInterval().toMillis()); factory.setIdleEventInterval(configuration.getIdleEventInterval().toMillis());
} }
factory.setMissingQueuesFatal(configuration.isMissingQueuesFatal()); factory.setMissingQueuesFatal(configuration.isMissingQueuesFatal());
factory.setDeBatchingEnabled(configuration.isDeBatchingEnabled());
ListenerRetry retryConfig = configuration.getRetry(); ListenerRetry retryConfig = configuration.getRetry();
if (retryConfig.isEnabled()) { if (retryConfig.isEnabled()) {
RetryInterceptorBuilder<?, ?> builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless() RetryInterceptorBuilder<?, ?> builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless()

@ -662,6 +662,12 @@ public class RabbitProperties {
*/ */
private Duration idleEventInterval; private Duration idleEventInterval;
/**
* Whether the container should present batched messages as discrete messages or
* call the listener with the batch.
*/
private boolean deBatchingEnabled = true;
/** /**
* Optional properties for a retry interceptor. * Optional properties for a retry interceptor.
*/ */
@ -709,6 +715,14 @@ public class RabbitProperties {
public abstract boolean isMissingQueuesFatal(); public abstract boolean isMissingQueuesFatal();
public boolean isDeBatchingEnabled() {
return this.deBatchingEnabled;
}
public void setDeBatchingEnabled(boolean deBatchingEnabled) {
this.deBatchingEnabled = deBatchingEnabled;
}
public ListenerRetry getRetry() { public ListenerRetry getRetry() {
return this.retry; return this.retry;
} }
@ -743,6 +757,14 @@ public class RabbitProperties {
*/ */
private boolean missingQueuesFatal = true; private boolean missingQueuesFatal = true;
/**
* Whether the container creates a batch of messages based on the
* 'receive-timeout' and 'batch-size'. Coerces 'de-batching-enabled' to true to
* include the contents of a producer created batch in the batch as discrete
* records.
*/
private boolean consumerBatchEnabled;
public Integer getConcurrency() { public Integer getConcurrency() {
return this.concurrency; return this.concurrency;
} }
@ -776,6 +798,14 @@ public class RabbitProperties {
this.missingQueuesFatal = missingQueuesFatal; this.missingQueuesFatal = missingQueuesFatal;
} }
public boolean isConsumerBatchEnabled() {
return this.consumerBatchEnabled;
}
public void setConsumerBatchEnabled(boolean consumerBatchEnabled) {
this.consumerBatchEnabled = consumerBatchEnabled;
}
} }
/** /**

@ -1,5 +1,5 @@
/* /*
* Copyright 2012-2019 the original author or authors. * Copyright 2012-2020 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -39,6 +39,10 @@ public final class SimpleRabbitListenerContainerFactoryConfigurer
map.from(config::getConcurrency).whenNonNull().to(factory::setConcurrentConsumers); map.from(config::getConcurrency).whenNonNull().to(factory::setConcurrentConsumers);
map.from(config::getMaxConcurrency).whenNonNull().to(factory::setMaxConcurrentConsumers); map.from(config::getMaxConcurrency).whenNonNull().to(factory::setMaxConcurrentConsumers);
map.from(config::getBatchSize).whenNonNull().to(factory::setBatchSize); map.from(config::getBatchSize).whenNonNull().to(factory::setBatchSize);
map.from(config::isConsumerBatchEnabled).to(factory::setConsumerBatchEnabled);
if (config.isConsumerBatchEnabled()) {
factory.setDeBatchingEnabled(true);
}
} }
} }

@ -33,6 +33,8 @@ import com.rabbitmq.client.impl.CredentialsRefreshService;
import com.rabbitmq.client.impl.DefaultCredentialsProvider; import com.rabbitmq.client.impl.DefaultCredentialsProvider;
import org.aopalliance.aop.Advice; import org.aopalliance.aop.Advice;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.AmqpAdmin;
@ -550,12 +552,31 @@ class RabbitAutoConfigurationTests {
}); });
} }
@Test
void testSimpleRabbitListenerContainerFactoryConfigurerEnableDeBatchingWithConsumerBatchEnabled() {
this.contextRunner.withUserConfiguration(TestConfiguration.class)
.withPropertyValues("spring.rabbitmq.listener.type:direct",
"spring.rabbitmq.listener.simple.consumer-batch-enabled:true",
"spring.rabbitmq.listener.simple.de-batching-enabled:false")
.run((context) -> {
SimpleRabbitListenerContainerFactoryConfigurer configurer = context
.getBean(SimpleRabbitListenerContainerFactoryConfigurer.class);
SimpleRabbitListenerContainerFactory factory = mock(SimpleRabbitListenerContainerFactory.class);
configurer.configure(factory, mock(ConnectionFactory.class));
InOrder inOrder = Mockito.inOrder(factory);
verify(factory).setConsumerBatchEnabled(true);
inOrder.verify(factory).setDeBatchingEnabled(false);
inOrder.verify(factory).setDeBatchingEnabled(true);
});
}
@Test @Test
void testDirectRabbitListenerContainerFactoryConfigurerUsesConfig() { void testDirectRabbitListenerContainerFactoryConfigurerUsesConfig() {
this.contextRunner.withUserConfiguration(TestConfiguration.class) this.contextRunner.withUserConfiguration(TestConfiguration.class)
.withPropertyValues("spring.rabbitmq.listener.type:simple", .withPropertyValues("spring.rabbitmq.listener.type:simple",
"spring.rabbitmq.listener.direct.consumers-per-queue:5", "spring.rabbitmq.listener.direct.consumers-per-queue:5",
"spring.rabbitmq.listener.direct.prefetch:40") "spring.rabbitmq.listener.direct.prefetch:40",
"spring.rabbitmq.listener.direct.de-batching-enabled:false")
.run((context) -> { .run((context) -> {
DirectRabbitListenerContainerFactoryConfigurer configurer = context DirectRabbitListenerContainerFactoryConfigurer configurer = context
.getBean(DirectRabbitListenerContainerFactoryConfigurer.class); .getBean(DirectRabbitListenerContainerFactoryConfigurer.class);
@ -563,6 +584,7 @@ class RabbitAutoConfigurationTests {
configurer.configure(factory, mock(ConnectionFactory.class)); configurer.configure(factory, mock(ConnectionFactory.class));
verify(factory).setConsumersPerQueue(5); verify(factory).setConsumersPerQueue(5);
verify(factory).setPrefetchCount(40); verify(factory).setPrefetchCount(40);
verify(factory).setDeBatchingEnabled(false);
}); });
} }

@ -303,6 +303,8 @@ class RabbitPropertiesTests {
RabbitProperties.SimpleContainer simple = this.properties.getListener().getSimple(); RabbitProperties.SimpleContainer simple = this.properties.getListener().getSimple();
assertThat(simple.isAutoStartup()).isEqualTo(container.isAutoStartup()); assertThat(simple.isAutoStartup()).isEqualTo(container.isAutoStartup());
assertThat(container).hasFieldOrPropertyWithValue("missingQueuesFatal", simple.isMissingQueuesFatal()); assertThat(container).hasFieldOrPropertyWithValue("missingQueuesFatal", simple.isMissingQueuesFatal());
assertThat(container).hasFieldOrPropertyWithValue("deBatchingEnabled", simple.isDeBatchingEnabled());
assertThat(container).hasFieldOrPropertyWithValue("consumerBatchEnabled", simple.isConsumerBatchEnabled());
} }
@Test @Test
@ -312,6 +314,7 @@ class RabbitPropertiesTests {
RabbitProperties.DirectContainer direct = this.properties.getListener().getDirect(); RabbitProperties.DirectContainer direct = this.properties.getListener().getDirect();
assertThat(direct.isAutoStartup()).isEqualTo(container.isAutoStartup()); assertThat(direct.isAutoStartup()).isEqualTo(container.isAutoStartup());
assertThat(container).hasFieldOrPropertyWithValue("missingQueuesFatal", direct.isMissingQueuesFatal()); assertThat(container).hasFieldOrPropertyWithValue("missingQueuesFatal", direct.isMissingQueuesFatal());
assertThat(container).hasFieldOrPropertyWithValue("deBatchingEnabled", direct.isDeBatchingEnabled());
} }
} }

Loading…
Cancel
Save