Replace use of deprecated Reactor MonoProcessor in tests

Fixes gh-23627
pull/23632/head
Scott Frederick 4 years ago
parent f2721abf31
commit 0f9e8315bc

@ -45,7 +45,6 @@ import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks;
import reactor.netty.NettyPipeline; import reactor.netty.NettyPipeline;
import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClient;
@ -499,7 +498,7 @@ public abstract class AbstractReactiveWebServerFactoryTests {
protected static class BlockingHandler implements HttpHandler { protected static class BlockingHandler implements HttpHandler {
private final BlockingQueue<MonoProcessor<Void>> processors = new ArrayBlockingQueue<>(10); private final BlockingQueue<Sinks.Empty<Void>> processors = new ArrayBlockingQueue<>(10);
private volatile boolean blocking = true; private volatile boolean blocking = true;
@ -510,8 +509,8 @@ public abstract class AbstractReactiveWebServerFactoryTests {
@Override @Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) { public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
if (this.blocking) { if (this.blocking) {
Sinks.One<Void> completion = Sinks.one(); Sinks.Empty<Void> completion = Sinks.empty();
this.processors.add(MonoProcessor.fromSink(completion)); this.processors.add(completion);
return completion.asMono().then(Mono.empty()); return completion.asMono().then(Mono.empty());
} }
return Mono.empty(); return Mono.empty();
@ -519,8 +518,8 @@ public abstract class AbstractReactiveWebServerFactoryTests {
public void completeOne() { public void completeOne() {
try { try {
MonoProcessor<Void> processor = this.processors.take(); Sinks.Empty<Void> processor = this.processors.take();
processor.onComplete(); processor.tryEmitEmpty();
} }
catch (InterruptedException ex) { catch (InterruptedException ex) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@ -535,7 +534,7 @@ public abstract class AbstractReactiveWebServerFactoryTests {
public void stopBlocking() { public void stopBlocking() {
this.blocking = false; this.blocking = false;
this.processors.forEach(MonoProcessor::onComplete); this.processors.forEach(Sinks.Empty::tryEmitEmpty);
} }
} }

Loading…
Cancel
Save