From 3e82403fc67109d35bda067eaa66035988c1c2d8 Mon Sep 17 00:00:00 2001 From: Neha Rao Date: Sun, 24 Nov 2024 20:27:04 -0800 Subject: [PATCH 01/11] float vmId to rntbd layer --- .../RntbdTransportClientTest.java | 11 ++-- .../RntbdClientChannelHealthCheckerTests.java | 60 ++++++++++++++++--- .../rntbd/RntbdRequestManagerTests.java | 18 +++++- .../RntbdClientChannelHealthChecker.java | 21 +++++-- .../rntbd/RntbdClientChannelPool.java | 11 ++-- 5 files changed, 96 insertions(+), 25 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java index 03d295a8a662..f0641d762c98 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java @@ -37,6 +37,7 @@ import com.azure.cosmos.implementation.UserAgentContainer; import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.apachecommons.lang.NotImplementedException; +import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry; import com.azure.cosmos.implementation.clienttelemetry.TagName; import com.azure.cosmos.implementation.directconnectivity.rntbd.AsyncRntbdRequestRecord; import com.azure.cosmos.implementation.directconnectivity.rntbd.OpenConnectionRntbdRequestRecord; @@ -984,7 +985,7 @@ private static final class FakeEndpoint implements RntbdEndpoint { private final RntbdDurableEndpointMetrics durableEndpointMetrics; private FakeEndpoint( - final Config config, final RntbdRequestTimer timer, final Uri addressUri, + final Config config, final ClientTelemetry clientTelemetry, RntbdRequestTimer timer, final Uri addressUri, final RntbdResponse... expected ) { @@ -1013,7 +1014,7 @@ private FakeEndpoint( ); RntbdRequestManager requestManager = new RntbdRequestManager( - new RntbdClientChannelHealthChecker(config), + new RntbdClientChannelHealthChecker(config, clientTelemetry), config, null, null); @@ -1183,12 +1184,14 @@ public OpenConnectionRntbdRequestRecord openConnection(RntbdRequestArgs openConn static class Provider implements RntbdEndpoint.Provider { final Config config; + final ClientTelemetry clientTelemetry; final RntbdResponse expected; final RntbdRequestTimer timer; final IAddressResolver addressResolver; Provider(RntbdTransportClient.Options options, SslContext sslContext, RntbdResponse expected, IAddressResolver addressResolver) { this.config = new Config(options, sslContext, LogLevel.WARN); + this.clientTelemetry = new ClientTelemetry(mockDiagnosticsClientContext(), false, null, null, null, null, null, null, null, null, null, null); this.timer = new RntbdRequestTimer( config.tcpNetworkRequestTimeoutInNanos(), config.requestTimerResolutionInNanos()); @@ -1218,12 +1221,12 @@ public int evictions() { @Override public RntbdEndpoint createIfAbsent(URI serviceEndpoint, Uri addressUri, ProactiveOpenConnectionsProcessor proactiveOpenConnectionsProcessor, int minRequiredChannelsForEndpoint, AddressSelector addressSelector) { - return new FakeEndpoint(config, timer, addressUri, expected); + return new FakeEndpoint(config, clientTelemetry, timer, addressUri, expected); } @Override public RntbdEndpoint get(URI physicalAddress) { - return new FakeEndpoint(config, timer, new Uri(physicalAddress.toString()), expected); + return new FakeEndpoint(config, clientTelemetry, timer, new Uri(physicalAddress.toString()), expected); } @Override diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthCheckerTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthCheckerTests.java index 973a1a9b6c19..21028f72d091 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthCheckerTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthCheckerTests.java @@ -4,6 +4,8 @@ package com.azure.cosmos.implementation.directconnectivity.rntbd; import com.azure.cosmos.implementation.ConnectionPolicy; +import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry; +import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetryInfo; import com.azure.cosmos.implementation.cpu.CpuLoadHistory; import com.azure.cosmos.implementation.cpu.CpuMemoryMonitor; import com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient; @@ -16,6 +18,7 @@ import io.netty.handler.logging.LogLevel; import io.netty.handler.ssl.SslContext; import io.netty.util.concurrent.Future; +import org.mockito.Mock; import org.mockito.MockedStatic; import org.mockito.Mockito; import org.testng.annotations.DataProvider; @@ -48,7 +51,12 @@ public void isHealthyForWriteHangTests(boolean withFailureReason) throws Interru sslContextMock, LogLevel.INFO); - RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config); + ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class); + ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class); + Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock); + Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId"); + + RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock); Channel channelMock = Mockito.mock(Channel.class); ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class); RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class); @@ -74,6 +82,7 @@ public void isHealthyForWriteHangTests(boolean withFailureReason) throws Interru assertThat(healthyResult.isSuccess()).isTrue(); assertThat(healthyResult.getNow()).isNotEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue); assertThat(healthyResult.getNow().contains("health check failed due to non-responding write")); + assertThat(healthyResult.getNow().contains("clientVmId: testClientVmId")); } else { Future healthyResult = healthChecker.isHealthy(channelMock).sync(); assertThat(healthyResult.isSuccess()).isTrue(); @@ -90,7 +99,12 @@ public void isHealthyForReadHangTests(boolean withFailureReason) throws Interrup sslContextMock, LogLevel.INFO); - RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config); + ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class); + ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class); + Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock); + Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId"); + + RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock); Channel channelMock = Mockito.mock(Channel.class); ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class); RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class); @@ -131,7 +145,12 @@ public void transitTimeoutTimeLimitTests(boolean withFailureReason) throws Inter sslContextMock, LogLevel.INFO); - RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config); + ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class); + ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class); + Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock); + Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId"); + + RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock); Channel channelMock = Mockito.mock(Channel.class); ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class); RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class); @@ -174,7 +193,12 @@ public void transitTimeoutHighFrequencyTests(boolean withFailureReason) throws I sslContextMock, LogLevel.INFO); - RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config); + ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class); + ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class); + Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock); + Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId"); + + RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock); Channel channelMock = Mockito.mock(Channel.class); ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class); RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class); @@ -218,7 +242,12 @@ public void transitTimeoutOnWriteTests(boolean withFailureReason) throws Interru sslContextMock, LogLevel.INFO); - RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config); + ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class); + ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class); + Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock); + Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId"); + + RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock); Channel channelMock = Mockito.mock(Channel.class); ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class); RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class); @@ -264,7 +293,12 @@ public void transitTimeoutOnWrite_HighCPULoadTests(boolean withFailureReason) th sslContextMock, LogLevel.INFO); - RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config); + ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class); + ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class); + Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock); + Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId"); + + RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock); Channel channelMock = Mockito.mock(Channel.class); ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class); RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class); @@ -322,7 +356,12 @@ public void cancellationPronenessOfChannel_Test(boolean withFailureReason) throw sslContextMock, LogLevel.INFO); - RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config); + ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class); + ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class); + Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock); + Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId"); + + RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock); Channel channelMock = Mockito.mock(Channel.class); ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class); RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class); @@ -375,7 +414,12 @@ public void cancellationPronenessOfChannelWithHighCpuLoad_Test(boolean withFailu sslContextMock, LogLevel.INFO); - RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config); + ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class); + ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class); + Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock); + Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId"); + + RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock); Channel channelMock = Mockito.mock(Channel.class); ChannelPipeline channelPipelineMock = Mockito.mock(ChannelPipeline.class); RntbdRequestManager rntbdRequestManagerMock = Mockito.mock(RntbdRequestManager.class); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManagerTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManagerTests.java index ef4662420145..64908b863660 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManagerTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManagerTests.java @@ -7,6 +7,8 @@ import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.ResourceType; import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry; +import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetryInfo; import com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient; import com.azure.cosmos.implementation.directconnectivity.Uri; import io.netty.channel.Channel; @@ -40,7 +42,13 @@ public void transitTimeoutTimestampTests() throws URISyntaxException { new RntbdTransportClient.Options.Builder(ConnectionPolicy.getDefaultPolicy()).build(), sslContextMock, LogLevel.INFO); - RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config); + + ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class); + ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class); + Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock); + Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId"); + + RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock); RntbdConnectionStateListener connectionStateListener = Mockito.mock(RntbdConnectionStateListener.class); @@ -119,7 +127,13 @@ public void rntbdContextResponseTests() { new RntbdTransportClient.Options.Builder(ConnectionPolicy.getDefaultPolicy()).build(), sslContextMock, LogLevel.INFO); - RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config); + + ClientTelemetry clientTelemetryMock = Mockito.mock(ClientTelemetry.class); + ClientTelemetryInfo clientTelemetryInfoMock = Mockito.mock(ClientTelemetryInfo.class); + Mockito.when(clientTelemetryMock.getClientTelemetryInfo()).thenReturn(clientTelemetryInfoMock); + Mockito.when(clientTelemetryInfoMock.getMachineId()).thenReturn("testClientVmId"); + + RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetryMock); RntbdConnectionStateListener connectionStateListener = Mockito.mock(RntbdConnectionStateListener.class); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java index 6e890534adb1..9eb2a3e85ec5 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java @@ -4,6 +4,7 @@ package com.azure.cosmos.implementation.directconnectivity.rntbd; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; +import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry; import com.azure.cosmos.implementation.cpu.CpuMemoryMonitor; import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpoint.Config; import com.fasterxml.jackson.annotation.JsonProperty; @@ -75,12 +76,19 @@ public final class RntbdClientChannelHealthChecker implements ChannelHealthCheck private final long nonRespondingChannelReadDelayTimeLimitInNanos; @JsonProperty private final int cancellationCountSinceLastReadThreshold; + // VM ID of the SDK client instance. + // We have this property in client diagnostics, but they can only be obtained if + // an operation fails or succeeds. If an operation hangs, the diagnostics will not + // be available. Floating this value to the health check logs will give us access to + // the client VM ID in scenarios where the operation hangs. + @JsonProperty + private final String clientVmId; // endregion // region Constructors - public RntbdClientChannelHealthChecker(final Config config) { + public RntbdClientChannelHealthChecker(final Config config, final ClientTelemetry clientTelemetry) { checkNotNull(config, "expected non-null config"); @@ -104,6 +112,7 @@ public RntbdClientChannelHealthChecker(final Config config) { this.timeoutOnWriteTimeLimitInNanos = config.timeoutDetectionOnWriteTimeLimitInNanos(); this.nonRespondingChannelReadDelayTimeLimitInNanos = config.nonRespondingChannelReadDelayTimeLimitInNanos(); this.cancellationCountSinceLastReadThreshold = config.cancellationCountSinceLastReadThreshold(); + this.clientVmId = clientTelemetry.getClientTelemetryInfo().getMachineId(); } // endregion @@ -266,14 +275,15 @@ private String isWriteHang(Timestamps timestamps, Instant currentTime, RntbdRequ writeHangMessage = MessageFormat.format( "{0} health check failed due to non-responding write: [lastChannelWriteAttemptTime: {1}, " + "lastChannelWriteTime: {2}, writeDelayInNanos: {3}, writeDelayLimitInNanos: {4}, " + - "rntbdContext: {5}, pendingRequestCount: {6}]", + "rntbdContext: {5}, pendingRequestCount: {6}, clientVmId: {7}]", channel, timestamps.lastChannelWriteAttemptTime(), timestamps.lastChannelWriteTime(), writeDelayInNanos, this.writeDelayLimitInNanos, rntbdContext, - pendingRequestCount); + pendingRequestCount, + this.clientVmId); logger.warn(writeHangMessage); } @@ -298,14 +308,15 @@ private String isReadHang(Timestamps timestamps, Instant currentTime, RntbdReque readHangMessage = MessageFormat.format( "{0} health check failed due to non-responding read: [lastChannelWrite: {1}, lastChannelRead: {2}, " - + "readDelay: {3}, readDelayLimit: {4}, rntbdContext: {5}, pendingRequestCount: {6}]", + + "readDelay: {3}, readDelayLimit: {4}, rntbdContext: {5}, pendingRequestCount: {6}, clientVmId: {7}]", channel, timestamps.lastChannelWriteTime(), timestamps.lastChannelReadTime(), readDelay, this.readDelayLimitInNanos, rntbdContext, - pendingRequestCount); + pendingRequestCount, + this.clientVmId); logger.warn(readHangMessage); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java index f0bd021b11ee..97e987edc6e3 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java @@ -206,7 +206,7 @@ public final class RntbdClientChannelPool implements ChannelPool { * @param durableEndpointMetrics a holder for the metric state (which should be * durable for endpoints with the same address) */ - RntbdClientChannelPool( +/* RntbdClientChannelPool( final RntbdServiceEndpoint endpoint, final Bootstrap bootstrap, final Config config, @@ -223,13 +223,12 @@ public final class RntbdClientChannelPool implements ChannelPool { connectionStateListener, faultInjectionInterceptors, durableEndpointMetrics); - } + }*/ - private RntbdClientChannelPool( + RntbdClientChannelPool( final RntbdServiceEndpoint endpoint, final Bootstrap bootstrap, final Config config, - final RntbdClientChannelHealthChecker healthChecker, final ClientTelemetry clientTelemetry, final RntbdConnectionStateListener connectionStateListener, final RntbdServerErrorInjector serverErrorInjector, @@ -238,9 +237,9 @@ private RntbdClientChannelPool( checkNotNull(endpoint, "expected non-null endpoint"); checkNotNull(bootstrap, "expected non-null bootstrap"); checkNotNull(config, "expected non-null config"); - checkNotNull(healthChecker, "expected non-null healthChecker"); checkNotNull(durableEndpointMetrics, "expected non-null durableEndpointMetrics"); + RntbdClientChannelHealthChecker healthChecker = new RntbdClientChannelHealthChecker(config, clientTelemetry); this.poolHandler = new RntbdClientChannelHandler(config, healthChecker, connectionStateListener, serverErrorInjector); this.executor = bootstrap.config().group().next(); this.healthChecker = healthChecker; @@ -1204,7 +1203,7 @@ private void notifyChannelConnect(final ChannelFuture future, final Promise Date: Sun, 24 Nov 2024 20:48:06 -0800 Subject: [PATCH 02/11] remove comment and extra newline --- .../rntbd/RntbdClientChannelPool.java | 20 ------------------- 1 file changed, 20 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java index 97e987edc6e3..b6e2976a1afc 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java @@ -206,25 +206,6 @@ public final class RntbdClientChannelPool implements ChannelPool { * @param durableEndpointMetrics a holder for the metric state (which should be * durable for endpoints with the same address) */ -/* RntbdClientChannelPool( - final RntbdServiceEndpoint endpoint, - final Bootstrap bootstrap, - final Config config, - final ClientTelemetry clientTelemetry, - final RntbdConnectionStateListener connectionStateListener, - final RntbdServerErrorInjector faultInjectionInterceptors, - final RntbdDurableEndpointMetrics durableEndpointMetrics) { - this( - endpoint, - bootstrap, - config, - new RntbdClientChannelHealthChecker(config), - clientTelemetry, - connectionStateListener, - faultInjectionInterceptors, - durableEndpointMetrics); - }*/ - RntbdClientChannelPool( final RntbdServiceEndpoint endpoint, final Bootstrap bootstrap, @@ -1203,7 +1184,6 @@ private void notifyChannelConnect(final ChannelFuture future, final Promise Date: Sun, 24 Nov 2024 20:54:23 -0800 Subject: [PATCH 03/11] CHANGELOG --- sdk/cosmos/azure-cosmos/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 2641c091c7a9..6ccab016d108 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -9,6 +9,7 @@ #### Bugs Fixed #### Other Changes +* Added client vmId info to Rntbd health check logs - See [43079](https://github.com/Azure/azure-sdk-for-java/pull/43079) ### 4.65.0 (2024-11-19) From 5d4457d47070bece376aa04f5b046131c7e2d451 Mon Sep 17 00:00:00 2001 From: Neha Rao Date: Mon, 2 Dec 2024 13:01:26 -0800 Subject: [PATCH 04/11] fetch system info --- .../rntbd/RntbdClientChannelHealthChecker.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java index 9eb2a3e85ec5..bb90a922ceef 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java @@ -3,6 +3,7 @@ package com.azure.cosmos.implementation.directconnectivity.rntbd; +import com.azure.cosmos.implementation.ClientSideRequestStatistics; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry; import com.azure.cosmos.implementation.cpu.CpuMemoryMonitor; @@ -18,6 +19,7 @@ import java.text.MessageFormat; import java.time.Duration; import java.time.Instant; +import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -272,6 +274,8 @@ private String isWriteHang(Timestamps timestamps, Instant currentTime, RntbdRequ final Optional rntbdContext = requestManager.rntbdContext(); final int pendingRequestCount = requestManager.pendingRequestCount(); + Map systemInfo = ClientSideRequestStatistics.fetchSystemInformation().toMap(); + writeHangMessage = MessageFormat.format( "{0} health check failed due to non-responding write: [lastChannelWriteAttemptTime: {1}, " + "lastChannelWriteTime: {2}, writeDelayInNanos: {3}, writeDelayLimitInNanos: {4}, " + From a7d7b0ad3f7504dfbccf7aba6bf99185e71d9b52 Mon Sep 17 00:00:00 2001 From: Neha Rao Date: Mon, 9 Dec 2024 18:21:51 -0800 Subject: [PATCH 05/11] progress --- .../RntbdClientChannelHealthChecker.java | 23 +++++++++++++++---- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java index bb90a922ceef..f1d3bb7b9403 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java @@ -4,6 +4,7 @@ package com.azure.cosmos.implementation.directconnectivity.rntbd; import com.azure.cosmos.implementation.ClientSideRequestStatistics; +import com.azure.cosmos.implementation.CosmosDiagnosticsSystemUsageSnapshot; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry; import com.azure.cosmos.implementation.cpu.CpuMemoryMonitor; @@ -274,12 +275,13 @@ private String isWriteHang(Timestamps timestamps, Instant currentTime, RntbdRequ final Optional rntbdContext = requestManager.rntbdContext(); final int pendingRequestCount = requestManager.pendingRequestCount(); - Map systemInfo = ClientSideRequestStatistics.fetchSystemInformation().toMap(); + CosmosDiagnosticsSystemUsageSnapshot systemInfo = ClientSideRequestStatistics.fetchSystemInformation(); writeHangMessage = MessageFormat.format( "{0} health check failed due to non-responding write: [lastChannelWriteAttemptTime: {1}, " + "lastChannelWriteTime: {2}, writeDelayInNanos: {3}, writeDelayLimitInNanos: {4}, " + - "rntbdContext: {5}, pendingRequestCount: {6}, clientVmId: {7}]", + "rntbdContext: {5}, pendingRequestCount: {6}, clientVmId: {7}, clientUsedMemory: {8}, " + + "clientAvailableMemory: {9}, clientSystemCpuLoad: {10}, clientAvailableProcessors: {11}]", channel, timestamps.lastChannelWriteAttemptTime(), timestamps.lastChannelWriteTime(), @@ -287,7 +289,11 @@ private String isWriteHang(Timestamps timestamps, Instant currentTime, RntbdRequ this.writeDelayLimitInNanos, rntbdContext, pendingRequestCount, - this.clientVmId); + this.clientVmId, + systemInfo.getUsedMemory(), + systemInfo.getAvailableMemory(), + systemInfo.getSystemCpuLoad(), + systemInfo.getAvailableProcessors()); logger.warn(writeHangMessage); } @@ -310,9 +316,12 @@ private String isReadHang(Timestamps timestamps, Instant currentTime, RntbdReque final Optional rntbdContext = requestManager.rntbdContext(); final int pendingRequestCount = requestManager.pendingRequestCount(); + CosmosDiagnosticsSystemUsageSnapshot systemInfo = ClientSideRequestStatistics.fetchSystemInformation(); + readHangMessage = MessageFormat.format( "{0} health check failed due to non-responding read: [lastChannelWrite: {1}, lastChannelRead: {2}, " - + "readDelay: {3}, readDelayLimit: {4}, rntbdContext: {5}, pendingRequestCount: {6}, clientVmId: {7}]", + + "readDelay: {3}, readDelayLimit: {4}, rntbdContext: {5}, pendingRequestCount: {6}, clientVmId: {7}" + + "clientUsedMemory: {8}, clientAvailableMemory: {9}, clientSystemCpuLoad: {10}, clientAvailableProcessors: {11}]", channel, timestamps.lastChannelWriteTime(), timestamps.lastChannelReadTime(), @@ -320,7 +329,11 @@ private String isReadHang(Timestamps timestamps, Instant currentTime, RntbdReque this.readDelayLimitInNanos, rntbdContext, pendingRequestCount, - this.clientVmId); + this.clientVmId, + systemInfo.getUsedMemory(), + systemInfo.getAvailableMemory(), + systemInfo.getSystemCpuLoad(), + systemInfo.getAvailableProcessors()); logger.warn(readHangMessage); From 1fb1ea9092c19ffa840054232994156e35f2010d Mon Sep 17 00:00:00 2001 From: Neha Rao Date: Sat, 28 Dec 2024 11:29:11 -0800 Subject: [PATCH 06/11] tests --- .../rntbd/RntbdClientChannelHealthCheckerTests.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthCheckerTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthCheckerTests.java index 21028f72d091..9f11ecc71b0a 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthCheckerTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthCheckerTests.java @@ -83,6 +83,10 @@ public void isHealthyForWriteHangTests(boolean withFailureReason) throws Interru assertThat(healthyResult.getNow()).isNotEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue); assertThat(healthyResult.getNow().contains("health check failed due to non-responding write")); assertThat(healthyResult.getNow().contains("clientVmId: testClientVmId")); + assertThat(healthyResult.getNow().contains("clientUsedMemory")); + assertThat(healthyResult.getNow().contains("clientAvailableMemory")); + assertThat(healthyResult.getNow().contains("clientSystemCpuLoad")); + assertThat(healthyResult.getNow().contains("clientAvailableProcessors")); } else { Future healthyResult = healthChecker.isHealthy(channelMock).sync(); assertThat(healthyResult.isSuccess()).isTrue(); @@ -129,6 +133,11 @@ public void isHealthyForReadHangTests(boolean withFailureReason) throws Interrup assertThat(healthyResult.isSuccess()).isTrue(); assertThat(healthyResult.getNow()).isNotEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue); assertThat(healthyResult.getNow().contains("health check failed due to non-responding read")); + assertThat(healthyResult.getNow().contains("clientVmId: testClientVmId")); + assertThat(healthyResult.getNow().contains("clientUsedMemory")); + assertThat(healthyResult.getNow().contains("clientAvailableMemory")); + assertThat(healthyResult.getNow().contains("clientSystemCpuLoad")); + assertThat(healthyResult.getNow().contains("clientAvailableProcessors")); } else { Future healthyResult = healthChecker.isHealthy(channelMock).sync(); assertThat(healthyResult.isSuccess()).isTrue(); From 4ecff084f817999ec84cb61f640d0f4de17eec05 Mon Sep 17 00:00:00 2001 From: Neha Rao Date: Mon, 6 Jan 2025 11:24:09 -0800 Subject: [PATCH 07/11] pr comment --- .../RntbdClientChannelHealthChecker.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java index f1d3bb7b9403..746257678cfb 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java @@ -37,6 +37,13 @@ public final class RntbdClientChannelHealthChecker implements ChannelHealthCheck private static final Logger logger = LoggerFactory.getLogger(RntbdClientChannelHealthChecker.class); + // We need to keep a reference to client telemetry to reference the VM ID of the SDK + // client instance. We have this property in client diagnostics, but it can only be + // obtained if an operation fails or succeeds. If an operation hangs, the diagnostics + // will not be available. Floating this value to the health check logs will give us access + // to the client VM ID in scenarios where the operation hangs. + private static ClientTelemetry clientTelemetry; + // A channel will be declared healthy if a read succeeded recently as defined by this value. private static final long recentReadWindowInNanos = 1_000_000_000L; @@ -79,13 +86,6 @@ public final class RntbdClientChannelHealthChecker implements ChannelHealthCheck private final long nonRespondingChannelReadDelayTimeLimitInNanos; @JsonProperty private final int cancellationCountSinceLastReadThreshold; - // VM ID of the SDK client instance. - // We have this property in client diagnostics, but they can only be obtained if - // an operation fails or succeeds. If an operation hangs, the diagnostics will not - // be available. Floating this value to the health check logs will give us access to - // the client VM ID in scenarios where the operation hangs. - @JsonProperty - private final String clientVmId; // endregion @@ -115,7 +115,7 @@ public RntbdClientChannelHealthChecker(final Config config, final ClientTelemetr this.timeoutOnWriteTimeLimitInNanos = config.timeoutDetectionOnWriteTimeLimitInNanos(); this.nonRespondingChannelReadDelayTimeLimitInNanos = config.nonRespondingChannelReadDelayTimeLimitInNanos(); this.cancellationCountSinceLastReadThreshold = config.cancellationCountSinceLastReadThreshold(); - this.clientVmId = clientTelemetry.getClientTelemetryInfo().getMachineId(); + this.clientTelemetry = clientTelemetry; } // endregion @@ -289,7 +289,7 @@ private String isWriteHang(Timestamps timestamps, Instant currentTime, RntbdRequ this.writeDelayLimitInNanos, rntbdContext, pendingRequestCount, - this.clientVmId, + clientTelemetry.getClientTelemetryInfo().getMachineId(), systemInfo.getUsedMemory(), systemInfo.getAvailableMemory(), systemInfo.getSystemCpuLoad(), @@ -329,7 +329,7 @@ private String isReadHang(Timestamps timestamps, Instant currentTime, RntbdReque this.readDelayLimitInNanos, rntbdContext, pendingRequestCount, - this.clientVmId, + clientTelemetry.getClientTelemetryInfo().getMachineId(), systemInfo.getUsedMemory(), systemInfo.getAvailableMemory(), systemInfo.getSystemCpuLoad(), From 73022d9ef70b144ef3be6ae929ca36c589f21a28 Mon Sep 17 00:00:00 2001 From: Neha Rao Date: Mon, 6 Jan 2025 12:24:02 -0800 Subject: [PATCH 08/11] fix --- .../directconnectivity/RntbdTransportClientTest.java | 1 - .../rntbd/RntbdClientChannelHealthChecker.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java index f0641d762c98..7d25deb33662 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java @@ -59,7 +59,6 @@ import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdResponseDecoder; import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdServiceEndpoint; import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdUUID; -import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdUtils; import com.azure.cosmos.implementation.guava25.base.Strings; import com.azure.cosmos.implementation.guava25.collect.ImmutableMap; import io.micrometer.core.instrument.Tag; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java index 746257678cfb..fa98b1653ca1 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java @@ -42,7 +42,7 @@ public final class RntbdClientChannelHealthChecker implements ChannelHealthCheck // obtained if an operation fails or succeeds. If an operation hangs, the diagnostics // will not be available. Floating this value to the health check logs will give us access // to the client VM ID in scenarios where the operation hangs. - private static ClientTelemetry clientTelemetry; + private final ClientTelemetry clientTelemetry; // A channel will be declared healthy if a read succeeded recently as defined by this value. private static final long recentReadWindowInNanos = 1_000_000_000L; From 7e359cfcf4767e2e0bee1ddff7737ce275c0590a Mon Sep 17 00:00:00 2001 From: Neha Rao Date: Mon, 6 Jan 2025 13:52:18 -0800 Subject: [PATCH 09/11] pr comments --- .../RntbdClientChannelHealthCheckerTests.java | 25 +++++++++---------- .../RntbdClientChannelHealthChecker.java | 1 - 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthCheckerTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthCheckerTests.java index 9f11ecc71b0a..7fb964b85dff 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthCheckerTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthCheckerTests.java @@ -18,7 +18,6 @@ import io.netty.handler.logging.LogLevel; import io.netty.handler.ssl.SslContext; import io.netty.util.concurrent.Future; -import org.mockito.Mock; import org.mockito.MockedStatic; import org.mockito.Mockito; import org.testng.annotations.DataProvider; @@ -81,12 +80,12 @@ public void isHealthyForWriteHangTests(boolean withFailureReason) throws Interru Future healthyResult = healthChecker.isHealthyWithFailureReason(channelMock).sync(); assertThat(healthyResult.isSuccess()).isTrue(); assertThat(healthyResult.getNow()).isNotEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue); - assertThat(healthyResult.getNow().contains("health check failed due to non-responding write")); - assertThat(healthyResult.getNow().contains("clientVmId: testClientVmId")); - assertThat(healthyResult.getNow().contains("clientUsedMemory")); - assertThat(healthyResult.getNow().contains("clientAvailableMemory")); - assertThat(healthyResult.getNow().contains("clientSystemCpuLoad")); - assertThat(healthyResult.getNow().contains("clientAvailableProcessors")); + assertThat(healthyResult.getNow().contains("health check failed due to non-responding write")).isTrue(); + assertThat(healthyResult.getNow().contains("clientVmId: testClientVmId")).isTrue(); + assertThat(healthyResult.getNow().contains("clientUsedMemory")).isTrue(); + assertThat(healthyResult.getNow().contains("clientAvailableMemory")).isTrue(); + assertThat(healthyResult.getNow().contains("clientSystemCpuLoad")).isTrue(); + assertThat(healthyResult.getNow().contains("clientAvailableProcessors")).isTrue(); } else { Future healthyResult = healthChecker.isHealthy(channelMock).sync(); assertThat(healthyResult.isSuccess()).isTrue(); @@ -132,12 +131,12 @@ public void isHealthyForReadHangTests(boolean withFailureReason) throws Interrup Future healthyResult = healthChecker.isHealthyWithFailureReason(channelMock).sync(); assertThat(healthyResult.isSuccess()).isTrue(); assertThat(healthyResult.getNow()).isNotEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue); - assertThat(healthyResult.getNow().contains("health check failed due to non-responding read")); - assertThat(healthyResult.getNow().contains("clientVmId: testClientVmId")); - assertThat(healthyResult.getNow().contains("clientUsedMemory")); - assertThat(healthyResult.getNow().contains("clientAvailableMemory")); - assertThat(healthyResult.getNow().contains("clientSystemCpuLoad")); - assertThat(healthyResult.getNow().contains("clientAvailableProcessors")); + assertThat(healthyResult.getNow().contains("health check failed due to non-responding read")).isTrue(); + assertThat(healthyResult.getNow().contains("clientVmId: testClientVmId")).isTrue(); + assertThat(healthyResult.getNow().contains("clientUsedMemory")).isTrue(); + assertThat(healthyResult.getNow().contains("clientAvailableMemory")).isTrue(); + assertThat(healthyResult.getNow().contains("clientSystemCpuLoad")).isTrue(); + assertThat(healthyResult.getNow().contains("clientAvailableProcessors")).isTrue(); } else { Future healthyResult = healthChecker.isHealthy(channelMock).sync(); assertThat(healthyResult.isSuccess()).isTrue(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java index fa98b1653ca1..63ef797d780d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java @@ -20,7 +20,6 @@ import java.text.MessageFormat; import java.time.Duration; import java.time.Instant; -import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; From 45b26947ff3e0ba5c3c1720a36b00b518ecdbf31 Mon Sep 17 00:00:00 2001 From: Neha Rao Date: Wed, 8 Jan 2025 08:56:21 -0800 Subject: [PATCH 10/11] pr comments --- .../RntbdClientChannelHealthCheckerTests.java | 43 +++++++++----- .../RntbdClientChannelHealthChecker.java | 59 +++++++++++-------- 2 files changed, 62 insertions(+), 40 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthCheckerTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthCheckerTests.java index 7fb964b85dff..f4795bc8b932 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthCheckerTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthCheckerTests.java @@ -82,10 +82,6 @@ public void isHealthyForWriteHangTests(boolean withFailureReason) throws Interru assertThat(healthyResult.getNow()).isNotEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue); assertThat(healthyResult.getNow().contains("health check failed due to non-responding write")).isTrue(); assertThat(healthyResult.getNow().contains("clientVmId: testClientVmId")).isTrue(); - assertThat(healthyResult.getNow().contains("clientUsedMemory")).isTrue(); - assertThat(healthyResult.getNow().contains("clientAvailableMemory")).isTrue(); - assertThat(healthyResult.getNow().contains("clientSystemCpuLoad")).isTrue(); - assertThat(healthyResult.getNow().contains("clientAvailableProcessors")).isTrue(); } else { Future healthyResult = healthChecker.isHealthy(channelMock).sync(); assertThat(healthyResult.isSuccess()).isTrue(); @@ -130,13 +126,12 @@ public void isHealthyForReadHangTests(boolean withFailureReason) throws Interrup if (withFailureReason) { Future healthyResult = healthChecker.isHealthyWithFailureReason(channelMock).sync(); assertThat(healthyResult.isSuccess()).isTrue(); - assertThat(healthyResult.getNow()).isNotEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue); - assertThat(healthyResult.getNow().contains("health check failed due to non-responding read")).isTrue(); - assertThat(healthyResult.getNow().contains("clientVmId: testClientVmId")).isTrue(); - assertThat(healthyResult.getNow().contains("clientUsedMemory")).isTrue(); - assertThat(healthyResult.getNow().contains("clientAvailableMemory")).isTrue(); - assertThat(healthyResult.getNow().contains("clientSystemCpuLoad")).isTrue(); - assertThat(healthyResult.getNow().contains("clientAvailableProcessors")).isTrue(); + + String message = healthyResult.getNow(); + assertThat(message).isNotEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue); + assertThat(message.contains("health check failed due to non-responding read")).isTrue(); + assertThat(message.contains("clientVmId: testClientVmId")).isTrue(); + validateSystemDiagnostics(message); } else { Future healthyResult = healthChecker.isHealthy(channelMock).sync(); assertThat(healthyResult.isSuccess()).isTrue(); @@ -232,8 +227,12 @@ public void transitTimeoutHighFrequencyTests(boolean withFailureReason) throws I if (withFailureReason) { Future healthyResult = healthChecker.isHealthyWithFailureReason(channelMock).sync(); assertThat(healthyResult.isSuccess()).isTrue(); - assertThat(healthyResult.getNow()).isNotEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue); - assertThat(healthyResult.getNow().contains("health check failed due to transit timeout high frequency threshold hit")); + + String message = healthyResult.getNow(); + assertThat(message).isNotEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue); + assertThat(message.contains("health check failed due to transit timeout high frequency threshold hit")).isTrue(); + assertThat(message.contains("clientVmId: testClientVmId")).isTrue(); + validateSystemDiagnostics(message); } else { Future healthyResult = healthChecker.isHealthy(channelMock).sync(); assertThat(healthyResult.isSuccess()).isTrue(); @@ -282,8 +281,12 @@ public void transitTimeoutOnWriteTests(boolean withFailureReason) throws Interru if (withFailureReason) { Future healthyResult = healthChecker.isHealthyWithFailureReason(channelMock).sync(); assertThat(healthyResult.isSuccess()).isTrue(); - assertThat(healthyResult.getNow()).isNotEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue); - assertThat(healthyResult.getNow()).contains("health check failed due to transit timeout on write threshold hit"); + + String message = healthyResult.getNow(); + assertThat(message).isNotEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue); + assertThat(message).contains("health check failed due to transit timeout on write threshold hit"); + assertThat(message).contains("clientVmId: testClientVmId"); + validateSystemDiagnostics(message); } else { Future healthyResult = healthChecker.isHealthy(channelMock).sync(); assertThat(healthyResult.isSuccess()).isTrue(); @@ -345,6 +348,7 @@ public void transitTimeoutOnWrite_HighCPULoadTests(boolean withFailureReason) th assertThat(healthyResult.getNow()).isEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue); // Verify under high CPU load, the transitTimeout will be reset Mockito.verify(timestampsMock, Mockito.times(1)).resetTransitTimeout(); + } else { Future healthyResult = healthChecker.isHealthy(channelMock).sync(); assertThat(healthyResult.isSuccess()).isTrue(); @@ -461,7 +465,8 @@ public void cancellationPronenessOfChannelWithHighCpuLoad_Test(boolean withFailu if (withFailureReason) { Future healthyResult = healthChecker.isHealthyWithFailureReason(channelMock).sync(); assertThat(healthyResult.isSuccess()).isTrue(); - assertThat(healthyResult.getNow()).isEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue); + String message = healthyResult.getNow(); + assertThat(message).isEqualTo(RntbdConstants.RntbdHealthCheckResults.SuccessValue); // Verify under high CPU load, the cancellationCount will be reset Mockito.verify(timestampsMock, Mockito.times(1)).resetCancellationCount(); } else { @@ -474,4 +479,10 @@ public void cancellationPronenessOfChannelWithHighCpuLoad_Test(boolean withFailu } } + private void validateSystemDiagnostics(String string) { + assertThat(string.contains("clientUsedMemory")).isTrue(); + assertThat(string.contains("clientAvailableMemory")).isTrue(); + assertThat(string.contains("clientSystemCpuLoad")).isTrue(); + assertThat(string.contains("clientAvailableProcessors")).isTrue(); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java index 63ef797d780d..e628fa64894b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelHealthChecker.java @@ -279,8 +279,7 @@ private String isWriteHang(Timestamps timestamps, Instant currentTime, RntbdRequ writeHangMessage = MessageFormat.format( "{0} health check failed due to non-responding write: [lastChannelWriteAttemptTime: {1}, " + "lastChannelWriteTime: {2}, writeDelayInNanos: {3}, writeDelayLimitInNanos: {4}, " + - "rntbdContext: {5}, pendingRequestCount: {6}, clientVmId: {7}, clientUsedMemory: {8}, " + - "clientAvailableMemory: {9}, clientSystemCpuLoad: {10}, clientAvailableProcessors: {11}]", + "rntbdContext: {5}, pendingRequestCount: {6}, clientVmId: {7}, {8}]", channel, timestamps.lastChannelWriteAttemptTime(), timestamps.lastChannelWriteTime(), @@ -289,10 +288,7 @@ private String isWriteHang(Timestamps timestamps, Instant currentTime, RntbdRequ rntbdContext, pendingRequestCount, clientTelemetry.getClientTelemetryInfo().getMachineId(), - systemInfo.getUsedMemory(), - systemInfo.getAvailableMemory(), - systemInfo.getSystemCpuLoad(), - systemInfo.getAvailableProcessors()); + getSystemDiagnostics()); logger.warn(writeHangMessage); } @@ -315,12 +311,9 @@ private String isReadHang(Timestamps timestamps, Instant currentTime, RntbdReque final Optional rntbdContext = requestManager.rntbdContext(); final int pendingRequestCount = requestManager.pendingRequestCount(); - CosmosDiagnosticsSystemUsageSnapshot systemInfo = ClientSideRequestStatistics.fetchSystemInformation(); - readHangMessage = MessageFormat.format( "{0} health check failed due to non-responding read: [lastChannelWrite: {1}, lastChannelRead: {2}, " - + "readDelay: {3}, readDelayLimit: {4}, rntbdContext: {5}, pendingRequestCount: {6}, clientVmId: {7}" - + "clientUsedMemory: {8}, clientAvailableMemory: {9}, clientSystemCpuLoad: {10}, clientAvailableProcessors: {11}]", + + "readDelay: {3}, readDelayLimit: {4}, rntbdContext: {5}, pendingRequestCount: {6}, clientVmId: {7}, {8}]", channel, timestamps.lastChannelWriteTime(), timestamps.lastChannelReadTime(), @@ -329,10 +322,7 @@ private String isReadHang(Timestamps timestamps, Instant currentTime, RntbdReque rntbdContext, pendingRequestCount, clientTelemetry.getClientTelemetryInfo().getMachineId(), - systemInfo.getUsedMemory(), - systemInfo.getAvailableMemory(), - systemInfo.getSystemCpuLoad(), - systemInfo.getAvailableProcessors()); + getSystemDiagnostics()); logger.warn(readHangMessage); @@ -364,11 +354,13 @@ private String transitTimeoutValidation(Timestamps timestamps, Instant currentTi if (readDelay >= this.timeoutTimeLimitInNanos) { transitTimeoutValidationMessage = MessageFormat.format( "{0} health check failed due to transit timeout detection time limit: [rntbdContext: {1}," - + "lastChannelRead: {2}, timeoutTimeLimitInNanos: {3}]", + + "lastChannelRead: {2}, timeoutTimeLimitInNanos: {3}, clientVmId: {4}, {5}]", channel, rntbdContext, timestamps.lastReadTime, - this.timeoutTimeLimitInNanos); + this.timeoutTimeLimitInNanos, + clientTelemetry.getClientTelemetryInfo().getMachineId(), + getSystemDiagnostics()); logger.warn(transitTimeoutValidationMessage); return transitTimeoutValidationMessage; @@ -380,13 +372,16 @@ private String transitTimeoutValidation(Timestamps timestamps, Instant currentTi && readDelay >= this.timeoutHighFrequencyTimeLimitInNanos) { transitTimeoutValidationMessage = MessageFormat.format( "{0} health check failed due to transit timeout high frequency threshold hit: [rntbdContext: {1}," - + "lastChannelRead: {2}, transitTimeoutCount: {3}, timeoutHighFrequencyThreshold: {4}, timeoutHighFrequencyTimeLimitInNanos: {5}]", + + "lastChannelRead: {2}, transitTimeoutCount: {3}, timeoutHighFrequencyThreshold: {4}, timeoutHighFrequencyTimeLimitInNanos: {5}" + + "clientVmId: {6}, {7}]", channel, rntbdContext, timestamps.lastReadTime, timestamps.transitTimeoutCount, this.timeoutHighFrequencyThreshold, - this.timeoutHighFrequencyTimeLimitInNanos); + this.timeoutHighFrequencyTimeLimitInNanos, + clientTelemetry.getClientTelemetryInfo().getMachineId(), + getSystemDiagnostics()); logger.warn(transitTimeoutValidationMessage); return transitTimeoutValidationMessage; @@ -398,13 +393,16 @@ private String transitTimeoutValidation(Timestamps timestamps, Instant currentTi && readDelay >= this.timeoutOnWriteTimeLimitInNanos) { transitTimeoutValidationMessage = MessageFormat.format( "{0} health check failed due to transit timeout on write threshold hit: [rntbdContext: {1}," - + "lastChannelRead: {2}, transitTimeoutWriteCount: {3}, timeoutOnWriteThreshold: {4}, timeoutOnWriteTimeLimitInNanos: {5}]", + + "lastChannelRead: {2}, transitTimeoutWriteCount: {3}, timeoutOnWriteThreshold: {4}, timeoutOnWriteTimeLimitInNanos: {5}]" + + "clientVmId: {6}, {7}]", channel, rntbdContext, timestamps.lastReadTime, timestamps.transitTimeoutWriteCount, this.timeoutOnWriteThreshold, - this.timeoutOnWriteTimeLimitInNanos); + this.timeoutOnWriteTimeLimitInNanos, + clientTelemetry.getClientTelemetryInfo().getMachineId(), + getSystemDiagnostics()); logger.warn(transitTimeoutValidationMessage); return transitTimeoutValidationMessage; @@ -421,12 +419,14 @@ private String idleConnectionValidation(Timestamps timestamps, Instant currentTi if (Duration.between(timestamps.lastChannelReadTime(), currentTime).toNanos() > this.idleConnectionTimeoutInNanos) { errorMessage = MessageFormat.format( "{0} health check failed due to idle connection timeout: [lastChannelWrite: {1}, lastChannelRead: {2}, " - + "idleConnectionTimeout: {3}, currentTime: {4}]", + + "idleConnectionTimeout: {3}, currentTime: {4}, clientVmId: {5}, {6}]", channel, timestamps.lastChannelWriteTime(), timestamps.lastChannelReadTime(), idleConnectionTimeoutInNanos, - currentTime); + currentTime, + clientTelemetry.getClientTelemetryInfo().getMachineId(), + getSystemDiagnostics()); logger.warn(errorMessage); } @@ -458,13 +458,15 @@ private String isCancellationProneChannel(Timestamps timestamps, Instant current errorMessage = MessageFormat.format( "{0} health check failed due to channel being cancellation prone: [rntbdContext: {1}, lastChannelWrite: {2}, lastChannelRead: {3}," - + "cancellationCountSinceLastSuccessfulRead: {4}, currentTime: {5}]", + + "cancellationCountSinceLastSuccessfulRead: {4}, currentTime: {5}, clientVmId: {6}, {7}]", channel, rntbdContext, timestamps.lastChannelWriteTime(), timestamps.lastChannelReadTime(), timestamps.cancellationCount(), - currentTime); + currentTime, + clientTelemetry.getClientTelemetryInfo().getMachineId(), + getSystemDiagnostics()); logger.warn(errorMessage); return errorMessage; @@ -474,6 +476,15 @@ private String isCancellationProneChannel(Timestamps timestamps, Instant current return errorMessage; } + private String getSystemDiagnostics() { + CosmosDiagnosticsSystemUsageSnapshot systemInfo = ClientSideRequestStatistics.fetchSystemInformation(); + return MessageFormat.format("clientUsedMemory: {0}, clientAvailableMemory: {1}, clientSystemCpuLoad: {2}, clientAvailableProcessors: {3}", + systemInfo.getUsedMemory(), + systemInfo.getAvailableMemory(), + systemInfo.getSystemCpuLoad(), + systemInfo.getAvailableProcessors()); + } + @Override public String toString() { return RntbdObjectMapper.toString(this); From fcec0c46d4b635ddf3d6b36d2c3d38db04ee8ec4 Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Thu, 9 Jan 2025 16:10:06 -0500 Subject: [PATCH 11/11] Fix ConnectionStateListenerTest --- .../ConnectionStateListenerTest.java | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConnectionStateListenerTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConnectionStateListenerTest.java index fe329253fe96..d41b688ded2e 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConnectionStateListenerTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConnectionStateListenerTest.java @@ -3,6 +3,7 @@ package com.azure.cosmos.implementation.directconnectivity; +import com.azure.cosmos.ConnectionMode; import com.azure.cosmos.DirectConnectionConfig; import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.ConnectionPolicy; @@ -13,6 +14,8 @@ import com.azure.cosmos.implementation.ResourceType; import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.UserAgentContainer; +import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry; +import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetryInfo; import com.azure.cosmos.implementation.directconnectivity.TcpServerMock.RequestResponseType; import com.azure.cosmos.implementation.directconnectivity.TcpServerMock.SslContextUtils; import com.azure.cosmos.implementation.directconnectivity.TcpServerMock.TcpServer; @@ -33,6 +36,7 @@ import java.io.IOException; import java.nio.channels.ClosedChannelException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Random; @@ -93,12 +97,27 @@ public void connectionStateListener_OnConnectionEvent( Configs config = Mockito.mock(Configs.class); Mockito.doReturn(sslContext).when(config).getSslContext(false, false); + ClientTelemetry clientTelemetry = Mockito.mock(ClientTelemetry.class); + ClientTelemetryInfo clientTelemetryInfo = new ClientTelemetryInfo( + "testMachine", + "testClient", + "testProcess", + "testApp", + ConnectionMode.DIRECT, + "test-cdb-account", + "Test Region 1", + "Linux", + false, + Arrays.asList("Test Region 1", "Test Region 2")); + + Mockito.when(clientTelemetry.getClientTelemetryInfo()).thenReturn(clientTelemetryInfo); + RntbdTransportClient client = new RntbdTransportClient( config, connectionPolicy, new UserAgentContainer(), addressResolver, - null, + clientTelemetry, null); RxDocumentServiceRequest req =