Auto-configure Kafka's threadNameSupplier

Closes gh-36344
pull/36604/head
Moritz Halbritter 1 year ago
parent 7469327c77
commit 49ae8c0998

@ -17,6 +17,7 @@
package org.springframework.boot.autoconfigure.kafka;
import java.time.Duration;
import java.util.function.Function;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
import org.springframework.boot.context.properties.PropertyMapper;
@ -28,6 +29,7 @@ import org.springframework.kafka.listener.BatchInterceptor;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.support.converter.BatchMessageConverter;
@ -66,6 +68,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
private BatchInterceptor<Object, Object> batchInterceptor;
private Function<MessageListenerContainer, String> threadNameSupplier;
/**
* Set the {@link KafkaProperties} to use.
* @param properties the properties
@ -156,6 +160,14 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
this.batchInterceptor = batchInterceptor;
}
/**
* Set the thread name supplier to use.
* @param threadNameSupplier the thread name supplier to use
*/
void setThreadNameSupplier(Function<MessageListenerContainer, String> threadNameSupplier) {
this.threadNameSupplier = threadNameSupplier;
}
/**
* Configure the specified Kafka listener container factory. The factory can be
* further tuned and default settings can be overridden.
@ -186,6 +198,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
map.from(this.afterRollbackProcessor).to(factory::setAfterRollbackProcessor);
map.from(this.recordInterceptor).to(factory::setRecordInterceptor);
map.from(this.batchInterceptor).to(factory::setBatchInterceptor);
map.from(this.threadNameSupplier).to(factory::setThreadNameSupplier);
}
private void configureContainer(ContainerProperties container) {

@ -16,6 +16,8 @@
package org.springframework.boot.autoconfigure.kafka;
import java.util.function.Function;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@ -33,6 +35,7 @@ import org.springframework.kafka.listener.BatchInterceptor;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.support.converter.BatchMessageConverter;
@ -73,6 +76,8 @@ class KafkaAnnotationDrivenConfiguration {
private final BatchInterceptor<Object, Object> batchInterceptor;
private final Function<MessageListenerContainer, String> threadNameSupplier;
KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
ObjectProvider<RecordMessageConverter> recordMessageConverter,
ObjectProvider<RecordFilterStrategy<Object, Object>> recordFilterStrategy,
@ -83,7 +88,8 @@ class KafkaAnnotationDrivenConfiguration {
ObjectProvider<CommonErrorHandler> commonErrorHandler,
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,
ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor,
ObjectProvider<BatchInterceptor<Object, Object>> batchInterceptor) {
ObjectProvider<BatchInterceptor<Object, Object>> batchInterceptor,
ObjectProvider<Function<MessageListenerContainer, String>> threadNameSupplier) {
this.properties = properties;
this.recordMessageConverter = recordMessageConverter.getIfUnique();
this.recordFilterStrategy = recordFilterStrategy.getIfUnique();
@ -96,6 +102,7 @@ class KafkaAnnotationDrivenConfiguration {
this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
this.recordInterceptor = recordInterceptor.getIfUnique();
this.batchInterceptor = batchInterceptor.getIfUnique();
this.threadNameSupplier = threadNameSupplier.getIfUnique();
}
@Bean
@ -113,6 +120,7 @@ class KafkaAnnotationDrivenConfiguration {
configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
configurer.setRecordInterceptor(this.recordInterceptor);
configurer.setBatchInterceptor(this.batchInterceptor);
configurer.setThreadNameSupplier(this.threadNameSupplier);
return configurer;
}

@ -0,0 +1,63 @@
/*
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.kafka;
import java.util.function.Function;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.MessageListenerContainer;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
/**
* Tests for {@link ConcurrentKafkaListenerContainerFactoryConfigurer}.
*
* @author Moritz Halbritter
*/
class ConcurrentKafkaListenerContainerFactoryConfigurerTests {
private ConcurrentKafkaListenerContainerFactoryConfigurer configurer;
private ConcurrentKafkaListenerContainerFactory<Object, Object> factory;
private ConsumerFactory<Object, Object> consumerFactory;
@BeforeEach
@SuppressWarnings("unchecked")
void setUp() {
this.configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
this.configurer.setKafkaProperties(new KafkaProperties());
this.factory = spy(new ConcurrentKafkaListenerContainerFactory<>());
this.consumerFactory = mock(ConsumerFactory.class);
}
@Test
void shouldApplyThreadNameSupplier() {
Function<MessageListenerContainer, String> function = (container) -> "thread-1";
this.configurer.setThreadNameSupplier(function);
this.configurer.configure(this.factory, this.consumerFactory);
then(this.factory).should().setThreadNameSupplier(function);
}
}
Loading…
Cancel
Save