Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log ingestion error on 206 response #2065

Merged
merged 11 commits into from
Jan 25, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import com.azure.core.util.tracing.Tracer;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.io.SerializedString;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.microsoft.applicationinsights.agent.internal.common.NetworkFriendlyExceptions;
Expand Down Expand Up @@ -235,30 +237,35 @@ private CompletableResultCode internalSend(
.send(request, Context.of(contextKeyValues))
.subscribe(
response -> {
parseResponseCode(
response.getStatusCode(), instrumentationKey, byteBuffers, persisted);
LazyHttpClient.consumeResponseBody(response);
if (!isStatsbeat) {
if (response.getStatusCode() == 200) {
statsbeatModule
.getNetworkStatsbeat()
.incrementRequestSuccessCount(
System.currentTimeMillis() - startTime, instrumentationKey);
} else {
statsbeatModule
.getNetworkStatsbeat()
.incrementRequestFailureCount(instrumentationKey);
}
}
if (!persisted) {
// persisted byte buffers don't come from the pool so shouldn't go back to the pool
byteBufferPool.offer(byteBuffers);
}
if (response.getStatusCode() == 200) {
result.succeed();
} else {
result.fail();
}
response
.getBodyAsString()
.subscribe(
body -> {
parseResponseCode(response.getStatusCode(), body, instrumentationKey);
LazyHttpClient.consumeResponseBody(response);
if (!isStatsbeat) {
if (response.getStatusCode() == 200) {
statsbeatModule
.getNetworkStatsbeat()
.incrementRequestSuccessCount(
System.currentTimeMillis() - startTime, instrumentationKey);
} else {
statsbeatModule
.getNetworkStatsbeat()
.incrementRequestFailureCount(instrumentationKey);
}
}
if (!persisted) {
// persisted byte buffers don't come from the pool so shouldn't go back to
// the pool
byteBufferPool.offer(byteBuffers);
}
if (response.getStatusCode() == 200) {
result.succeed();
} else {
result.fail();
}
});
},
error -> {
// AMPLS
Expand Down Expand Up @@ -302,47 +309,52 @@ private void writeToDiskOnFailure(List<ByteBuffer> byteBuffers, String instrumen
}
}

private void parseResponseCode(
int statusCode, String instrumentationKey, List<ByteBuffer> byteBuffers, boolean persisted) {
private void parseResponseCode(int statusCode, String body, String instrumentationKey) {
switch (statusCode) {
case 200: // SUCCESS
operationLogger.recordSuccess();
break;
case 206: // PARTIAL CONTENT, Breeze-specific: PARTIAL SUCCESS
operationLogger.recordFailure(getErrorMessageFromPartialSuccessResponse(body));
break;
case 401: // UNAUTHORIZED
case 403: // FORBIDDEN
logger.warn(
"Failed to send telemetry with status code:{}, please check your credentials",
statusCode);
// no need to write to disk again when failing to send raw bytes from the persisted file
if (!persisted) {
writeToDiskOnFailure(byteBuffers, instrumentationKey);
trask marked this conversation as resolved.
Show resolved Hide resolved
}
break;
case 408: // REQUEST TIMEOUT
case 500: // INTERNAL SERVER ERROR
case 503: // SERVICE UNAVAILABLE
case 429: // TOO MANY REQUESTS
// TODO (heya) should we write to disk on any of these response codes?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure... if we don't write to disk, we should consider tracking how many requests we drop. Let's think about it.. need to come up with new property in customdimension.. it can be part of network statsbeat..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you update the todo to track data drop via statsbeat?

operationLogger.recordFailure("received response code " + statusCode);
break;
case 439: // Breeze-specific: THROTTLED OVER EXTENDED TIME
operationLogger.recordFailure("received response code 439 (throttled over extended time)");
// TODO handle throttling
// TODO (heya) track throttling count via Statsbeat
// instrumentationKey is null when sending persisted file's raw bytes.
if (!isStatsbeat) {
statsbeatModule.getNetworkStatsbeat().incrementThrottlingCount(instrumentationKey);
}
break;
case 200: // SUCCESS
operationLogger.recordSuccess();
break;
case 206: // PARTIAL CONTENT, Breeze-specific: PARTIAL SUCCESS
// TODO handle partial success
break;
case 0: // client-side exception
// TODO exponential backoff and retry to a limit
// TODO (heya) track failure count via Statsbeat
// instrumentationKey is null when sending persisted file's raw bytes.
if (!isStatsbeat) {
statsbeatModule.getNetworkStatsbeat().incrementRetryCount(instrumentationKey);
}
break;
default:
// ok
operationLogger.recordFailure("received response code: " + statusCode);
}
}

private static String getErrorMessageFromPartialSuccessResponse(String body) {
trask marked this conversation as resolved.
Show resolved Hide resolved
JsonNode jsonNode;
try {
jsonNode = new ObjectMapper().readTree(body);
} catch (JsonProcessingException e) {
return "ingestion service returned 206, but could not parse response as json: " + body;
}
List<JsonNode> errors = new ArrayList<>();
jsonNode.get("errors").forEach(errors::add);
StringBuilder message = new StringBuilder();
message.append(errors.get(0).get("message").asText());
int moreErrors = errors.size() - 1;
if (moreErrors > 0) {
message.append(" (and ").append(moreErrors).append(" more)");
}
return message.toString();
}
}