diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java index 9ed6ae3b09..6af612db08 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java @@ -149,11 +149,12 @@ public class PulsarAutoConfiguration { @ConditionalOnMissingBean(name = "pulsarListenerContainerFactory") ConcurrentPulsarListenerContainerFactory pulsarListenerContainerFactory( PulsarConsumerFactory pulsarConsumerFactory, SchemaResolver schemaResolver, - TopicResolver topicResolver) { + TopicResolver topicResolver, ObjectProvider customizersProvider) { PulsarContainerProperties containerProperties = new PulsarContainerProperties(); containerProperties.setSchemaResolver(schemaResolver); containerProperties.setTopicResolver(topicResolver); this.propertiesMapper.customizeContainerProperties(containerProperties); + customizersProvider.orderedStream().forEach((customizer) -> customizer.customize(containerProperties)); return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties); } @@ -178,10 +179,12 @@ public class PulsarAutoConfiguration { @Bean @ConditionalOnMissingBean(name = "pulsarReaderContainerFactory") DefaultPulsarReaderContainerFactory pulsarReaderContainerFactory(PulsarReaderFactory pulsarReaderFactory, - SchemaResolver schemaResolver) { + SchemaResolver schemaResolver, + ObjectProvider customizersProvider) { PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties(); readerContainerProperties.setSchemaResolver(schemaResolver); this.propertiesMapper.customizeReaderContainerProperties(readerContainerProperties); + customizersProvider.orderedStream().forEach((customizer) -> customizer.customize(readerContainerProperties)); return new DefaultPulsarReaderContainerFactory<>(pulsarReaderFactory, readerContainerProperties); } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarContainerPropertiesCustomizer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarContainerPropertiesCustomizer.java new file mode 100644 index 0000000000..57fc881398 --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarContainerPropertiesCustomizer.java @@ -0,0 +1,36 @@ +/* + * Copyright 2012-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.pulsar; + +import org.springframework.pulsar.listener.PulsarContainerProperties; + +/** + * The interface to customize a {@link PulsarContainerProperties}. + * + * @author Chris Bono + * @since 3.2.0 + */ +@FunctionalInterface +public interface PulsarContainerPropertiesCustomizer { + + /** + * Customizes a {@link PulsarContainerProperties}. + * @param containerProperties the container properties to customize + */ + void customize(PulsarContainerProperties containerProperties); + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java index 4c2aeb172d..0016893787 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java @@ -156,14 +156,25 @@ public class PulsarReactiveAutoConfiguration { @ConditionalOnMissingBean(name = "reactivePulsarListenerContainerFactory") DefaultReactivePulsarListenerContainerFactory reactivePulsarListenerContainerFactory( ReactivePulsarConsumerFactory reactivePulsarConsumerFactory, SchemaResolver schemaResolver, - TopicResolver topicResolver) { + TopicResolver topicResolver, + ObjectProvider> customizersProvider) { ReactivePulsarContainerProperties containerProperties = new ReactivePulsarContainerProperties<>(); containerProperties.setSchemaResolver(schemaResolver); containerProperties.setTopicResolver(topicResolver); - this.propertiesMapper.customizeContainerProperties(containerProperties); + List> customizers = new ArrayList<>(); + customizers.add(this.propertiesMapper::customizeContainerProperties); + customizers.addAll(customizersProvider.orderedStream().toList()); + applyContainerPropertiesCustomizers(customizers, containerProperties); return new DefaultReactivePulsarListenerContainerFactory<>(reactivePulsarConsumerFactory, containerProperties); } + @SuppressWarnings("unchecked") + private void applyContainerPropertiesCustomizers(List> customizers, + ReactivePulsarContainerProperties containerProperties) { + LambdaSafe.callbacks(ReactivePulsarContainerPropertiesCustomizer.class, customizers, containerProperties) + .invoke((customizer) -> customizer.customize(containerProperties)); + } + @Bean @ConditionalOnMissingBean(ReactivePulsarReaderFactory.class) DefaultReactivePulsarReaderFactory reactivePulsarReaderFactory(ReactivePulsarClient reactivePulsarClient, diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReaderContainerPropertiesCustomizer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReaderContainerPropertiesCustomizer.java new file mode 100644 index 0000000000..44949d79b3 --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReaderContainerPropertiesCustomizer.java @@ -0,0 +1,36 @@ +/* + * Copyright 2023-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.pulsar; + +import org.springframework.pulsar.reader.PulsarReaderContainerProperties; + +/** + * The interface to customize a {@link PulsarReaderContainerProperties}. + * + * @author Chris Bono + * @since 3.2.0 + */ +@FunctionalInterface +public interface PulsarReaderContainerPropertiesCustomizer { + + /** + * Customizes a {@link PulsarReaderContainerProperties}. + * @param containerProperties the container properties to customize + */ + void customize(PulsarReaderContainerProperties containerProperties); + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/ReactivePulsarContainerPropertiesCustomizer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/ReactivePulsarContainerPropertiesCustomizer.java new file mode 100644 index 0000000000..53b1e3da03 --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/ReactivePulsarContainerPropertiesCustomizer.java @@ -0,0 +1,37 @@ +/* + * Copyright 2023-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.pulsar; + +import org.springframework.pulsar.reactive.listener.ReactivePulsarContainerProperties; + +/** + * The interface to customize a {@link ReactivePulsarContainerProperties}. + * + * @param the message payload type + * @author Chris Bono + * @since 3.2.0 + */ +@FunctionalInterface +public interface ReactivePulsarContainerPropertiesCustomizer { + + /** + * Customizes a {@link ReactivePulsarContainerProperties}. + * @param containerProperties the container properties to customize + */ + void customize(ReactivePulsarContainerProperties containerProperties); + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java index 16e66c9a47..98a5daf16b 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java @@ -25,8 +25,10 @@ import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.ReaderBuilder; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; import org.apache.pulsar.common.schema.SchemaType; +import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -47,6 +49,7 @@ import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactor import org.springframework.pulsar.config.DefaultPulsarReaderContainerFactory; import org.springframework.pulsar.config.PulsarListenerContainerFactory; import org.springframework.pulsar.config.PulsarListenerEndpointRegistry; +import org.springframework.pulsar.config.PulsarReaderContainerFactory; import org.springframework.pulsar.config.PulsarReaderEndpointRegistry; import org.springframework.pulsar.core.CachingPulsarProducerFactory; import org.springframework.pulsar.core.ConsumerBuilderCustomizer; @@ -65,6 +68,7 @@ import org.springframework.pulsar.core.PulsarTemplate; import org.springframework.pulsar.core.ReaderBuilderCustomizer; import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.core.TopicResolver; +import org.springframework.pulsar.reader.PulsarReaderContainerProperties; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; @@ -390,7 +394,7 @@ class PulsarAutoConfigurationTests { } @Nested - class ListenerTests { + class ListenerContainerTests { private final ApplicationContextRunner contextRunner = PulsarAutoConfigurationTests.this.contextRunner; @@ -464,6 +468,53 @@ class PulsarAutoConfigurationTests { .hasFieldOrPropertyWithValue("containerProperties.observationEnabled", false)); } + @Test + void whenHasUserDefinedCustomizersAppliesInCorrectOrder() { + this.contextRunner.withPropertyValues("spring.pulsar.consumer.subscription.type=Shared") + .withUserConfiguration(ContainerPropertiesCustomizerConfig.class) + .run((context) -> { + ConcurrentPulsarListenerContainerFactory containerFactory = context + .getBean(ConcurrentPulsarListenerContainerFactory.class); + // We use subscriptionType to prove user customizers come after base + // props customizer. + // We use subscriptionName to prove user customizers are applied in + // their order. + assertThat(containerFactory) + .extracting(ConcurrentPulsarListenerContainerFactory::getContainerProperties) + .satisfies((containerProps) -> { + assertThat(containerProps.getSubscriptionType()).isEqualTo(SubscriptionType.Failover); + assertThat(containerProps.getSubscriptionName()).isEqualTo("/customizer1/customizer2"); + }); + }); + } + + @TestConfiguration(proxyBeanMethods = false) + static class ContainerPropertiesCustomizerConfig { + + @Bean + @Order(200) + PulsarContainerPropertiesCustomizer customizerFoo() { + return (props) -> { + props.setSubscriptionType(SubscriptionType.Failover); + String name = "%s/customizer2" + .formatted((props.getSubscriptionName() != null) ? props.getSubscriptionName() : ""); + props.setSubscriptionName(name); + }; + } + + @Bean + @Order(100) + PulsarContainerPropertiesCustomizer customizerBar() { + return (props) -> { + props.setSubscriptionType(SubscriptionType.Failover); + String name = "%s/customizer1" + .formatted((props.getSubscriptionName() != null) ? props.getSubscriptionName() : ""); + props.setSubscriptionName(name); + }; + } + + } + } @Nested @@ -517,4 +568,88 @@ class PulsarAutoConfigurationTests { } + @Nested + class ReaderContainerTests { + + private final ApplicationContextRunner contextRunner = PulsarAutoConfigurationTests.this.contextRunner; + + @Test + void whenHasUserDefinedReaderContainerFactoryBeanDoesNotAutoConfigureBean() { + PulsarReaderContainerFactory readerContainerFactory = mock(PulsarReaderContainerFactory.class); + this.contextRunner + .withBean("pulsarReaderContainerFactory", PulsarReaderContainerFactory.class, + () -> readerContainerFactory) + .run((context) -> assertThat(context).getBean(PulsarReaderContainerFactory.class) + .isSameAs(readerContainerFactory)); + } + + @Test + @SuppressWarnings("rawtypes") + void injectsExpectedBeans() { + PulsarReaderFactory readerFactory = mock(PulsarReaderFactory.class); + SchemaResolver schemaResolver = mock(SchemaResolver.class); + this.contextRunner.withBean("pulsarReaderFactory", PulsarReaderFactory.class, () -> readerFactory) + .withBean("schemaResolver", SchemaResolver.class, () -> schemaResolver) + .run((context) -> assertThat(context).getBean(DefaultPulsarReaderContainerFactory.class) + .hasFieldOrPropertyWithValue("readerFactory", readerFactory) + .extracting(DefaultPulsarReaderContainerFactory::getContainerProperties) + .hasFieldOrPropertyWithValue("schemaResolver", schemaResolver)); + } + + @Test + @SuppressWarnings("unchecked") + void whenHasUserDefinedReaderAnnotationBeanPostProcessorBeanDoesNotAutoConfigureBean() { + PulsarReaderAnnotationBeanPostProcessor readerAnnotationBeanPostProcessor = mock( + PulsarReaderAnnotationBeanPostProcessor.class); + this.contextRunner + .withBean("org.springframework.pulsar.config.internalPulsarReaderAnnotationProcessor", + PulsarReaderAnnotationBeanPostProcessor.class, () -> readerAnnotationBeanPostProcessor) + .run((context) -> assertThat(context).getBean(PulsarReaderAnnotationBeanPostProcessor.class) + .isSameAs(readerAnnotationBeanPostProcessor)); + } + + @Test + void whenHasCustomProperties() { + List properties = new ArrayList<>(); + properties.add("spring.pulsar.reader.topics=fromPropsCustomizer"); + this.contextRunner.withPropertyValues(properties.toArray(String[]::new)).run((context) -> { + DefaultPulsarReaderContainerFactory factory = context + .getBean(DefaultPulsarReaderContainerFactory.class); + assertThat(factory.getContainerProperties().getTopics()).containsExactly("fromPropsCustomizer"); + }); + } + + @Test + void whenHasUserDefinedCustomizersAppliesInCorrectOrder() { + this.contextRunner.withPropertyValues("spring.pulsar.reader.topics=fromPropsCustomizer") + .withUserConfiguration(ReaderContainerPropertiesCustomizerConfig.class) + .run((context) -> { + DefaultPulsarReaderContainerFactory containerFactory = context + .getBean(DefaultPulsarReaderContainerFactory.class); + assertThat(containerFactory).extracting(DefaultPulsarReaderContainerFactory::getContainerProperties) + .extracting(PulsarReaderContainerProperties::getTopics, + InstanceOfAssertFactories.list(String.class)) + .containsExactly("fromPropsCustomizer", "customizer1", "customizer2"); + }); + } + + @TestConfiguration(proxyBeanMethods = false) + static class ReaderContainerPropertiesCustomizerConfig { + + @Bean + @Order(200) + PulsarReaderContainerPropertiesCustomizer customizerFoo() { + return (props) -> props.getTopics().add("customizer2"); + } + + @Bean + @Order(100) + PulsarReaderContainerPropertiesCustomizer customizerBar() { + return (props) -> props.getTopics().add("customizer1"); + } + + } + + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java index 4f3ab011ea..f92fbd4d55 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java @@ -23,6 +23,7 @@ import java.util.function.Supplier; import com.github.benmanes.caffeine.cache.Caffeine; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.reactive.client.adapter.ProducerCacheProvider; import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerBuilder; @@ -352,6 +353,53 @@ class PulsarReactiveAutoConfigurationTests { }); } + @Test + void whenHasUserDefinedCustomizersAppliesInCorrectOrder() { + this.contextRunner.withPropertyValues("spring.pulsar.consumer.subscription.type=Shared") + .withUserConfiguration(ReactiveContainerPropertiesCustomizerConfig.class) + .run((context) -> { + DefaultReactivePulsarListenerContainerFactory containerFactory = context + .getBean(DefaultReactivePulsarListenerContainerFactory.class); + // Use subscriptionType to prove user customizers come after base + // props customizer. + // Use subscriptionName to prove user customizers are applied in + // their order. + assertThat(containerFactory) + .extracting(DefaultReactivePulsarListenerContainerFactory::getContainerProperties) + .satisfies((containerProps) -> { + assertThat(containerProps.getSubscriptionType()).isEqualTo(SubscriptionType.Failover); + assertThat(containerProps.getSubscriptionName()).isEqualTo("/customizer1/customizer2"); + }); + }); + } + + @TestConfiguration(proxyBeanMethods = false) + static class ReactiveContainerPropertiesCustomizerConfig { + + @Bean + @Order(200) + ReactivePulsarContainerPropertiesCustomizer customizerFoo() { + return (props) -> { + props.setSubscriptionType(SubscriptionType.Failover); + String name = "%s/customizer2" + .formatted((props.getSubscriptionName() != null) ? props.getSubscriptionName() : ""); + props.setSubscriptionName(name); + }; + } + + @Bean + @Order(100) + ReactivePulsarContainerPropertiesCustomizer customizerBar() { + return (props) -> { + props.setSubscriptionType(SubscriptionType.Failover); + String name = "%s/customizer1" + .formatted((props.getSubscriptionName() != null) ? props.getSubscriptionName() : ""); + props.setSubscriptionName(name); + }; + } + + } + } @Nested diff --git a/spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/pulsar.adoc b/spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/pulsar.adoc index f2ced63368..4a378deda8 100644 --- a/spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/pulsar.adoc +++ b/spring-boot-project/spring-boot-docs/src/docs/asciidoc/messaging/pulsar.adoc @@ -140,7 +140,8 @@ If you need more control over the consumer factory configuration, consider regis These customizers are applied to all consumers created by the factory, and therefore all `@PulsarListener` instances. You can also customize a single listener by setting the `consumerCustomizer` attribute of the `@PulsarListener` annotation. - +The properties of the underlying listener container can also be customized by registering one or more `PulsarContainerPropertiesCustomizer` beans. +These customizers are applied to all created listener containers. [[messaging.pulsar.receiving-reactive]] === Receiving a Message Reactively @@ -156,7 +157,8 @@ If you need more control over the consumer factory configuration, consider regis These customizers are applied to all consumers created by the factory, and therefore all `@ReactivePulsarListener` instances. You can also customize a single listener by setting the `consumerCustomizer` attribute of the `@ReactivePulsarListener` annotation. - +The properties of the underlying listener container can also be customized by registering one or more `ReactivePulsarContainerPropertiesCustomizer` beans. +These customizers are applied to all created listener containers. [[messaging.pulsar.reading]] === Reading a Message @@ -175,6 +177,8 @@ If you need more control over the reader factory configuration, consider registe These customizers are applied to all readers created by the factory, and therefore all `@PulsarReader` instances. You can also customize a single listener by setting the `readerCustomizer` attribute of the `@PulsarReader` annotation. +The properties of the underlying reader container can also be customized by registering one or more `PulsarReaderContainerPropertiesCustomizer` beans. +These customizers are applied to all created reader containers. [[messaging.pulsar.reading-reactive]]