Skip to content

Commit

Permalink
Make VertxGrpcExporter more robust
Browse files Browse the repository at this point in the history
Introduce retry when a network related error occurs

Relates to: #35686
  • Loading branch information
geoand committed Feb 20, 2024
1 parent ccef139 commit a58aba3
Showing 1 changed file with 85 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@ private CompletableResultCode export(TraceRequestMarshaler marshaler, int numIte
exporterMetrics.addSeen(numItems);

var result = new CompletableResultCode();
var onSuccessHandler = new ClientRequestOnSuccessHandler(headers, compressionEnabled, exporterMetrics, marshaler,
loggedUnimplemented, logger, type, numItems, result);
var onSuccessHandler = new ClientRequestOnSuccessHandler(client, server, headers, compressionEnabled, exporterMetrics,
marshaler,
loggedUnimplemented, logger, type, numItems, result, 1);
client.request(server)
.onSuccess(onSuccessHandler)
.onFailure(new Handler<>() {
Expand All @@ -97,12 +98,23 @@ public void handle(Throwable t) {
// TODO: is there a better way todo retry?
// TODO: should we only retry on a specific errors?

// try a 2nd time
var newOnSuccessHandler = onSuccessHandler.newAttempt();
client.request(server)
.onSuccess(onSuccessHandler)
.onSuccess(newOnSuccessHandler)
.onFailure(new Handler<>() {
@Override
public void handle(Throwable event) {
failOnClientRequest(numItems, t, result);

// try a 3rd time
client.request(server)
.onSuccess(newOnSuccessHandler.newAttempt())
.onFailure(new Handler<>() {
@Override
public void handle(Throwable event) {
failOnClientRequest(numItems, t, result);
}
});
}
});
}
Expand Down Expand Up @@ -146,6 +158,9 @@ public CompletableResultCode shutdown() {

private static final class ClientRequestOnSuccessHandler implements Handler<GrpcClientRequest<Buffer, Buffer>> {

private static final int MAX_ATTEMPTS = 3;
private final GrpcClient client;
private final SocketAddress server;
private final Map<String, String> headers;
private final boolean compressionEnabled;
private final ExporterMetrics exporterMetrics;
Expand All @@ -157,15 +172,22 @@ private static final class ClientRequestOnSuccessHandler implements Handler<Grpc
private final int numItems;
private final CompletableResultCode result;

public ClientRequestOnSuccessHandler(Map<String, String> headers,
private final int attemptNumber;

public ClientRequestOnSuccessHandler(GrpcClient client,
SocketAddress server,
Map<String, String> headers,
boolean compressionEnabled,
ExporterMetrics exporterMetrics,
TraceRequestMarshaler marshaler,
AtomicBoolean loggedUnimplemented,
ThrottlingLogger logger,
String type,
int numItems,
CompletableResultCode result) {
CompletableResultCode result,
int attemptNumber) {
this.client = client;
this.server = server;
this.headers = headers;
this.compressionEnabled = compressionEnabled;
this.exporterMetrics = exporterMetrics;
Expand All @@ -175,6 +197,7 @@ public ClientRequestOnSuccessHandler(Map<String, String> headers,
this.type = type;
this.numItems = numItems;
this.result = result;
this.attemptNumber = attemptNumber;
}

@Override
Expand Down Expand Up @@ -205,14 +228,26 @@ public void handle(GrpcClientResponse<Buffer, Buffer> response) {
response.exceptionHandler(new Handler<>() {
@Override
public void handle(Throwable t) {
exporterMetrics.addFailed(numItems);
logger.log(
Level.SEVERE,
"Failed to export "
+ type
+ "s. The stream failed. Full error message: "
+ t.getMessage());
result.fail();
if (attemptNumber <= MAX_ATTEMPTS) {
// retry
client.request(server)
.onSuccess(newAttempt())
.onFailure(new Handler<>() {
@Override
public void handle(Throwable event) {
failOnClientRequest(numItems, t, result);
}
});
} else {
exporterMetrics.addFailed(numItems);
logger.log(
Level.SEVERE,
"Failed to export "
+ type
+ "s. The stream failed. Full error message: "
+ t.getMessage());
result.fail();
}
}
}).errorHandler(new Handler<>() {
@Override
Expand Down Expand Up @@ -336,14 +371,26 @@ private String getStatusMessage(GrpcClientResponse<Buffer, Buffer> response) {
}).onFailure(new Handler<>() {
@Override
public void handle(Throwable t) {
exporterMetrics.addFailed(numItems);
logger.log(
Level.SEVERE,
"Failed to export "
+ type
+ "s. The request could not be executed. Full error message: "
+ t.getMessage());
result.fail();
if (attemptNumber <= MAX_ATTEMPTS) {
// retry
client.request(server)
.onSuccess(newAttempt())
.onFailure(new Handler<>() {
@Override
public void handle(Throwable event) {
failOnClientRequest(numItems, t, result);
}
});
} else {
exporterMetrics.addFailed(numItems);
logger.log(
Level.SEVERE,
"Failed to export "
+ type
+ "s. The request could not be executed. Full error message: "
+ t.getMessage());
result.fail();
}
}
});
} catch (IOException e) {
Expand All @@ -357,5 +404,21 @@ public void handle(Throwable t) {
result.fail();
}
}

private void failOnClientRequest(int numItems, Throwable t, CompletableResultCode result) {
exporterMetrics.addFailed(numItems);
logger.log(
Level.SEVERE,
"Failed to export "
+ type
+ "s. The request could not be executed. Full error message: "
+ t.getMessage());
result.fail();
}

public ClientRequestOnSuccessHandler newAttempt() {
return new ClientRequestOnSuccessHandler(client, server, headers, compressionEnabled, exporterMetrics, marshaler,
loggedUnimplemented, logger, type, numItems, result, attemptNumber + 1);
}
}
}

0 comments on commit a58aba3

Please sign in to comment.