@ -21,9 +21,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.NewTopic ;
import org.apache.kafka.clients.producer.Producer ;
import org.junit.jupiter.api.AfterAll ;
import org.junit.jupiter.api.AfterEach ;
import org.junit.jupiter.api.BeforeAll ;
import org.junit.jupiter.api.Test ;
import org.springframework.boot.test.util.TestPropertyValues ;
@ -33,10 +31,12 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams ;
import org.springframework.kafka.annotation.KafkaListener ;
import org.springframework.kafka.config.StreamsBuilderFactoryBean ;
import org.springframework.kafka.config.TopicBuilder ;
import org.springframework.kafka.core.DefaultKafkaProducerFactory ;
import org.springframework.kafka.core.KafkaTemplate ;
import org.springframework.kafka.support.KafkaHeaders ;
import org.springframework.kafka.test.EmbeddedKafkaBroker ;
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition ;
import org.springframework.kafka.test.context.EmbeddedKafka ;
import org.springframework.messaging.handler.annotation.Header ;
import static org.assertj.core.api.Assertions.assertThat ;
@ -47,21 +47,15 @@ import static org.assertj.core.api.Assertions.assertThat;
* @author Gary Russell
* @author Stephane Nicoll
* /
@EmbeddedKafka ( topics = KafkaAutoConfigurationIntegrationTests . TEST_TOPIC )
class KafkaAutoConfigurationIntegrationTests {
p rivate static final String TEST_TOPIC = "testTopic" ;
p ublic static final String TEST_TOPIC = "testTopic" ;
private static final String ADMIN_CREATED_TOPIC = "adminCreatedTopic" ;
public static final EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaBroker ( 1 , true , TEST_TOPIC ) ;
private AnnotationConfigApplicationContext context ;
@BeforeAll
static void setUp ( ) {
embeddedKafka . afterPropertiesSet ( ) ;
}
@AfterEach
void close ( ) {
if ( this . context ! = null ) {
@ -69,11 +63,6 @@ class KafkaAutoConfigurationIntegrationTests {
}
}
@AfterAll
static void tearDown ( ) {
embeddedKafka . destroy ( ) ;
}
@SuppressWarnings ( { "unchecked" , "rawtypes" } )
@Test
void testEndToEnd ( ) throws Exception {
@ -113,7 +102,7 @@ class KafkaAutoConfigurationIntegrationTests {
}
private String getEmbeddedKafkaBrokersAsString ( ) {
return embeddedKafka . getBrokersAsString ( ) ;
return EmbeddedKafkaCondition. getBroker ( ) . getBrokersAsString ( ) ;
}
@Configuration ( proxyBeanMethods = false )
@ -126,7 +115,7 @@ class KafkaAutoConfigurationIntegrationTests {
@Bean
public NewTopic adminCreated ( ) {
return new NewTopic ( ADMIN_CREATED_TOPIC , 10 , ( short ) 1 ) ;
return TopicBuilder . name ( ADMIN_CREATED_TOPIC ) . partitions ( 10 ) . replicas ( 1 ) . build ( ) ;
}
}