From 12eaf0d6dcb332254bc70d734973c1621c275b62 Mon Sep 17 00:00:00 2001 From: Gaole Meng Date: Thu, 6 Feb 2025 01:50:52 +0000 Subject: [PATCH] fix: allow trace id to switch within the same connection --- .../bigquery/storage/v1/ConnectionWorker.java | 10 ++++++-- .../storage/v1/ConnectionWorkerPool.java | 21 ++++++----------- .../bigquery/storage/v1/StreamWriter.java | 23 ++++++++++--------- .../storage/v1/ConnectionWorkerPoolTest.java | 1 - .../storage/v1/ConnectionWorkerTest.java | 11 +++++++-- 5 files changed, 36 insertions(+), 30 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index a7a43b515..f689eb76b 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -550,7 +550,8 @@ String getWriteLocation() { private ApiFuture appendInternal( StreamWriter streamWriter, AppendRowsRequest message, String requestUniqueId) { AppendRequestAndResponse requestWrapper = - new AppendRequestAndResponse(message, streamWriter, this.retrySettings, requestUniqueId); + new AppendRequestAndResponse(message, streamWriter, this.retrySettings, requestUniqueId, + streamWriter.getFullTraceId()); if (requestWrapper.messageSize > getApiMaxRequestBytes()) { requestWrapper.appendResult.setException( new StatusRuntimeException( @@ -1372,16 +1373,21 @@ static final class AppendRequestAndResponse { // If a response is no longer expected this is set back to null. Instant requestSendTimeStamp; + // Mark where the traffic coming from. Used for internal logging purpose. + String fullTraceId; + AppendRequestAndResponse( AppendRowsRequest message, StreamWriter streamWriter, RetrySettings retrySettings, - String requestUniqueId) { + String requestUniqueId, + String fullTraceId) { this.appendResult = SettableApiFuture.create(); this.message = message; this.messageSize = message.getProtoRows().getSerializedSize(); this.streamWriter = streamWriter; this.requestUniqueId = requestUniqueId; + this.fullTraceId = fullTraceId; this.blockMessageSendDeadline = Instant.now(); this.retryCount = 0; // To be set after first retry diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java index 8550e553d..b458ddffd 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java @@ -95,10 +95,6 @@ public class ConnectionWorkerPool { /** Enable test related logic. */ private static boolean enableTesting = false; - /* - * TraceId for debugging purpose. - */ - private final String traceId; /* * Sets the compression to use for the calls */ @@ -213,7 +209,6 @@ public abstract static class Builder { long maxInflightBytes, java.time.Duration maxRetryDuration, FlowController.LimitExceededBehavior limitExceededBehavior, - String traceId, @Nullable String comperssorName, BigQueryWriteSettings clientSettings, RetrySettings retrySettings, @@ -223,7 +218,6 @@ public abstract static class Builder { this.maxInflightBytes = maxInflightBytes; this.maxRetryDuration = maxRetryDuration; this.limitExceededBehavior = limitExceededBehavior; - this.traceId = traceId; this.compressorName = comperssorName; this.clientSettings = clientSettings; this.currentMaxConnectionCount = settings.minConnectionsPerRegion(); @@ -319,7 +313,8 @@ private ConnectionWorker createOrReuseConnectionWorker( if (connectionWorkerPool.size() < currentMaxConnectionCount) { // Always create a new connection if we haven't reached current maximum. return createConnectionWorker( - streamWriter.getStreamName(), streamWriter.getLocation(), streamWriter.getProtoSchema()); + streamWriter.getStreamName(), streamWriter.getLocation(), streamWriter.getProtoSchema(), + streamWriter.getFullTraceId()); } else { ConnectionWorker existingBestConnection = pickBestLoadConnection( @@ -338,7 +333,8 @@ private ConnectionWorker createOrReuseConnectionWorker( return createConnectionWorker( streamWriter.getStreamName(), streamWriter.getLocation(), - streamWriter.getProtoSchema()); + streamWriter.getProtoSchema(), + streamWriter.getFullTraceId()); } else { // Stick to the original connection if all the connections are overwhelmed. if (existingConnectionWorker != null) { @@ -394,7 +390,8 @@ static ConnectionWorker pickBestLoadConnection( * computeIfAbsent(...) which is at most once per key. */ private ConnectionWorker createConnectionWorker( - String streamName, String location, ProtoSchema writeSchema) throws IOException { + String streamName, String location, ProtoSchema writeSchema, + String fullTraceId) throws IOException { if (enableTesting) { // Though atomic integer is super lightweight, add extra if check in case adding future logic. testValueCreateConnectionCount.getAndIncrement(); @@ -408,7 +405,7 @@ private ConnectionWorker createConnectionWorker( maxInflightBytes, maxRetryDuration, limitExceededBehavior, - traceId, + fullTraceId, compressorName, clientSettings, retrySettings, @@ -494,10 +491,6 @@ int getTotalConnectionCount() { return connectionWorkerPool.size(); } - String getTraceId() { - return traceId; - } - FlowController.LimitExceededBehavior limitExceededBehavior() { return limitExceededBehavior; } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index fbeadef87..a5ec85477 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -73,6 +73,11 @@ public class StreamWriter implements AutoCloseable { */ private final String streamName; + /** + * This is the library version may or may not include library version id. + */ + private final String fullTraceId; + /** Every writer has a fixed proto schema. */ private final ProtoSchema writerSchema; @@ -233,6 +238,7 @@ private StreamWriter(Builder builder) throws IOException { BigQueryWriteSettings clientSettings = getBigQueryWriteSettings(builder); this.requestProfilerHook = new RequestProfiler.RequestProfilerHook(builder.enableRequestProfiler); + this.fullTraceId = builder.getFullTraceId(); if (builder.enableRequestProfiler) { // Request profiler is enabled on singleton level, from now on a periodical flush will be // started @@ -320,7 +326,6 @@ private StreamWriter(Builder builder) throws IOException { builder.maxInflightBytes, builder.maxRetryDuration, builder.limitExceededBehavior, - builder.getFullTraceId(), builder.compressorName, client.getSettings(), builder.retrySettings, @@ -359,6 +364,10 @@ static boolean isDefaultStream(String streamName) { return streamMatcher.find(); } + String getFullTraceId() { + return fullTraceId; + } + AppendRowsRequest.MissingValueInterpretation getDefaultValueInterpretation() { return defaultMissingValueInterpretation; } @@ -401,15 +410,6 @@ static BigQueryWriteSettings getBigQueryWriteSettings(Builder builder) throws IO // Validate whether the fetched connection pool matched certain properties. private void validateFetchedConnectonPool(StreamWriter.Builder builder) { - String storedTraceId = - this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId(); - if (!Objects.equals(storedTraceId, builder.getFullTraceId())) { - throw new IllegalArgumentException( - String.format( - "Trace id used for the same connection pool for the same location must be the same, " - + "however stored trace id is %s, and expected trace id is %s.", - storedTraceId, builder.getFullTraceId())); - } FlowController.LimitExceededBehavior storedLimitExceededBehavior = singleConnectionOrConnectionPool.connectionWorkerPool().limitExceededBehavior(); if (!Objects.equals(storedLimitExceededBehavior, builder.limitExceededBehavior)) { @@ -479,7 +479,8 @@ ApiFuture appendWithUniqueId( AppendRowsRequest.newBuilder().build(), /*StreamWriter=*/ this, /*RetrySettings=*/ null, - requestUniqueId); + requestUniqueId, + fullTraceId); requestWrapper.appendResult.setException( new Exceptions.StreamWriterClosedException( Status.fromCode(Status.Code.FAILED_PRECONDITION) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java index 4f3a7d30d..047909122 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java @@ -561,7 +561,6 @@ ConnectionWorkerPool createConnectionWorkerPool( maxBytes, maxRetryDuration, FlowController.LimitExceededBehavior.Block, - TEST_TRACE_ID, null, clientSettings, retrySettings, diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index 7332960ca..3aacd9b28 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -85,6 +85,7 @@ public void setUp() throws Exception { .build()); } + @Test public void testMultiplexedAppendSuccess() throws Exception { try (ConnectionWorker connectionWorker = createMultiplexedConnectionWorker()) { @@ -97,11 +98,13 @@ public void testMultiplexedAppendSuccess() throws Exception { StreamWriter.newBuilder(TEST_STREAM_1, client) .setWriterSchema(createProtoSchema("foo")) .setLocation("us") + .setTraceId("some_head:trace_id_1") .build(); StreamWriter sw2 = StreamWriter.newBuilder(TEST_STREAM_2, client) .setWriterSchema(createProtoSchema("complicate")) .setLocation("us") + .setTraceId("some_head:trace_id_2") .build(); // We do a pattern of: // send to stream1, string1 @@ -159,28 +162,32 @@ public void testMultiplexedAppendSuccess() throws Exception { case 0: assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1); assertThat( - serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName()) + serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName()) .isEqualTo("foo"); + assertThat(serverRequest.getTraceId()).isEqualTo(sw1.getFullTraceId()); break; case 1: // The write stream is empty until we enter multiplexing. assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1); // Schema is empty if not at the first request after table switch. assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse(); + assertThat(serverRequest.getTraceId()).isEmpty(); break; case 2: // Stream name is always populated after multiplexing. assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_2); // Schema is populated after table switch. assertThat( - serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName()) + serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName()) .isEqualTo("complicate"); + assertThat(serverRequest.getTraceId()).isEqualTo(sw2.getFullTraceId()); break; case 3: // Schema is empty if not at the first request after table switch. assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse(); // Stream name is always populated after multiplexing. assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_2); + assertThat(serverRequest.getTraceId()).isEmpty(); break; default: // fall out break;