|
|
@ -62,7 +62,6 @@ import org.springframework.kafka.support.converter.BatchMessageConverter;
|
|
|
|
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
|
|
|
|
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
|
|
|
|
import org.springframework.kafka.support.converter.MessagingMessageConverter;
|
|
|
|
import org.springframework.kafka.support.converter.MessagingMessageConverter;
|
|
|
|
import org.springframework.kafka.support.converter.RecordMessageConverter;
|
|
|
|
import org.springframework.kafka.support.converter.RecordMessageConverter;
|
|
|
|
import org.springframework.kafka.test.utils.KafkaTestUtils;
|
|
|
|
|
|
|
|
import org.springframework.kafka.transaction.ChainedKafkaTransactionManager;
|
|
|
|
import org.springframework.kafka.transaction.ChainedKafkaTransactionManager;
|
|
|
|
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
|
|
|
|
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
|
|
|
|
import org.springframework.kafka.transaction.KafkaTransactionManager;
|
|
|
|
import org.springframework.kafka.transaction.KafkaTransactionManager;
|
|
|
@ -285,8 +284,8 @@ public class KafkaAutoConfigurationTests {
|
|
|
|
.isEmpty();
|
|
|
|
.isEmpty();
|
|
|
|
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
|
|
|
|
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
|
|
|
|
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
|
|
|
|
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
|
|
|
|
assertThat(KafkaTestUtils.getPropertyValue(admin,
|
|
|
|
assertThat(admin).hasFieldOrPropertyWithValue(
|
|
|
|
"fatalIfBrokerNotAvailable", Boolean.class)).isTrue();
|
|
|
|
"fatalIfBrokerNotAvailable", true);
|
|
|
|
});
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -577,8 +576,8 @@ public class KafkaAutoConfigurationTests {
|
|
|
|
.run((context) -> {
|
|
|
|
.run((context) -> {
|
|
|
|
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
|
|
|
|
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
|
|
|
|
.getBean(ConcurrentKafkaListenerContainerFactory.class);
|
|
|
|
.getBean(ConcurrentKafkaListenerContainerFactory.class);
|
|
|
|
assertThat(KafkaTestUtils.getPropertyValue(factory, "errorHandler"))
|
|
|
|
assertThat(factory).hasFieldOrPropertyWithValue("errorHandler",
|
|
|
|
.isSameAs(context.getBean("errorHandler"));
|
|
|
|
context.getBean("errorHandler"));
|
|
|
|
});
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -588,8 +587,8 @@ public class KafkaAutoConfigurationTests {
|
|
|
|
.withPropertyValues("spring.kafka.listener.type=batch").run((context) -> {
|
|
|
|
.withPropertyValues("spring.kafka.listener.type=batch").run((context) -> {
|
|
|
|
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
|
|
|
|
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
|
|
|
|
.getBean(ConcurrentKafkaListenerContainerFactory.class);
|
|
|
|
.getBean(ConcurrentKafkaListenerContainerFactory.class);
|
|
|
|
assertThat(KafkaTestUtils.getPropertyValue(factory, "errorHandler"))
|
|
|
|
assertThat(factory).hasFieldOrPropertyWithValue("errorHandler",
|
|
|
|
.isSameAs(context.getBean("batchErrorHandler"));
|
|
|
|
context.getBean("batchErrorHandler"));
|
|
|
|
});
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -599,8 +598,7 @@ public class KafkaAutoConfigurationTests {
|
|
|
|
.run((context) -> {
|
|
|
|
.run((context) -> {
|
|
|
|
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
|
|
|
|
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
|
|
|
|
.getBean(ConcurrentKafkaListenerContainerFactory.class);
|
|
|
|
.getBean(ConcurrentKafkaListenerContainerFactory.class);
|
|
|
|
assertThat(KafkaTestUtils.getPropertyValue(factory, "errorHandler"))
|
|
|
|
assertThat(factory).hasFieldOrPropertyWithValue("errorHandler", null);
|
|
|
|
.isNull();
|
|
|
|
|
|
|
|
});
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -610,8 +608,7 @@ public class KafkaAutoConfigurationTests {
|
|
|
|
.withUserConfiguration(ErrorHandlerConfiguration.class).run((context) -> {
|
|
|
|
.withUserConfiguration(ErrorHandlerConfiguration.class).run((context) -> {
|
|
|
|
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
|
|
|
|
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
|
|
|
|
.getBean(ConcurrentKafkaListenerContainerFactory.class);
|
|
|
|
.getBean(ConcurrentKafkaListenerContainerFactory.class);
|
|
|
|
assertThat(KafkaTestUtils.getPropertyValue(factory, "errorHandler"))
|
|
|
|
assertThat(factory).hasFieldOrPropertyWithValue("errorHandler", null);
|
|
|
|
.isNull();
|
|
|
|
|
|
|
|
});
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|