From a58aba35e3f6214dde5415c279590facd8f70f26 Mon Sep 17 00:00:00 2001 From: Georgios Andrianakis Date: Tue, 20 Feb 2024 18:01:00 +0200 Subject: [PATCH] Make VertxGrpcExporter more robust Introduce retry when a network related error occurs Relates to: #35686 --- .../exporter/otlp/VertxGrpcExporter.java | 107 ++++++++++++++---- 1 file changed, 85 insertions(+), 22 deletions(-) diff --git a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxGrpcExporter.java b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxGrpcExporter.java index 5e034cdbd24163..eccac35f3c0f51 100644 --- a/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxGrpcExporter.java +++ b/extensions/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/exporter/otlp/VertxGrpcExporter.java @@ -87,8 +87,9 @@ private CompletableResultCode export(TraceRequestMarshaler marshaler, int numIte exporterMetrics.addSeen(numItems); var result = new CompletableResultCode(); - var onSuccessHandler = new ClientRequestOnSuccessHandler(headers, compressionEnabled, exporterMetrics, marshaler, - loggedUnimplemented, logger, type, numItems, result); + var onSuccessHandler = new ClientRequestOnSuccessHandler(client, server, headers, compressionEnabled, exporterMetrics, + marshaler, + loggedUnimplemented, logger, type, numItems, result, 1); client.request(server) .onSuccess(onSuccessHandler) .onFailure(new Handler<>() { @@ -97,12 +98,23 @@ public void handle(Throwable t) { // TODO: is there a better way todo retry? // TODO: should we only retry on a specific errors? + // try a 2nd time + var newOnSuccessHandler = onSuccessHandler.newAttempt(); client.request(server) - .onSuccess(onSuccessHandler) + .onSuccess(newOnSuccessHandler) .onFailure(new Handler<>() { @Override public void handle(Throwable event) { - failOnClientRequest(numItems, t, result); + + // try a 3rd time + client.request(server) + .onSuccess(newOnSuccessHandler.newAttempt()) + .onFailure(new Handler<>() { + @Override + public void handle(Throwable event) { + failOnClientRequest(numItems, t, result); + } + }); } }); } @@ -146,6 +158,9 @@ public CompletableResultCode shutdown() { private static final class ClientRequestOnSuccessHandler implements Handler> { + private static final int MAX_ATTEMPTS = 3; + private final GrpcClient client; + private final SocketAddress server; private final Map headers; private final boolean compressionEnabled; private final ExporterMetrics exporterMetrics; @@ -157,7 +172,11 @@ private static final class ClientRequestOnSuccessHandler implements Handler headers, + private final int attemptNumber; + + public ClientRequestOnSuccessHandler(GrpcClient client, + SocketAddress server, + Map headers, boolean compressionEnabled, ExporterMetrics exporterMetrics, TraceRequestMarshaler marshaler, @@ -165,7 +184,10 @@ public ClientRequestOnSuccessHandler(Map headers, ThrottlingLogger logger, String type, int numItems, - CompletableResultCode result) { + CompletableResultCode result, + int attemptNumber) { + this.client = client; + this.server = server; this.headers = headers; this.compressionEnabled = compressionEnabled; this.exporterMetrics = exporterMetrics; @@ -175,6 +197,7 @@ public ClientRequestOnSuccessHandler(Map headers, this.type = type; this.numItems = numItems; this.result = result; + this.attemptNumber = attemptNumber; } @Override @@ -205,14 +228,26 @@ public void handle(GrpcClientResponse response) { response.exceptionHandler(new Handler<>() { @Override public void handle(Throwable t) { - exporterMetrics.addFailed(numItems); - logger.log( - Level.SEVERE, - "Failed to export " - + type - + "s. The stream failed. Full error message: " - + t.getMessage()); - result.fail(); + if (attemptNumber <= MAX_ATTEMPTS) { + // retry + client.request(server) + .onSuccess(newAttempt()) + .onFailure(new Handler<>() { + @Override + public void handle(Throwable event) { + failOnClientRequest(numItems, t, result); + } + }); + } else { + exporterMetrics.addFailed(numItems); + logger.log( + Level.SEVERE, + "Failed to export " + + type + + "s. The stream failed. Full error message: " + + t.getMessage()); + result.fail(); + } } }).errorHandler(new Handler<>() { @Override @@ -336,14 +371,26 @@ private String getStatusMessage(GrpcClientResponse response) { }).onFailure(new Handler<>() { @Override public void handle(Throwable t) { - exporterMetrics.addFailed(numItems); - logger.log( - Level.SEVERE, - "Failed to export " - + type - + "s. The request could not be executed. Full error message: " - + t.getMessage()); - result.fail(); + if (attemptNumber <= MAX_ATTEMPTS) { + // retry + client.request(server) + .onSuccess(newAttempt()) + .onFailure(new Handler<>() { + @Override + public void handle(Throwable event) { + failOnClientRequest(numItems, t, result); + } + }); + } else { + exporterMetrics.addFailed(numItems); + logger.log( + Level.SEVERE, + "Failed to export " + + type + + "s. The request could not be executed. Full error message: " + + t.getMessage()); + result.fail(); + } } }); } catch (IOException e) { @@ -357,5 +404,21 @@ public void handle(Throwable t) { result.fail(); } } + + private void failOnClientRequest(int numItems, Throwable t, CompletableResultCode result) { + exporterMetrics.addFailed(numItems); + logger.log( + Level.SEVERE, + "Failed to export " + + type + + "s. The request could not be executed. Full error message: " + + t.getMessage()); + result.fail(); + } + + public ClientRequestOnSuccessHandler newAttempt() { + return new ClientRequestOnSuccessHandler(client, server, headers, compressionEnabled, exporterMetrics, marshaler, + loggedUnimplemented, logger, type, numItems, result, attemptNumber + 1); + } } }