From 0ac934df78a041be938499ba41d65ee1500c4805 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Fri, 21 Jan 2022 15:31:08 -0800 Subject: [PATCH 1/8] Fix ingestion error on missing exception message --- .../internal/telemetry/TelemetryChannel.java | 110 ++++++++++-------- 1 file changed, 61 insertions(+), 49 deletions(-) diff --git a/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java b/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java index 584464adce5..74866e66592 100644 --- a/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java +++ b/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java @@ -30,7 +30,9 @@ import com.azure.core.util.tracing.Tracer; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.io.SerializedString; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.microsoft.applicationinsights.agent.internal.common.NetworkFriendlyExceptions; @@ -235,30 +237,35 @@ private CompletableResultCode internalSend( .send(request, Context.of(contextKeyValues)) .subscribe( response -> { - parseResponseCode( - response.getStatusCode(), instrumentationKey, byteBuffers, persisted); - LazyHttpClient.consumeResponseBody(response); - if (!isStatsbeat) { - if (response.getStatusCode() == 200) { - statsbeatModule - .getNetworkStatsbeat() - .incrementRequestSuccessCount( - System.currentTimeMillis() - startTime, instrumentationKey); - } else { - statsbeatModule - .getNetworkStatsbeat() - .incrementRequestFailureCount(instrumentationKey); - } - } - if (!persisted) { - // persisted byte buffers don't come from the pool so shouldn't go back to the pool - byteBufferPool.offer(byteBuffers); - } - if (response.getStatusCode() == 200) { - result.succeed(); - } else { - result.fail(); - } + response + .getBodyAsString() + .subscribe( + body -> { + parseResponseCode(response.getStatusCode(), body, instrumentationKey); + LazyHttpClient.consumeResponseBody(response); + if (!isStatsbeat) { + if (response.getStatusCode() == 200) { + statsbeatModule + .getNetworkStatsbeat() + .incrementRequestSuccessCount( + System.currentTimeMillis() - startTime, instrumentationKey); + } else { + statsbeatModule + .getNetworkStatsbeat() + .incrementRequestFailureCount(instrumentationKey); + } + } + if (!persisted) { + // persisted byte buffers don't come from the pool so shouldn't go back to + // the pool + byteBufferPool.offer(byteBuffers); + } + if (response.getStatusCode() == 200) { + result.succeed(); + } else { + result.fail(); + } + }); }, error -> { // AMPLS @@ -302,24 +309,25 @@ private void writeToDiskOnFailure(List byteBuffers, String instrumen } } - private void parseResponseCode( - int statusCode, String instrumentationKey, List byteBuffers, boolean persisted) { + private void parseResponseCode(int statusCode, String body, String instrumentationKey) { switch (statusCode) { + case 200: // SUCCESS + operationLogger.recordSuccess(); + break; + case 206: // PARTIAL CONTENT, Breeze-specific: PARTIAL SUCCESS + operationLogger.recordFailure(getErrorMessageFromPartialSuccessResponse(body)); + break; case 401: // UNAUTHORIZED case 403: // FORBIDDEN - logger.warn( - "Failed to send telemetry with status code:{}, please check your credentials", - statusCode); - // no need to write to disk again when failing to send raw bytes from the persisted file - if (!persisted) { - writeToDiskOnFailure(byteBuffers, instrumentationKey); - } - break; case 408: // REQUEST TIMEOUT case 500: // INTERNAL SERVER ERROR case 503: // SERVICE UNAVAILABLE case 429: // TOO MANY REQUESTS + // TODO (heya) should we write to disk on any of these response codes? + operationLogger.recordFailure("received response code " + statusCode); + break; case 439: // Breeze-specific: THROTTLED OVER EXTENDED TIME + operationLogger.recordFailure("received response code 439 (throttled over extended time)"); // TODO handle throttling // TODO (heya) track throttling count via Statsbeat // instrumentationKey is null when sending persisted file's raw bytes. @@ -327,22 +335,26 @@ private void parseResponseCode( statsbeatModule.getNetworkStatsbeat().incrementThrottlingCount(instrumentationKey); } break; - case 200: // SUCCESS - operationLogger.recordSuccess(); - break; - case 206: // PARTIAL CONTENT, Breeze-specific: PARTIAL SUCCESS - // TODO handle partial success - break; - case 0: // client-side exception - // TODO exponential backoff and retry to a limit - // TODO (heya) track failure count via Statsbeat - // instrumentationKey is null when sending persisted file's raw bytes. - if (!isStatsbeat) { - statsbeatModule.getNetworkStatsbeat().incrementRetryCount(instrumentationKey); - } - break; default: - // ok + operationLogger.recordFailure("received response code: " + statusCode); + } + } + + private static String getErrorMessageFromPartialSuccessResponse(String body) { + JsonNode jsonNode; + try { + jsonNode = new ObjectMapper().readTree(body); + } catch (JsonProcessingException e) { + return "ingestion service returned 206, but could not parse response as json: " + body; + } + List errors = new ArrayList<>(); + jsonNode.get("errors").forEach(errors::add); + StringBuilder message = new StringBuilder(); + message.append(errors.get(0).get("message").asText()); + int moreErrors = errors.size() - 1; + if (moreErrors > 0) { + message.append(" (and ").append(moreErrors).append(" more)"); } + return message.toString(); } } From 52f8d9c9e7e956b61f4bcc6c4ddb98133cef4e98 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Mon, 24 Jan 2022 14:58:27 -0800 Subject: [PATCH 2/8] One option --- .../localstorage/LocalFileLoader.java | 4 +- .../localstorage/LocalFileSender.java | 16 +- .../internal/telemetry/TelemetryChannel.java | 287 +++++++++++------- .../localstorage/IntegrationTests.java | 15 +- 4 files changed, 212 insertions(+), 110 deletions(-) diff --git a/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/localstorage/LocalFileLoader.java b/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/localstorage/LocalFileLoader.java index d0a671d4e75..14aa3378185 100644 --- a/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/localstorage/LocalFileLoader.java +++ b/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/localstorage/LocalFileLoader.java @@ -163,13 +163,13 @@ private static void readFully(FileInputStream fileInputStream, byte[] byteArray, // either delete it permanently on success or add it back to cache to be processed again later on // failure - public void updateProcessedFileStatus(boolean success, File file) { + public void updateProcessedFileStatus(boolean successOrNonRetryableError, File file) { if (!file.exists()) { // not sure why this would happen updateOperationLogger.recordFailure("File no longer exists: " + file.getName()); return; } - if (success) { + if (successOrNonRetryableError) { // delete a file on the queue permanently when http response returns success. if (!LocalStorageUtils.deleteFileWithRetries(file)) { // TODO (heya) track file deletion failure via Statsbeat diff --git a/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/localstorage/LocalFileSender.java b/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/localstorage/LocalFileSender.java index e5f52485ca0..4febb77e3c7 100644 --- a/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/localstorage/LocalFileSender.java +++ b/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/localstorage/LocalFileSender.java @@ -61,9 +61,21 @@ public void run() { LocalFileLoader.PersistedFile persistedFile = localFileLoader.loadTelemetriesFromDisk(); if (persistedFile != null) { CompletableResultCode resultCode = - telemetryChannel.sendRawBytes(persistedFile.rawBytes, persistedFile.instrumentationKey); + telemetryChannel.sendRawBytes( + persistedFile.rawBytes, + persistedFile.instrumentationKey, + new TelemetryChannel.CompletionListener() { + @Override + public void onSuccess() { + localFileLoader.updateProcessedFileStatus(true, persistedFile.file); + } + + @Override + public void onError(boolean retryable) { + localFileLoader.updateProcessedFileStatus(!retryable, persistedFile.file); + } + }); resultCode.join(30, TimeUnit.SECONDS); // wait max 30 seconds for request to be completed. - localFileLoader.updateProcessedFileStatus(resultCode.isSuccess(), persistedFile.file); } } catch (RuntimeException ex) { logger.error( diff --git a/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java b/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java index 74866e66592..ed21eeb5a56 100644 --- a/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java +++ b/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java @@ -26,6 +26,7 @@ import com.azure.core.http.HttpMethod; import com.azure.core.http.HttpPipeline; import com.azure.core.http.HttpRequest; +import com.azure.core.http.HttpResponse; import com.azure.core.util.Context; import com.azure.core.util.tracing.Tracer; import com.fasterxml.jackson.annotation.JsonInclude; @@ -55,6 +56,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.zip.GZIPOutputStream; import javax.annotation.Nullable; import org.slf4j.Logger; @@ -71,9 +73,11 @@ public class TelemetryChannel { private static final AppInsightsByteBufferPool byteBufferPool = new AppInsightsByteBufferPool(); private static final OperationLogger operationLogger = + new OperationLogger(TelemetryChannel.class, "Sending telemetry to the ingestion service"); + + private static final OperationLogger retryOperationLogger = new OperationLogger( - TelemetryChannel.class, - "Sending telemetry to the ingestion service (telemetry will be stored to disk on failure and retried later)"); + TelemetryChannel.class, "Sending telemetry to the ingestion service (retry)"); // TODO (kryalama) do we still need this AtomicBoolean, or can we use throttling built in to the // operationLogger? @@ -93,7 +97,7 @@ private static ObjectMapper createObjectMapper() { private final HttpPipeline pipeline; private final URL endpointUrl; - @Nullable private final LocalFileWriter localFileWriter; + private final LocalFileWriter localFileWriter; private final StatsbeatModule statsbeatModule; private final boolean isStatsbeat; @@ -109,8 +113,10 @@ public static TelemetryChannel create( httpPipeline, endpointUrl, localFileWriter, statsbeatModule, isStatsbeat); } - public CompletableResultCode sendRawBytes(ByteBuffer buffer, String instrumentationKey) { - return internalSend(singletonList(buffer), instrumentationKey, true); + public CompletableResultCode sendRawBytes( + ByteBuffer buffer, String instrumentationKey, CompletionListener completionListener) { + return internalSend( + singletonList(buffer), instrumentationKey, completionListener, retryOperationLogger); } // used by tests only @@ -155,7 +161,8 @@ public CompletableResultCode internalSendByInstrumentationKey( return CompletableResultCode.ofFailure(); } try { - return internalSend(byteBuffers, instrumentationKey, false); + return internalSend( + byteBuffers, instrumentationKey, new ReturnByteBuffers(byteBuffers), operationLogger); } catch (Throwable t) { operationLogger.recordFailure("Error sending telemetry items: " + t.getMessage(), t); return CompletableResultCode.ofFailure(); @@ -203,7 +210,10 @@ private static void writeTelemetryItems(JsonGenerator jg, List te * sent as {@code List}. Persisted telemetries will be sent as byte[] */ private CompletableResultCode internalSend( - List byteBuffers, String instrumentationKey, boolean persisted) { + List byteBuffers, + String instrumentationKey, + CompletionListener completionListener, + OperationLogger operationLogger) { HttpRequest request = new HttpRequest(HttpMethod.POST, endpointUrl); request.setBody(Flux.fromIterable(byteBuffers)); @@ -236,110 +246,103 @@ private CompletableResultCode internalSend( pipeline .send(request, Context.of(contextKeyValues)) .subscribe( - response -> { - response - .getBodyAsString() - .subscribe( - body -> { - parseResponseCode(response.getStatusCode(), body, instrumentationKey); - LazyHttpClient.consumeResponseBody(response); - if (!isStatsbeat) { - if (response.getStatusCode() == 200) { - statsbeatModule - .getNetworkStatsbeat() - .incrementRequestSuccessCount( - System.currentTimeMillis() - startTime, instrumentationKey); - } else { - statsbeatModule - .getNetworkStatsbeat() - .incrementRequestFailureCount(instrumentationKey); - } - } - if (!persisted) { - // persisted byte buffers don't come from the pool so shouldn't go back to - // the pool - byteBufferPool.offer(byteBuffers); - } - if (response.getStatusCode() == 200) { - result.succeed(); - } else { - result.fail(); - } - }); - }, - error -> { - // AMPLS - if (isStatsbeat && error instanceof UnknownHostException) { - // when sending a Statsbeat request and server returns an UnknownHostException, it's - // likely that - // it's using a virtual network. In that case, we use the kill-switch to turn off - // Statsbeat. - statsbeatModule.shutdown(); - } else { - if (!NetworkFriendlyExceptions.logSpecialOneTimeFriendlyException( - error, endpointUrl.toString(), friendlyExceptionThrown, logger)) { - operationLogger.recordFailure( - "Error sending telemetry items: " + error.getMessage(), error); - } - - if (!isStatsbeat) { - statsbeatModule - .getNetworkStatsbeat() - .incrementRequestFailureCount(instrumentationKey); - } - // no need to write to disk again when failing to send raw bytes from the persisted - // file - if (!persisted) { - writeToDiskOnFailure(byteBuffers, instrumentationKey); - } - } - - if (!persisted) { - // persisted byte buffers don't come from the pool so shouldn't go back to the pool - byteBufferPool.offer(byteBuffers); - } - result.fail(); - }); + responseHandler(instrumentationKey, startTime, completionListener, operationLogger), + errorHandler(instrumentationKey, completionListener, operationLogger)); return result; } - private void writeToDiskOnFailure(List byteBuffers, String instrumentationKey) { - if (localFileWriter != null) { - localFileWriter.writeToDisk(byteBuffers, instrumentationKey); - } + private Consumer responseHandler( + String instrumentationKey, + long startTime, + CompletionListener completionListener, + OperationLogger operationLogger) { + + return response -> + response + .getBodyAsString() + .subscribe( + body -> { + int statusCode = response.getStatusCode(); + switch (statusCode) { + case 200: // SUCCESS + operationLogger.recordSuccess(); + completionListener.onSuccess(); + break; + case 206: // PARTIAL CONTENT, Breeze-specific: PARTIAL SUCCESS + operationLogger.recordFailure( + getErrorMessageFromPartialSuccessResponse(body)); + completionListener.onError(false); + break; + case 408: // REQUEST TIMEOUT + case 429: // TOO MANY REQUESTS + case 500: // INTERNAL SERVER ERROR + case 503: // SERVICE UNAVAILABLE + operationLogger.recordFailure( + "received response code " + + statusCode + + " (telemetry will be stored to disk and retried later)"); + completionListener.onError(true); + break; + case 439: // Breeze-specific: THROTTLED OVER EXTENDED TIME + // TODO handle throttling + operationLogger.recordFailure( + "received response code 439 (throttled over extended time)"); + completionListener.onError(false); + break; + default: + operationLogger.recordFailure("received response code: " + statusCode); + completionListener.onError(false); + } + LazyHttpClient.consumeResponseBody(response); + if (!isStatsbeat) { + handleStatsbeatOnResponse(instrumentationKey, startTime, statusCode); + } + }); } - private void parseResponseCode(int statusCode, String body, String instrumentationKey) { - switch (statusCode) { - case 200: // SUCCESS - operationLogger.recordSuccess(); - break; - case 206: // PARTIAL CONTENT, Breeze-specific: PARTIAL SUCCESS - operationLogger.recordFailure(getErrorMessageFromPartialSuccessResponse(body)); - break; - case 401: // UNAUTHORIZED - case 403: // FORBIDDEN - case 408: // REQUEST TIMEOUT - case 500: // INTERNAL SERVER ERROR - case 503: // SERVICE UNAVAILABLE - case 429: // TOO MANY REQUESTS - // TODO (heya) should we write to disk on any of these response codes? - operationLogger.recordFailure("received response code " + statusCode); - break; - case 439: // Breeze-specific: THROTTLED OVER EXTENDED TIME - operationLogger.recordFailure("received response code 439 (throttled over extended time)"); - // TODO handle throttling - // TODO (heya) track throttling count via Statsbeat - // instrumentationKey is null when sending persisted file's raw bytes. - if (!isStatsbeat) { - statsbeatModule.getNetworkStatsbeat().incrementThrottlingCount(instrumentationKey); - } - break; - default: - operationLogger.recordFailure("received response code: " + statusCode); + private void handleStatsbeatOnResponse( + String instrumentationKey, long startTime, int statusCode) { + if (statusCode == 200) { + statsbeatModule + .getNetworkStatsbeat() + .incrementRequestSuccessCount(System.currentTimeMillis() - startTime, instrumentationKey); + } else { + statsbeatModule.getNetworkStatsbeat().incrementRequestFailureCount(instrumentationKey); + } + if (statusCode == 439) { + statsbeatModule.getNetworkStatsbeat().incrementThrottlingCount(instrumentationKey); } } + private Consumer errorHandler( + String instrumentationKey, + CompletionListener completionListener, + OperationLogger operationLogger) { + + return error -> { + // AMPLS + if (isStatsbeat && error instanceof UnknownHostException) { + // when sending a Statsbeat request and server returns an UnknownHostException, it's + // likely that it's using a virtual network. In that case, we use the kill-switch to + // turn off Statsbeat. + statsbeatModule.shutdown(); + completionListener.onError(false); + } else if (NetworkFriendlyExceptions.logSpecialOneTimeFriendlyException( + error, endpointUrl.toString(), friendlyExceptionThrown, logger)) { + // don't log failure as that happened in logSpecialOneTimeFriendlyException + completionListener.onError(true); + } else { + operationLogger.recordFailure( + "Error sending telemetry items: " + error.getMessage(), error); + completionListener.onError(true); + } + + if (!isStatsbeat) { + statsbeatModule.getNetworkStatsbeat().incrementRequestFailureCount(instrumentationKey); + } + }; + } + private static String getErrorMessageFromPartialSuccessResponse(String body) { JsonNode jsonNode; try { @@ -357,4 +360,82 @@ private static String getErrorMessageFromPartialSuccessResponse(String body) { } return message.toString(); } + + public interface CompletionListener { + void onSuccess(); + + void onError(boolean retryable); + } + + public static class NoopCompletionListener implements CompletionListener { + + @Override + public void onSuccess() {} + + @Override + public void onError(boolean retryable) {} + } + + public static class CompositeCompletionListener implements CompletionListener { + + private final List delegates; + + public CompositeCompletionListener(List delegates) { + this.delegates = delegates; + } + + @Override + public void onSuccess() { + for (CompletionListener delegate : delegates) { + delegate.onSuccess(); + } + } + + @Override + public void onError(boolean retryable) { + for (CompletionListener delegate : delegates) { + delegate.onError(retryable); + } + } + } + + public static class ReturnByteBuffers implements CompletionListener { + + private final List byteBuffers; + + public ReturnByteBuffers(List byteBuffers) { + this.byteBuffers = byteBuffers; + } + + @Override + public void onSuccess() { + byteBufferPool.offer(byteBuffers); + } + + @Override + public void onError(boolean retryable) { + byteBufferPool.offer(byteBuffers); + } + } + + public class WriteToDiskOnRetryableFailure implements CompletionListener { + + private final List byteBuffers; + private final String instrumentationKey; + + public WriteToDiskOnRetryableFailure(List byteBuffers, String instrumentationKey) { + this.byteBuffers = byteBuffers; + this.instrumentationKey = instrumentationKey; + } + + @Override + public void onSuccess() {} + + @Override + public void onError(boolean retryable) { + if (retryable) { + localFileWriter.writeToDisk(byteBuffers, instrumentationKey); + } + } + } } diff --git a/agent/agent-tooling/src/test/java/com/microsoft/applicationinsights/agent/internal/localstorage/IntegrationTests.java b/agent/agent-tooling/src/test/java/com/microsoft/applicationinsights/agent/internal/localstorage/IntegrationTests.java index 88e5979b884..b84a6363280 100644 --- a/agent/agent-tooling/src/test/java/com/microsoft/applicationinsights/agent/internal/localstorage/IntegrationTests.java +++ b/agent/agent-tooling/src/test/java/com/microsoft/applicationinsights/agent/internal/localstorage/IntegrationTests.java @@ -35,6 +35,8 @@ import com.azure.core.util.Context; import com.microsoft.applicationinsights.agent.internal.common.TestUtils; import com.microsoft.applicationinsights.agent.internal.exporter.models.TelemetryItem; +import com.microsoft.applicationinsights.agent.internal.statsbeat.NetworkStatsbeat; +import com.microsoft.applicationinsights.agent.internal.statsbeat.StatsbeatModule; import com.microsoft.applicationinsights.agent.internal.telemetry.TelemetryChannel; import io.opentelemetry.sdk.common.CompletableResultCode; import java.io.ByteArrayInputStream; @@ -58,6 +60,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.mockito.Mockito; import reactor.core.publisher.Mono; public class IntegrationTests { @@ -84,23 +87,26 @@ public void setup() throws Exception { () -> new Exception("this is expected to be logged by the operation logger"))); } else { - // 401 and 403 response codes are the only ones the result in storing to disk + // 408, 429, 500, and 503 response codes are the only ones the result in storing to disk when(mockedClient.send(any(HttpRequest.class), any(Context.class))) .then( invocation -> Mono.just( - new MockHttpResponse(invocation.getArgument(0, HttpRequest.class), 401))); + new MockHttpResponse(invocation.getArgument(0, HttpRequest.class), 500))); } HttpPipelineBuilder pipelineBuilder = new HttpPipelineBuilder().httpClient(mockedClient); localFileCache = new LocalFileCache(tempFolder); localFileLoader = new LocalFileLoader(localFileCache, tempFolder, null); + StatsbeatModule statsbeatModule = Mockito.mock(StatsbeatModule.class); + when(statsbeatModule.getNetworkStatsbeat()).thenReturn(Mockito.mock(NetworkStatsbeat.class)); + telemetryChannel = new TelemetryChannel( pipelineBuilder.build(), new URL("http://foo.bar"), new LocalFileWriter(localFileCache, tempFolder, null), - null, + statsbeatModule, false); } @@ -127,6 +133,9 @@ public void integrationTest() throws Exception { executorService.shutdown(); executorService.awaitTermination(10, TimeUnit.MINUTES); + + Thread.sleep(1000); + assertThat(localFileCache.getPersistedFilesCache().size()).isEqualTo(100); for (int i = 100; i > 0; i--) { From 71f0919753c0cdba708748ed13c48d872501f26c Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Mon, 24 Jan 2022 14:58:32 -0800 Subject: [PATCH 3/8] Better option --- .../localstorage/LocalFileLoader.java | 2 +- .../localstorage/LocalFileSender.java | 14 +- .../internal/telemetry/TelemetryChannel.java | 152 ++++++------------ .../localstorage/LocalFileLoaderTests.java | 6 +- 4 files changed, 58 insertions(+), 116 deletions(-) diff --git a/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/localstorage/LocalFileLoader.java b/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/localstorage/LocalFileLoader.java index 14aa3378185..3982d92edec 100644 --- a/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/localstorage/LocalFileLoader.java +++ b/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/localstorage/LocalFileLoader.java @@ -114,7 +114,7 @@ PersistedFile loadTelemetriesFromDisk() { byte[] ikeyBytes = new byte[36]; int rawByteLength = (int) tempFile.length() - 36; byte[] telemetryBytes = new byte[rawByteLength]; - String instrumentationKey = null; + String instrumentationKey; try (FileInputStream fileInputStream = new FileInputStream(tempFile)) { readFully(fileInputStream, ikeyBytes, 36); instrumentationKey = new String(ikeyBytes, UTF_8); diff --git a/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/localstorage/LocalFileSender.java b/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/localstorage/LocalFileSender.java index 4febb77e3c7..265b923b461 100644 --- a/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/localstorage/LocalFileSender.java +++ b/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/localstorage/LocalFileSender.java @@ -64,17 +64,9 @@ public void run() { telemetryChannel.sendRawBytes( persistedFile.rawBytes, persistedFile.instrumentationKey, - new TelemetryChannel.CompletionListener() { - @Override - public void onSuccess() { - localFileLoader.updateProcessedFileStatus(true, persistedFile.file); - } - - @Override - public void onError(boolean retryable) { - localFileLoader.updateProcessedFileStatus(!retryable, persistedFile.file); - } - }); + () -> localFileLoader.updateProcessedFileStatus(true, persistedFile.file), + retryable -> + localFileLoader.updateProcessedFileStatus(!retryable, persistedFile.file)); resultCode.join(30, TimeUnit.SECONDS); // wait max 30 seconds for request to be completed. } } catch (RuntimeException ex) { diff --git a/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java b/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java index ed21eeb5a56..065c56c42bf 100644 --- a/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java +++ b/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java @@ -114,9 +114,12 @@ public static TelemetryChannel create( } public CompletableResultCode sendRawBytes( - ByteBuffer buffer, String instrumentationKey, CompletionListener completionListener) { + ByteBuffer buffer, + String instrumentationKey, + Runnable onSuccess, + Consumer onFailure) { return internalSend( - singletonList(buffer), instrumentationKey, completionListener, retryOperationLogger); + singletonList(buffer), instrumentationKey, onSuccess, onFailure, retryOperationLogger); } // used by tests only @@ -162,7 +165,14 @@ public CompletableResultCode internalSendByInstrumentationKey( } try { return internalSend( - byteBuffers, instrumentationKey, new ReturnByteBuffers(byteBuffers), operationLogger); + byteBuffers, + instrumentationKey, + () -> byteBufferPool.offer(byteBuffers), + retryable -> { + localFileWriter.writeToDisk(byteBuffers, instrumentationKey); + byteBufferPool.offer(byteBuffers); + }, + operationLogger); } catch (Throwable t) { operationLogger.recordFailure("Error sending telemetry items: " + t.getMessage(), t); return CompletableResultCode.ofFailure(); @@ -212,7 +222,8 @@ private static void writeTelemetryItems(JsonGenerator jg, List te private CompletableResultCode internalSend( List byteBuffers, String instrumentationKey, - CompletionListener completionListener, + Runnable onSuccess, + Consumer onFailure, OperationLogger operationLogger) { HttpRequest request = new HttpRequest(HttpMethod.POST, endpointUrl); @@ -246,15 +257,33 @@ private CompletableResultCode internalSend( pipeline .send(request, Context.of(contextKeyValues)) .subscribe( - responseHandler(instrumentationKey, startTime, completionListener, operationLogger), - errorHandler(instrumentationKey, completionListener, operationLogger)); + responseHandler( + instrumentationKey, + startTime, + () -> { + onSuccess.run(); + result.succeed(); + }, + retryable -> { + onFailure.accept(retryable); + result.fail(); + }, + operationLogger), + errorHandler( + instrumentationKey, + retryable -> { + onFailure.accept(retryable); + result.fail(); + }, + operationLogger)); return result; } private Consumer responseHandler( String instrumentationKey, long startTime, - CompletionListener completionListener, + Runnable onSuccess, + Consumer onFailure, OperationLogger operationLogger) { return response -> @@ -266,12 +295,12 @@ private Consumer responseHandler( switch (statusCode) { case 200: // SUCCESS operationLogger.recordSuccess(); - completionListener.onSuccess(); + onSuccess.run(); break; case 206: // PARTIAL CONTENT, Breeze-specific: PARTIAL SUCCESS operationLogger.recordFailure( getErrorMessageFromPartialSuccessResponse(body)); - completionListener.onError(false); + onFailure.accept(false); break; case 408: // REQUEST TIMEOUT case 429: // TOO MANY REQUESTS @@ -281,17 +310,17 @@ private Consumer responseHandler( "received response code " + statusCode + " (telemetry will be stored to disk and retried later)"); - completionListener.onError(true); + onFailure.accept(true); break; case 439: // Breeze-specific: THROTTLED OVER EXTENDED TIME // TODO handle throttling operationLogger.recordFailure( "received response code 439 (throttled over extended time)"); - completionListener.onError(false); + onFailure.accept(false); break; default: operationLogger.recordFailure("received response code: " + statusCode); - completionListener.onError(false); + onFailure.accept(false); } LazyHttpClient.consumeResponseBody(response); if (!isStatsbeat) { @@ -315,31 +344,28 @@ private void handleStatsbeatOnResponse( } private Consumer errorHandler( - String instrumentationKey, - CompletionListener completionListener, - OperationLogger operationLogger) { + String instrumentationKey, Consumer onFailure, OperationLogger operationLogger) { return error -> { - // AMPLS if (isStatsbeat && error instanceof UnknownHostException) { // when sending a Statsbeat request and server returns an UnknownHostException, it's - // likely that it's using a virtual network. In that case, we use the kill-switch to - // turn off Statsbeat. + // likely that it's using AMPLS. In that case, we use the kill-switch to turn off Statsbeat. statsbeatModule.shutdown(); - completionListener.onError(false); - } else if (NetworkFriendlyExceptions.logSpecialOneTimeFriendlyException( + onFailure.accept(false); + return; + } + + if (!NetworkFriendlyExceptions.logSpecialOneTimeFriendlyException( error, endpointUrl.toString(), friendlyExceptionThrown, logger)) { - // don't log failure as that happened in logSpecialOneTimeFriendlyException - completionListener.onError(true); - } else { operationLogger.recordFailure( "Error sending telemetry items: " + error.getMessage(), error); - completionListener.onError(true); } if (!isStatsbeat) { statsbeatModule.getNetworkStatsbeat().incrementRequestFailureCount(instrumentationKey); } + + onFailure.accept(true); }; } @@ -360,82 +386,4 @@ private static String getErrorMessageFromPartialSuccessResponse(String body) { } return message.toString(); } - - public interface CompletionListener { - void onSuccess(); - - void onError(boolean retryable); - } - - public static class NoopCompletionListener implements CompletionListener { - - @Override - public void onSuccess() {} - - @Override - public void onError(boolean retryable) {} - } - - public static class CompositeCompletionListener implements CompletionListener { - - private final List delegates; - - public CompositeCompletionListener(List delegates) { - this.delegates = delegates; - } - - @Override - public void onSuccess() { - for (CompletionListener delegate : delegates) { - delegate.onSuccess(); - } - } - - @Override - public void onError(boolean retryable) { - for (CompletionListener delegate : delegates) { - delegate.onError(retryable); - } - } - } - - public static class ReturnByteBuffers implements CompletionListener { - - private final List byteBuffers; - - public ReturnByteBuffers(List byteBuffers) { - this.byteBuffers = byteBuffers; - } - - @Override - public void onSuccess() { - byteBufferPool.offer(byteBuffers); - } - - @Override - public void onError(boolean retryable) { - byteBufferPool.offer(byteBuffers); - } - } - - public class WriteToDiskOnRetryableFailure implements CompletionListener { - - private final List byteBuffers; - private final String instrumentationKey; - - public WriteToDiskOnRetryableFailure(List byteBuffers, String instrumentationKey) { - this.byteBuffers = byteBuffers; - this.instrumentationKey = instrumentationKey; - } - - @Override - public void onSuccess() {} - - @Override - public void onError(boolean retryable) { - if (retryable) { - localFileWriter.writeToDisk(byteBuffers, instrumentationKey); - } - } - } } diff --git a/agent/agent-tooling/src/test/java/com/microsoft/applicationinsights/agent/internal/localstorage/LocalFileLoaderTests.java b/agent/agent-tooling/src/test/java/com/microsoft/applicationinsights/agent/internal/localstorage/LocalFileLoaderTests.java index ddeab73929d..5f7e3996224 100644 --- a/agent/agent-tooling/src/test/java/com/microsoft/applicationinsights/agent/internal/localstorage/LocalFileLoaderTests.java +++ b/agent/agent-tooling/src/test/java/com/microsoft/applicationinsights/agent/internal/localstorage/LocalFileLoaderTests.java @@ -277,7 +277,8 @@ public void testDeleteFilePermanentlyOnSuccess() throws Exception { for (int i = 0; i < 10; i++) { LocalFileLoader.PersistedFile persistedFile = localFileLoader.loadTelemetriesFromDisk(); CompletableResultCode completableResultCode = - telemetryChannel.sendRawBytes(persistedFile.rawBytes, persistedFile.instrumentationKey); + telemetryChannel.sendRawBytes( + persistedFile.rawBytes, persistedFile.instrumentationKey, () -> {}, retryable -> {}); completableResultCode.join(10, SECONDS); assertThat(completableResultCode.isSuccess()).isEqualTo(true); localFileLoader.updateProcessedFileStatus(true, persistedFile.file); @@ -331,7 +332,8 @@ public void testDeleteFilePermanentlyOnFailure() throws Exception { assertThat(persistedFile.instrumentationKey).isEqualTo(INSTRUMENTATION_KEY); CompletableResultCode completableResultCode = - telemetryChannel.sendRawBytes(persistedFile.rawBytes, persistedFile.instrumentationKey); + telemetryChannel.sendRawBytes( + persistedFile.rawBytes, persistedFile.instrumentationKey, () -> {}, retryable -> {}); completableResultCode.join(10, SECONDS); assertThat(completableResultCode.isSuccess()).isEqualTo(false); localFileLoader.updateProcessedFileStatus(false, persistedFile.file); From 7e26ea7d906fab24fc0818199f007550a03e5465 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Mon, 24 Jan 2022 17:04:25 -0800 Subject: [PATCH 4/8] Fix --- .../agent/internal/telemetry/TelemetryChannel.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java b/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java index 065c56c42bf..83318c28701 100644 --- a/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java +++ b/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java @@ -62,6 +62,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; // TODO performance testing public class TelemetryChannel { @@ -72,6 +73,7 @@ public class TelemetryChannel { private static final AppInsightsByteBufferPool byteBufferPool = new AppInsightsByteBufferPool(); + // TODO (heya) should we suppress logging statsbeat telemetry ingestion issues? private static final OperationLogger operationLogger = new OperationLogger(TelemetryChannel.class, "Sending telemetry to the ingestion service"); @@ -81,7 +83,7 @@ public class TelemetryChannel { // TODO (kryalama) do we still need this AtomicBoolean, or can we use throttling built in to the // operationLogger? - private static final AtomicBoolean friendlyExceptionThrown = new AtomicBoolean(); + private final AtomicBoolean friendlyExceptionThrown = new AtomicBoolean(); @SuppressWarnings("CatchAndPrintStackTrace") private static ObjectMapper createObjectMapper() { @@ -289,6 +291,7 @@ private Consumer responseHandler( return response -> response .getBodyAsString() + .switchIfEmpty(Mono.just("")) .subscribe( body -> { int statusCode = response.getStatusCode(); @@ -322,10 +325,13 @@ private Consumer responseHandler( operationLogger.recordFailure("received response code: " + statusCode); onFailure.accept(false); } - LazyHttpClient.consumeResponseBody(response); if (!isStatsbeat) { handleStatsbeatOnResponse(instrumentationKey, startTime, statusCode); } + }, + exception -> { + operationLogger.recordFailure("exception retrieving response body", exception); + onFailure.accept(false); }); } @@ -355,6 +361,7 @@ private Consumer errorHandler( return; } + // TODO (trask) only log one-time friendly exception if no prior successes if (!NetworkFriendlyExceptions.logSpecialOneTimeFriendlyException( error, endpointUrl.toString(), friendlyExceptionThrown, logger)) { operationLogger.recordFailure( From 1c8c2a87164996d39934f2c8afe85b955ade4573 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Mon, 24 Jan 2022 18:15:05 -0800 Subject: [PATCH 5/8] TEMP --- .../agent/internal/telemetry/TelemetryChannel.java | 1 + 1 file changed, 1 insertion(+) diff --git a/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java b/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java index 83318c28701..96abb65472d 100644 --- a/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java +++ b/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java @@ -305,6 +305,7 @@ private Consumer responseHandler( getErrorMessageFromPartialSuccessResponse(body)); onFailure.accept(false); break; + case 403: // AUTHORIZATION (AAD) case 408: // REQUEST TIMEOUT case 429: // TOO MANY REQUESTS case 500: // INTERNAL SERVER ERROR From 461eeb1b429dd83296d8459e8ac37d09d249baaa Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Tue, 25 Jan 2022 13:00:30 -0800 Subject: [PATCH 6/8] Retry on 401/403 --- .../agent/internal/telemetry/TelemetryChannel.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java b/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java index 96abb65472d..83f55bb938c 100644 --- a/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java +++ b/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java @@ -305,7 +305,9 @@ private Consumer responseHandler( getErrorMessageFromPartialSuccessResponse(body)); onFailure.accept(false); break; - case 403: // AUTHORIZATION (AAD) + case 401: // breeze returns if aad enabled and no authentication token provided + case 403: // breeze returns if aad enabled or disabled (both cases) and + // wrong/expired credentials provided case 408: // REQUEST TIMEOUT case 429: // TOO MANY REQUESTS case 500: // INTERNAL SERVER ERROR From da2c214dfaf8446d4e5c04cba55b89ba641883a9 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Tue, 25 Jan 2022 13:12:20 -0800 Subject: [PATCH 7/8] Update --- .../agent/internal/localstorage/IntegrationTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/agent/agent-tooling/src/test/java/com/microsoft/applicationinsights/agent/internal/localstorage/IntegrationTests.java b/agent/agent-tooling/src/test/java/com/microsoft/applicationinsights/agent/internal/localstorage/IntegrationTests.java index b84a6363280..9338159efb4 100644 --- a/agent/agent-tooling/src/test/java/com/microsoft/applicationinsights/agent/internal/localstorage/IntegrationTests.java +++ b/agent/agent-tooling/src/test/java/com/microsoft/applicationinsights/agent/internal/localstorage/IntegrationTests.java @@ -87,12 +87,12 @@ public void setup() throws Exception { () -> new Exception("this is expected to be logged by the operation logger"))); } else { - // 408, 429, 500, and 503 response codes are the only ones the result in storing to disk + // 401, 403, 408, 429, 500, and 503 response codes result in storing to disk when(mockedClient.send(any(HttpRequest.class), any(Context.class))) .then( invocation -> Mono.just( - new MockHttpResponse(invocation.getArgument(0, HttpRequest.class), 500))); + new MockHttpResponse(invocation.getArgument(0, HttpRequest.class), 401))); } HttpPipelineBuilder pipelineBuilder = new HttpPipelineBuilder().httpClient(mockedClient); localFileCache = new LocalFileCache(tempFolder); From adff32f79001e03b2d414d395fb3435774fe5ee3 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Tue, 25 Jan 2022 13:37:45 -0800 Subject: [PATCH 8/8] Spotless --- .../agent/internal/telemetry/TelemetryChannel.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java b/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java index 83f55bb938c..ab1400d7b35 100644 --- a/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java +++ b/agent/agent-tooling/src/main/java/com/microsoft/applicationinsights/agent/internal/telemetry/TelemetryChannel.java @@ -307,7 +307,7 @@ private Consumer responseHandler( break; case 401: // breeze returns if aad enabled and no authentication token provided case 403: // breeze returns if aad enabled or disabled (both cases) and - // wrong/expired credentials provided + // wrong/expired credentials provided case 408: // REQUEST TIMEOUT case 429: // TOO MANY REQUESTS case 500: // INTERNAL SERVER ERROR