Add Spring Pulsar container property customizers

This commit adds the ability for users to register
property customizers for the auto-configured Spring
Pulsar listener containers.

See #36347
pull/37559/head
Chris Bono 1 year ago
parent 845c4dd057
commit 3a2d562cef

@ -149,11 +149,12 @@ public class PulsarAutoConfiguration {
@ConditionalOnMissingBean(name = "pulsarListenerContainerFactory") @ConditionalOnMissingBean(name = "pulsarListenerContainerFactory")
ConcurrentPulsarListenerContainerFactory<Object> pulsarListenerContainerFactory( ConcurrentPulsarListenerContainerFactory<Object> pulsarListenerContainerFactory(
PulsarConsumerFactory<Object> pulsarConsumerFactory, SchemaResolver schemaResolver, PulsarConsumerFactory<Object> pulsarConsumerFactory, SchemaResolver schemaResolver,
TopicResolver topicResolver) { TopicResolver topicResolver, ObjectProvider<PulsarContainerPropertiesCustomizer> customizersProvider) {
PulsarContainerProperties containerProperties = new PulsarContainerProperties(); PulsarContainerProperties containerProperties = new PulsarContainerProperties();
containerProperties.setSchemaResolver(schemaResolver); containerProperties.setSchemaResolver(schemaResolver);
containerProperties.setTopicResolver(topicResolver); containerProperties.setTopicResolver(topicResolver);
this.propertiesMapper.customizeContainerProperties(containerProperties); this.propertiesMapper.customizeContainerProperties(containerProperties);
customizersProvider.orderedStream().forEach((customizer) -> customizer.customize(containerProperties));
return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties); return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties);
} }
@ -178,10 +179,12 @@ public class PulsarAutoConfiguration {
@Bean @Bean
@ConditionalOnMissingBean(name = "pulsarReaderContainerFactory") @ConditionalOnMissingBean(name = "pulsarReaderContainerFactory")
DefaultPulsarReaderContainerFactory<?> pulsarReaderContainerFactory(PulsarReaderFactory<?> pulsarReaderFactory, DefaultPulsarReaderContainerFactory<?> pulsarReaderContainerFactory(PulsarReaderFactory<?> pulsarReaderFactory,
SchemaResolver schemaResolver) { SchemaResolver schemaResolver,
ObjectProvider<PulsarReaderContainerPropertiesCustomizer> customizersProvider) {
PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties(); PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties();
readerContainerProperties.setSchemaResolver(schemaResolver); readerContainerProperties.setSchemaResolver(schemaResolver);
this.propertiesMapper.customizeReaderContainerProperties(readerContainerProperties); this.propertiesMapper.customizeReaderContainerProperties(readerContainerProperties);
customizersProvider.orderedStream().forEach((customizer) -> customizer.customize(readerContainerProperties));
return new DefaultPulsarReaderContainerFactory<>(pulsarReaderFactory, readerContainerProperties); return new DefaultPulsarReaderContainerFactory<>(pulsarReaderFactory, readerContainerProperties);
} }

@ -0,0 +1,36 @@
/*
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.pulsar;
import org.springframework.pulsar.listener.PulsarContainerProperties;
/**
* The interface to customize a {@link PulsarContainerProperties}.
*
* @author Chris Bono
* @since 3.2.0
*/
@FunctionalInterface
public interface PulsarContainerPropertiesCustomizer {
/**
* Customizes a {@link PulsarContainerProperties}.
* @param containerProperties the container properties to customize
*/
void customize(PulsarContainerProperties containerProperties);
}

@ -156,14 +156,25 @@ public class PulsarReactiveAutoConfiguration {
@ConditionalOnMissingBean(name = "reactivePulsarListenerContainerFactory") @ConditionalOnMissingBean(name = "reactivePulsarListenerContainerFactory")
DefaultReactivePulsarListenerContainerFactory<?> reactivePulsarListenerContainerFactory( DefaultReactivePulsarListenerContainerFactory<?> reactivePulsarListenerContainerFactory(
ReactivePulsarConsumerFactory<Object> reactivePulsarConsumerFactory, SchemaResolver schemaResolver, ReactivePulsarConsumerFactory<Object> reactivePulsarConsumerFactory, SchemaResolver schemaResolver,
TopicResolver topicResolver) { TopicResolver topicResolver,
ObjectProvider<ReactivePulsarContainerPropertiesCustomizer<?>> customizersProvider) {
ReactivePulsarContainerProperties<Object> containerProperties = new ReactivePulsarContainerProperties<>(); ReactivePulsarContainerProperties<Object> containerProperties = new ReactivePulsarContainerProperties<>();
containerProperties.setSchemaResolver(schemaResolver); containerProperties.setSchemaResolver(schemaResolver);
containerProperties.setTopicResolver(topicResolver); containerProperties.setTopicResolver(topicResolver);
this.propertiesMapper.customizeContainerProperties(containerProperties); List<ReactivePulsarContainerPropertiesCustomizer<?>> customizers = new ArrayList<>();
customizers.add(this.propertiesMapper::customizeContainerProperties);
customizers.addAll(customizersProvider.orderedStream().toList());
applyContainerPropertiesCustomizers(customizers, containerProperties);
return new DefaultReactivePulsarListenerContainerFactory<>(reactivePulsarConsumerFactory, containerProperties); return new DefaultReactivePulsarListenerContainerFactory<>(reactivePulsarConsumerFactory, containerProperties);
} }
@SuppressWarnings("unchecked")
private void applyContainerPropertiesCustomizers(List<ReactivePulsarContainerPropertiesCustomizer<?>> customizers,
ReactivePulsarContainerProperties<?> containerProperties) {
LambdaSafe.callbacks(ReactivePulsarContainerPropertiesCustomizer.class, customizers, containerProperties)
.invoke((customizer) -> customizer.customize(containerProperties));
}
@Bean @Bean
@ConditionalOnMissingBean(ReactivePulsarReaderFactory.class) @ConditionalOnMissingBean(ReactivePulsarReaderFactory.class)
DefaultReactivePulsarReaderFactory<?> reactivePulsarReaderFactory(ReactivePulsarClient reactivePulsarClient, DefaultReactivePulsarReaderFactory<?> reactivePulsarReaderFactory(ReactivePulsarClient reactivePulsarClient,

@ -0,0 +1,36 @@
/*
* Copyright 2023-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.pulsar;
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
/**
* The interface to customize a {@link PulsarReaderContainerProperties}.
*
* @author Chris Bono
* @since 3.2.0
*/
@FunctionalInterface
public interface PulsarReaderContainerPropertiesCustomizer {
/**
* Customizes a {@link PulsarReaderContainerProperties}.
* @param containerProperties the container properties to customize
*/
void customize(PulsarReaderContainerProperties containerProperties);
}

@ -0,0 +1,37 @@
/*
* Copyright 2023-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.pulsar;
import org.springframework.pulsar.reactive.listener.ReactivePulsarContainerProperties;
/**
* The interface to customize a {@link ReactivePulsarContainerProperties}.
*
* @param <T> the message payload type
* @author Chris Bono
* @since 3.2.0
*/
@FunctionalInterface
public interface ReactivePulsarContainerPropertiesCustomizer<T> {
/**
* Customizes a {@link ReactivePulsarContainerProperties}.
* @param containerProperties the container properties to customize
*/
void customize(ReactivePulsarContainerProperties<T> containerProperties);
}

@ -25,8 +25,10 @@ import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.schema.SchemaType;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
@ -47,6 +49,7 @@ import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactor
import org.springframework.pulsar.config.DefaultPulsarReaderContainerFactory; import org.springframework.pulsar.config.DefaultPulsarReaderContainerFactory;
import org.springframework.pulsar.config.PulsarListenerContainerFactory; import org.springframework.pulsar.config.PulsarListenerContainerFactory;
import org.springframework.pulsar.config.PulsarListenerEndpointRegistry; import org.springframework.pulsar.config.PulsarListenerEndpointRegistry;
import org.springframework.pulsar.config.PulsarReaderContainerFactory;
import org.springframework.pulsar.config.PulsarReaderEndpointRegistry; import org.springframework.pulsar.config.PulsarReaderEndpointRegistry;
import org.springframework.pulsar.core.CachingPulsarProducerFactory; import org.springframework.pulsar.core.CachingPulsarProducerFactory;
import org.springframework.pulsar.core.ConsumerBuilderCustomizer; import org.springframework.pulsar.core.ConsumerBuilderCustomizer;
@ -65,6 +68,7 @@ import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.core.ReaderBuilderCustomizer; import org.springframework.pulsar.core.ReaderBuilderCustomizer;
import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TopicResolver; import org.springframework.pulsar.core.TopicResolver;
import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -390,7 +394,7 @@ class PulsarAutoConfigurationTests {
} }
@Nested @Nested
class ListenerTests { class ListenerContainerTests {
private final ApplicationContextRunner contextRunner = PulsarAutoConfigurationTests.this.contextRunner; private final ApplicationContextRunner contextRunner = PulsarAutoConfigurationTests.this.contextRunner;
@ -464,6 +468,53 @@ class PulsarAutoConfigurationTests {
.hasFieldOrPropertyWithValue("containerProperties.observationEnabled", false)); .hasFieldOrPropertyWithValue("containerProperties.observationEnabled", false));
} }
@Test
void whenHasUserDefinedCustomizersAppliesInCorrectOrder() {
this.contextRunner.withPropertyValues("spring.pulsar.consumer.subscription.type=Shared")
.withUserConfiguration(ContainerPropertiesCustomizerConfig.class)
.run((context) -> {
ConcurrentPulsarListenerContainerFactory<?> containerFactory = context
.getBean(ConcurrentPulsarListenerContainerFactory.class);
// We use subscriptionType to prove user customizers come after base
// props customizer.
// We use subscriptionName to prove user customizers are applied in
// their order.
assertThat(containerFactory)
.extracting(ConcurrentPulsarListenerContainerFactory::getContainerProperties)
.satisfies((containerProps) -> {
assertThat(containerProps.getSubscriptionType()).isEqualTo(SubscriptionType.Failover);
assertThat(containerProps.getSubscriptionName()).isEqualTo("/customizer1/customizer2");
});
});
}
@TestConfiguration(proxyBeanMethods = false)
static class ContainerPropertiesCustomizerConfig {
@Bean
@Order(200)
PulsarContainerPropertiesCustomizer customizerFoo() {
return (props) -> {
props.setSubscriptionType(SubscriptionType.Failover);
String name = "%s/customizer2"
.formatted((props.getSubscriptionName() != null) ? props.getSubscriptionName() : "");
props.setSubscriptionName(name);
};
}
@Bean
@Order(100)
PulsarContainerPropertiesCustomizer customizerBar() {
return (props) -> {
props.setSubscriptionType(SubscriptionType.Failover);
String name = "%s/customizer1"
.formatted((props.getSubscriptionName() != null) ? props.getSubscriptionName() : "");
props.setSubscriptionName(name);
};
}
}
} }
@Nested @Nested
@ -517,4 +568,88 @@ class PulsarAutoConfigurationTests {
} }
@Nested
class ReaderContainerTests {
private final ApplicationContextRunner contextRunner = PulsarAutoConfigurationTests.this.contextRunner;
@Test
void whenHasUserDefinedReaderContainerFactoryBeanDoesNotAutoConfigureBean() {
PulsarReaderContainerFactory readerContainerFactory = mock(PulsarReaderContainerFactory.class);
this.contextRunner
.withBean("pulsarReaderContainerFactory", PulsarReaderContainerFactory.class,
() -> readerContainerFactory)
.run((context) -> assertThat(context).getBean(PulsarReaderContainerFactory.class)
.isSameAs(readerContainerFactory));
}
@Test
@SuppressWarnings("rawtypes")
void injectsExpectedBeans() {
PulsarReaderFactory<?> readerFactory = mock(PulsarReaderFactory.class);
SchemaResolver schemaResolver = mock(SchemaResolver.class);
this.contextRunner.withBean("pulsarReaderFactory", PulsarReaderFactory.class, () -> readerFactory)
.withBean("schemaResolver", SchemaResolver.class, () -> schemaResolver)
.run((context) -> assertThat(context).getBean(DefaultPulsarReaderContainerFactory.class)
.hasFieldOrPropertyWithValue("readerFactory", readerFactory)
.extracting(DefaultPulsarReaderContainerFactory::getContainerProperties)
.hasFieldOrPropertyWithValue("schemaResolver", schemaResolver));
}
@Test
@SuppressWarnings("unchecked")
void whenHasUserDefinedReaderAnnotationBeanPostProcessorBeanDoesNotAutoConfigureBean() {
PulsarReaderAnnotationBeanPostProcessor<String> readerAnnotationBeanPostProcessor = mock(
PulsarReaderAnnotationBeanPostProcessor.class);
this.contextRunner
.withBean("org.springframework.pulsar.config.internalPulsarReaderAnnotationProcessor",
PulsarReaderAnnotationBeanPostProcessor.class, () -> readerAnnotationBeanPostProcessor)
.run((context) -> assertThat(context).getBean(PulsarReaderAnnotationBeanPostProcessor.class)
.isSameAs(readerAnnotationBeanPostProcessor));
}
@Test
void whenHasCustomProperties() {
List<String> properties = new ArrayList<>();
properties.add("spring.pulsar.reader.topics=fromPropsCustomizer");
this.contextRunner.withPropertyValues(properties.toArray(String[]::new)).run((context) -> {
DefaultPulsarReaderContainerFactory<?> factory = context
.getBean(DefaultPulsarReaderContainerFactory.class);
assertThat(factory.getContainerProperties().getTopics()).containsExactly("fromPropsCustomizer");
});
}
@Test
void whenHasUserDefinedCustomizersAppliesInCorrectOrder() {
this.contextRunner.withPropertyValues("spring.pulsar.reader.topics=fromPropsCustomizer")
.withUserConfiguration(ReaderContainerPropertiesCustomizerConfig.class)
.run((context) -> {
DefaultPulsarReaderContainerFactory<?> containerFactory = context
.getBean(DefaultPulsarReaderContainerFactory.class);
assertThat(containerFactory).extracting(DefaultPulsarReaderContainerFactory::getContainerProperties)
.extracting(PulsarReaderContainerProperties::getTopics,
InstanceOfAssertFactories.list(String.class))
.containsExactly("fromPropsCustomizer", "customizer1", "customizer2");
});
}
@TestConfiguration(proxyBeanMethods = false)
static class ReaderContainerPropertiesCustomizerConfig {
@Bean
@Order(200)
PulsarReaderContainerPropertiesCustomizer customizerFoo() {
return (props) -> props.getTopics().add("customizer2");
}
@Bean
@Order(100)
PulsarReaderContainerPropertiesCustomizer customizerBar() {
return (props) -> props.getTopics().add("customizer1");
}
}
}
} }

@ -23,6 +23,7 @@ import java.util.function.Supplier;
import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.reactive.client.adapter.ProducerCacheProvider; import org.apache.pulsar.reactive.client.adapter.ProducerCacheProvider;
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerBuilder; import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerBuilder;
@ -352,6 +353,53 @@ class PulsarReactiveAutoConfigurationTests {
}); });
} }
@Test
void whenHasUserDefinedCustomizersAppliesInCorrectOrder() {
this.contextRunner.withPropertyValues("spring.pulsar.consumer.subscription.type=Shared")
.withUserConfiguration(ReactiveContainerPropertiesCustomizerConfig.class)
.run((context) -> {
DefaultReactivePulsarListenerContainerFactory<?> containerFactory = context
.getBean(DefaultReactivePulsarListenerContainerFactory.class);
// Use subscriptionType to prove user customizers come after base
// props customizer.
// Use subscriptionName to prove user customizers are applied in
// their order.
assertThat(containerFactory)
.extracting(DefaultReactivePulsarListenerContainerFactory::getContainerProperties)
.satisfies((containerProps) -> {
assertThat(containerProps.getSubscriptionType()).isEqualTo(SubscriptionType.Failover);
assertThat(containerProps.getSubscriptionName()).isEqualTo("/customizer1/customizer2");
});
});
}
@TestConfiguration(proxyBeanMethods = false)
static class ReactiveContainerPropertiesCustomizerConfig {
@Bean
@Order(200)
ReactivePulsarContainerPropertiesCustomizer<?> customizerFoo() {
return (props) -> {
props.setSubscriptionType(SubscriptionType.Failover);
String name = "%s/customizer2"
.formatted((props.getSubscriptionName() != null) ? props.getSubscriptionName() : "");
props.setSubscriptionName(name);
};
}
@Bean
@Order(100)
ReactivePulsarContainerPropertiesCustomizer<?> customizerBar() {
return (props) -> {
props.setSubscriptionType(SubscriptionType.Failover);
String name = "%s/customizer1"
.formatted((props.getSubscriptionName() != null) ? props.getSubscriptionName() : "");
props.setSubscriptionName(name);
};
}
}
} }
@Nested @Nested

@ -140,7 +140,8 @@ If you need more control over the consumer factory configuration, consider regis
These customizers are applied to all consumers created by the factory, and therefore all `@PulsarListener` instances. These customizers are applied to all consumers created by the factory, and therefore all `@PulsarListener` instances.
You can also customize a single listener by setting the `consumerCustomizer` attribute of the `@PulsarListener` annotation. You can also customize a single listener by setting the `consumerCustomizer` attribute of the `@PulsarListener` annotation.
The properties of the underlying listener container can also be customized by registering one or more `PulsarContainerPropertiesCustomizer` beans.
These customizers are applied to all created listener containers.
[[messaging.pulsar.receiving-reactive]] [[messaging.pulsar.receiving-reactive]]
=== Receiving a Message Reactively === Receiving a Message Reactively
@ -156,7 +157,8 @@ If you need more control over the consumer factory configuration, consider regis
These customizers are applied to all consumers created by the factory, and therefore all `@ReactivePulsarListener` instances. These customizers are applied to all consumers created by the factory, and therefore all `@ReactivePulsarListener` instances.
You can also customize a single listener by setting the `consumerCustomizer` attribute of the `@ReactivePulsarListener` annotation. You can also customize a single listener by setting the `consumerCustomizer` attribute of the `@ReactivePulsarListener` annotation.
The properties of the underlying listener container can also be customized by registering one or more `ReactivePulsarContainerPropertiesCustomizer` beans.
These customizers are applied to all created listener containers.
[[messaging.pulsar.reading]] [[messaging.pulsar.reading]]
=== Reading a Message === Reading a Message
@ -175,6 +177,8 @@ If you need more control over the reader factory configuration, consider registe
These customizers are applied to all readers created by the factory, and therefore all `@PulsarReader` instances. These customizers are applied to all readers created by the factory, and therefore all `@PulsarReader` instances.
You can also customize a single listener by setting the `readerCustomizer` attribute of the `@PulsarReader` annotation. You can also customize a single listener by setting the `readerCustomizer` attribute of the `@PulsarReader` annotation.
The properties of the underlying reader container can also be customized by registering one or more `PulsarReaderContainerPropertiesCustomizer` beans.
These customizers are applied to all created reader containers.
[[messaging.pulsar.reading-reactive]] [[messaging.pulsar.reading-reactive]]

Loading…
Cancel
Save