diff --git a/spring-boot-project/spring-boot/src/test/java/org/springframework/boot/web/reactive/server/AbstractReactiveWebServerFactoryTests.java b/spring-boot-project/spring-boot/src/test/java/org/springframework/boot/web/reactive/server/AbstractReactiveWebServerFactoryTests.java index 457b7403fc..dda81654ff 100644 --- a/spring-boot-project/spring-boot/src/test/java/org/springframework/boot/web/reactive/server/AbstractReactiveWebServerFactoryTests.java +++ b/spring-boot-project/spring-boot/src/test/java/org/springframework/boot/web/reactive/server/AbstractReactiveWebServerFactoryTests.java @@ -45,7 +45,6 @@ import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; import reactor.core.publisher.Sinks; import reactor.netty.NettyPipeline; import reactor.netty.http.client.HttpClient; @@ -499,7 +498,7 @@ public abstract class AbstractReactiveWebServerFactoryTests { protected static class BlockingHandler implements HttpHandler { - private final BlockingQueue> processors = new ArrayBlockingQueue<>(10); + private final BlockingQueue> processors = new ArrayBlockingQueue<>(10); private volatile boolean blocking = true; @@ -510,8 +509,8 @@ public abstract class AbstractReactiveWebServerFactoryTests { @Override public Mono handle(ServerHttpRequest request, ServerHttpResponse response) { if (this.blocking) { - Sinks.One completion = Sinks.one(); - this.processors.add(MonoProcessor.fromSink(completion)); + Sinks.Empty completion = Sinks.empty(); + this.processors.add(completion); return completion.asMono().then(Mono.empty()); } return Mono.empty(); @@ -519,8 +518,8 @@ public abstract class AbstractReactiveWebServerFactoryTests { public void completeOne() { try { - MonoProcessor processor = this.processors.take(); - processor.onComplete(); + Sinks.Empty processor = this.processors.take(); + processor.tryEmitEmpty(); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); @@ -535,7 +534,7 @@ public abstract class AbstractReactiveWebServerFactoryTests { public void stopBlocking() { this.blocking = false; - this.processors.forEach(MonoProcessor::onComplete); + this.processors.forEach(Sinks.Empty::tryEmitEmpty); } }