Polish "Add WebClient based sender for Zipkin"

See gh-30792
pull/31494/head
Moritz Halbritter 2 years ago
parent 12037bd131
commit bb6c56e5f0

@ -111,11 +111,14 @@ abstract class HttpSender extends Sender {
} }
protected byte[] getBody() { protected byte[] getBody() {
return getBody(true); if (needsCompression()) {
return compress(this.body);
}
return this.body;
} }
protected byte[] getBody(boolean compressIfNeeded) { protected byte[] getUncompressedBody() {
return (compressIfNeeded && needsCompression()) ? compress(this.body) : this.body; return this.body;
} }
protected HttpHeaders getDefaultHeaders() { protected HttpHeaders getDefaultHeaders() {

@ -41,6 +41,7 @@ import org.springframework.web.reactive.function.client.WebClient;
* Configurations for Zipkin. Those are imported by {@link ZipkinAutoConfiguration}. * Configurations for Zipkin. Those are imported by {@link ZipkinAutoConfiguration}.
* *
* @author Moritz Halbritter * @author Moritz Halbritter
* @author Stefan Bratanov
*/ */
class ZipkinConfigurations { class ZipkinConfigurations {

@ -59,7 +59,7 @@ class ZipkinRestTemplateSender extends HttpSender {
@Override @Override
public Call<Void> clone() { public Call<Void> clone() {
return new RestTemplateHttpPostCall(this.endpoint, getBody(false), this.restTemplate); return new RestTemplateHttpPostCall(this.endpoint, getUncompressedBody(), this.restTemplate);
} }
@Override @Override

@ -58,7 +58,7 @@ class ZipkinWebClientSender extends HttpSender {
@Override @Override
public Call<Void> clone() { public Call<Void> clone() {
return new WebClientHttpPostCall(this.endpoint, getBody(false), this.webClient); return new WebClientHttpPostCall(this.endpoint, getUncompressedBody(), this.webClient);
} }
@Override @Override

@ -24,7 +24,6 @@ import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.awaitility.Awaitility; import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import zipkin2.Callback; import zipkin2.Callback;
@ -42,39 +41,34 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
*/ */
abstract class ZipkinHttpSenderTests { abstract class ZipkinHttpSenderTests {
protected Sender senderUnderTest; protected Sender sut;
abstract Sender getZipkinSender(); abstract Sender createSut();
@BeforeEach @BeforeEach
void setUp() { void setUp() {
this.senderUnderTest = getZipkinSender(); this.sut = createSut();
} }
@Test @Test
void sendSpansShouldThrowIfCloseWasCalled() throws IOException { void sendSpansShouldThrowIfCloseWasCalled() throws IOException {
this.senderUnderTest.close(); this.sut.close();
assertThatThrownBy(() -> this.senderUnderTest.sendSpans(List.of())).isInstanceOf(ClosedSenderException.class); assertThatThrownBy(() -> this.sut.sendSpans(List.of())).isInstanceOf(ClosedSenderException.class);
} }
protected void makeRequest(List<byte[]> encodedSpans, boolean async) { protected void makeRequest(List<byte[]> encodedSpans, boolean async) throws IOException {
if (async) { if (async) {
CallbackResult callbackResult = this.makeAsyncRequest(encodedSpans); CallbackResult callbackResult = this.makeAsyncRequest(encodedSpans);
assertThat(callbackResult.isSuccess()).isTrue(); assertThat(callbackResult.success()).isTrue();
} }
else { else {
try { this.makeSyncRequest(encodedSpans);
this.makeSyncRequest(encodedSpans);
}
catch (IOException ex) {
Assertions.fail(ex);
}
} }
} }
protected CallbackResult makeAsyncRequest(List<byte[]> encodedSpans) { protected CallbackResult makeAsyncRequest(List<byte[]> encodedSpans) {
AtomicReference<CallbackResult> callbackResult = new AtomicReference<>(); AtomicReference<CallbackResult> callbackResult = new AtomicReference<>();
this.senderUnderTest.sendSpans(encodedSpans).enqueue(new Callback<>() { this.sut.sendSpans(encodedSpans).enqueue(new Callback<>() {
@Override @Override
public void onSuccess(Void value) { public void onSuccess(Void value) {
callbackResult.set(new CallbackResult(true, null)); callbackResult.set(new CallbackResult(true, null));
@ -89,32 +83,14 @@ abstract class ZipkinHttpSenderTests {
} }
protected void makeSyncRequest(List<byte[]> encodedSpans) throws IOException { protected void makeSyncRequest(List<byte[]> encodedSpans) throws IOException {
this.senderUnderTest.sendSpans(encodedSpans).execute(); this.sut.sendSpans(encodedSpans).execute();
} }
protected byte[] toByteArray(String input) { protected byte[] toByteArray(String input) {
return input.getBytes(StandardCharsets.UTF_8); return input.getBytes(StandardCharsets.UTF_8);
} }
protected static final class CallbackResult { record CallbackResult(boolean success, Throwable error) {
private final boolean isSuccess;
private final Throwable error;
private CallbackResult(boolean isSuccess, Throwable error) {
this.isSuccess = isSuccess;
this.error = error;
}
public boolean isSuccess() {
return this.isSuccess;
}
public Throwable getError() {
return this.error;
}
} }
} }

@ -16,6 +16,7 @@
package org.springframework.boot.actuate.autoconfigure.tracing.zipkin; package org.springframework.boot.actuate.autoconfigure.tracing.zipkin;
import java.io.IOException;
import java.util.Base64; import java.util.Base64;
import java.util.List; import java.util.List;
@ -52,7 +53,7 @@ class ZipkinRestTemplateSenderTests extends ZipkinHttpSenderTests {
private MockRestServiceServer mockServer; private MockRestServiceServer mockServer;
@Override @Override
Sender getZipkinSender() { Sender createSut() {
RestTemplate restTemplate = new RestTemplate(); RestTemplate restTemplate = new RestTemplate();
this.mockServer = MockRestServiceServer.createServer(restTemplate); this.mockServer = MockRestServiceServer.createServer(restTemplate);
return new ZipkinRestTemplateSender(ZIPKIN_URL, restTemplate); return new ZipkinRestTemplateSender(ZIPKIN_URL, restTemplate);
@ -67,21 +68,21 @@ class ZipkinRestTemplateSenderTests extends ZipkinHttpSenderTests {
void checkShouldSendEmptySpanList() { void checkShouldSendEmptySpanList() {
this.mockServer.expect(requestTo(ZIPKIN_URL)).andExpect(method(HttpMethod.POST)) this.mockServer.expect(requestTo(ZIPKIN_URL)).andExpect(method(HttpMethod.POST))
.andExpect(content().string("[]")).andRespond(withStatus(HttpStatus.ACCEPTED)); .andExpect(content().string("[]")).andRespond(withStatus(HttpStatus.ACCEPTED));
assertThat(this.senderUnderTest.check()).isEqualTo(CheckResult.OK); assertThat(this.sut.check()).isEqualTo(CheckResult.OK);
} }
@Test @Test
void checkShouldNotRaiseException() { void checkShouldNotRaiseException() {
this.mockServer.expect(requestTo(ZIPKIN_URL)).andExpect(method(HttpMethod.POST)) this.mockServer.expect(requestTo(ZIPKIN_URL)).andExpect(method(HttpMethod.POST))
.andRespond(withStatus(HttpStatus.INTERNAL_SERVER_ERROR)); .andRespond(withStatus(HttpStatus.INTERNAL_SERVER_ERROR));
CheckResult result = this.senderUnderTest.check(); CheckResult result = this.sut.check();
assertThat(result.ok()).isFalse(); assertThat(result.ok()).isFalse();
assertThat(result.error()).hasMessageContaining("500 Internal Server Error"); assertThat(result.error()).hasMessageContaining("500 Internal Server Error");
} }
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = { true, false }) @ValueSource(booleans = { true, false })
void sendSpansShouldSendSpansToZipkin(boolean async) { void sendSpansShouldSendSpansToZipkin(boolean async) throws IOException {
this.mockServer.expect(requestTo(ZIPKIN_URL)).andExpect(method(HttpMethod.POST)) this.mockServer.expect(requestTo(ZIPKIN_URL)).andExpect(method(HttpMethod.POST))
.andExpect(content().contentType("application/json")).andExpect(content().string("[span1,span2]")) .andExpect(content().contentType("application/json")).andExpect(content().string("[span1,span2]"))
.andRespond(withStatus(HttpStatus.ACCEPTED)); .andRespond(withStatus(HttpStatus.ACCEPTED));
@ -95,8 +96,8 @@ class ZipkinRestTemplateSenderTests extends ZipkinHttpSenderTests {
.andRespond(withStatus(HttpStatus.INTERNAL_SERVER_ERROR)); .andRespond(withStatus(HttpStatus.INTERNAL_SERVER_ERROR));
if (async) { if (async) {
CallbackResult callbackResult = this.makeAsyncRequest(List.of()); CallbackResult callbackResult = this.makeAsyncRequest(List.of());
assertThat(callbackResult.isSuccess()).isFalse(); assertThat(callbackResult.success()).isFalse();
assertThat(callbackResult.getError()).isNotNull().hasMessageContaining("500 Internal Server Error"); assertThat(callbackResult.error()).isNotNull().hasMessageContaining("500 Internal Server Error");
} }
else { else {
assertThatThrownBy(() -> this.makeSyncRequest(List.of())).hasMessageContaining("500 Internal Server Error"); assertThatThrownBy(() -> this.makeSyncRequest(List.of())).hasMessageContaining("500 Internal Server Error");
@ -105,7 +106,7 @@ class ZipkinRestTemplateSenderTests extends ZipkinHttpSenderTests {
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = { true, false }) @ValueSource(booleans = { true, false })
void sendSpansShouldCompressData(boolean async) { void sendSpansShouldCompressData(boolean async) throws IOException {
String uncompressed = "a".repeat(10000); String uncompressed = "a".repeat(10000);
// This is gzip compressed 10000 times 'a' // This is gzip compressed 10000 times 'a'
byte[] compressed = Base64.getDecoder() byte[] compressed = Base64.getDecoder()

@ -25,7 +25,6 @@ import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest; import okhttp3.mockwebserver.RecordedRequest;
import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
@ -62,15 +61,15 @@ class ZipkinWebClientSenderTests extends ZipkinHttpSenderTests {
} }
@Override @Override
Sender getZipkinSender() { Sender createSut() {
WebClient webClient = WebClient.builder().build(); WebClient webClient = WebClient.builder().build();
return new ZipkinWebClientSender(ZIPKIN_URL, webClient); return new ZipkinWebClientSender(ZIPKIN_URL, webClient);
} }
@Test @Test
void checkShouldSendEmptySpanList() { void checkShouldSendEmptySpanList() throws InterruptedException {
mockBackEnd.enqueue(new MockResponse()); mockBackEnd.enqueue(new MockResponse());
assertThat(this.senderUnderTest.check()).isEqualTo(CheckResult.OK); assertThat(this.sut.check()).isEqualTo(CheckResult.OK);
requestAssertions((request) -> { requestAssertions((request) -> {
assertThat(request.getMethod()).isEqualTo("POST"); assertThat(request.getMethod()).isEqualTo("POST");
@ -79,9 +78,9 @@ class ZipkinWebClientSenderTests extends ZipkinHttpSenderTests {
} }
@Test @Test
void checkShouldNotRaiseException() { void checkShouldNotRaiseException() throws InterruptedException {
mockBackEnd.enqueue(new MockResponse().setResponseCode(500)); mockBackEnd.enqueue(new MockResponse().setResponseCode(500));
CheckResult result = this.senderUnderTest.check(); CheckResult result = this.sut.check();
assertThat(result.ok()).isFalse(); assertThat(result.ok()).isFalse();
assertThat(result.error()).hasMessageContaining("500 Internal Server Error"); assertThat(result.error()).hasMessageContaining("500 Internal Server Error");
@ -90,7 +89,7 @@ class ZipkinWebClientSenderTests extends ZipkinHttpSenderTests {
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = { true, false }) @ValueSource(booleans = { true, false })
void sendSpansShouldSendSpansToZipkin(boolean async) { void sendSpansShouldSendSpansToZipkin(boolean async) throws IOException, InterruptedException {
mockBackEnd.enqueue(new MockResponse()); mockBackEnd.enqueue(new MockResponse());
List<byte[]> encodedSpans = List.of(toByteArray("span1"), toByteArray("span2")); List<byte[]> encodedSpans = List.of(toByteArray("span1"), toByteArray("span2"));
this.makeRequest(encodedSpans, async); this.makeRequest(encodedSpans, async);
@ -104,12 +103,12 @@ class ZipkinWebClientSenderTests extends ZipkinHttpSenderTests {
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = { true, false }) @ValueSource(booleans = { true, false })
void sendSpansShouldHandleHttpFailures(boolean async) { void sendSpansShouldHandleHttpFailures(boolean async) throws InterruptedException {
mockBackEnd.enqueue(new MockResponse().setResponseCode(500)); mockBackEnd.enqueue(new MockResponse().setResponseCode(500));
if (async) { if (async) {
CallbackResult callbackResult = this.makeAsyncRequest(List.of()); CallbackResult callbackResult = this.makeAsyncRequest(List.of());
assertThat(callbackResult.isSuccess()).isFalse(); assertThat(callbackResult.success()).isFalse();
assertThat(callbackResult.getError()).isNotNull().hasMessageContaining("500 Internal Server Error"); assertThat(callbackResult.error()).isNotNull().hasMessageContaining("500 Internal Server Error");
} }
else { else {
assertThatThrownBy(() -> this.makeSyncRequest(List.of())).hasMessageContaining("500 Internal Server Error"); assertThatThrownBy(() -> this.makeSyncRequest(List.of())).hasMessageContaining("500 Internal Server Error");
@ -120,7 +119,7 @@ class ZipkinWebClientSenderTests extends ZipkinHttpSenderTests {
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = { true, false }) @ValueSource(booleans = { true, false })
void sendSpansShouldCompressData(boolean async) { void sendSpansShouldCompressData(boolean async) throws IOException, InterruptedException {
String uncompressed = "a".repeat(10000); String uncompressed = "a".repeat(10000);
// This is gzip compressed 10000 times 'a' // This is gzip compressed 10000 times 'a'
byte[] compressed = Base64.getDecoder() byte[] compressed = Base64.getDecoder()
@ -139,14 +138,9 @@ class ZipkinWebClientSenderTests extends ZipkinHttpSenderTests {
} }
private void requestAssertions(Consumer<RecordedRequest> assertions) { private void requestAssertions(Consumer<RecordedRequest> assertions) throws InterruptedException {
try { RecordedRequest request = mockBackEnd.takeRequest();
RecordedRequest request = mockBackEnd.takeRequest(); assertThat(request).satisfies(assertions);
assertThat(request).satisfies(assertions);
}
catch (InterruptedException ex) {
Assertions.fail(ex);
}
} }
} }

Loading…
Cancel
Save