fix: allow trace id to switch within the same connection
GaoleMeng committed Feb 6, 2025
1 parent 2ff6112 commit 12eaf0d
Showing 5 changed files with 36 additions and 30 deletions.
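The change moves the trace id off the shared ConnectionWorkerPool and onto each StreamWriter and each queued AppendRequestAndResponse, so writers that multiplex over the same connection no longer have to agree on a single trace id; the pool-level validation that rejected a mismatch (removed from validateFetchedConnectonPool below) is no longer needed. A minimal sketch of what this allows, modeled on the updated testMultiplexedAppendSuccess; TEST_STREAM_1, TEST_STREAM_2, createProtoSchema(...) and client are fixtures from that test, not part of this commit:

StreamWriter sw1 =
    StreamWriter.newBuilder(TEST_STREAM_1, client)
        .setWriterSchema(createProtoSchema("foo"))
        .setLocation("us")
        .setTraceId("some_head:trace_id_1") // writer-level trace id
        .build();
StreamWriter sw2 =
    StreamWriter.newBuilder(TEST_STREAM_2, client)
        .setWriterSchema(createProtoSchema("complicate"))
        .setLocation("us")
        .setTraceId("some_head:trace_id_2") // a different trace id on the same connection
        .build();
// Before this commit, two writers sharing a connection pool for the same location had to use
// identical trace ids; otherwise StreamWriter construction failed the (now removed) check in
// validateFetchedConnectonPool with IllegalArgumentException. After this commit the trace id
// travels with each request: the first request sent on behalf of sw1 carries
// sw1.getFullTraceId() and the first request sent on behalf of sw2 carries sw2.getFullTraceId(),
// which is what the updated test asserts via serverRequest.getTraceId().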
@@ -550,7 +550,8 @@ String getWriteLocation() {
   private ApiFuture<AppendRowsResponse> appendInternal(
       StreamWriter streamWriter, AppendRowsRequest message, String requestUniqueId) {
     AppendRequestAndResponse requestWrapper =
-        new AppendRequestAndResponse(message, streamWriter, this.retrySettings, requestUniqueId);
+        new AppendRequestAndResponse(message, streamWriter, this.retrySettings, requestUniqueId,
+            streamWriter.getFullTraceId());
     if (requestWrapper.messageSize > getApiMaxRequestBytes()) {
       requestWrapper.appendResult.setException(
           new StatusRuntimeException(
@@ -1372,16 +1373,21 @@ static final class AppendRequestAndResponse {
     // If a response is no longer expected this is set back to null.
     Instant requestSendTimeStamp;

+    // Marks where the traffic is coming from. Used for internal logging purposes.
+    String fullTraceId;
+
     AppendRequestAndResponse(
         AppendRowsRequest message,
         StreamWriter streamWriter,
         RetrySettings retrySettings,
-        String requestUniqueId) {
+        String requestUniqueId,
+        String fullTraceId) {
       this.appendResult = SettableApiFuture.create();
       this.message = message;
       this.messageSize = message.getProtoRows().getSerializedSize();
       this.streamWriter = streamWriter;
       this.requestUniqueId = requestUniqueId;
+      this.fullTraceId = fullTraceId;
       this.blockMessageSendDeadline = Instant.now();
       this.retryCount = 0;
       // To be set after first retry
@@ -95,10 +95,6 @@ public class ConnectionWorkerPool {
   /** Enable test related logic. */
   private static boolean enableTesting = false;

-  /*
-   * TraceId for debugging purpose.
-   */
-  private final String traceId;
   /*
    * Sets the compression to use for the calls
    */
@@ -213,7 +209,6 @@ public abstract static class Builder {
       long maxInflightBytes,
       java.time.Duration maxRetryDuration,
       FlowController.LimitExceededBehavior limitExceededBehavior,
-      String traceId,
       @Nullable String comperssorName,
       BigQueryWriteSettings clientSettings,
       RetrySettings retrySettings,
@@ -223,7 +218,6 @@ public abstract static class Builder {
     this.maxInflightBytes = maxInflightBytes;
     this.maxRetryDuration = maxRetryDuration;
     this.limitExceededBehavior = limitExceededBehavior;
-    this.traceId = traceId;
     this.compressorName = comperssorName;
     this.clientSettings = clientSettings;
     this.currentMaxConnectionCount = settings.minConnectionsPerRegion();
@@ -319,7 +313,8 @@ private ConnectionWorker createOrReuseConnectionWorker(
     if (connectionWorkerPool.size() < currentMaxConnectionCount) {
       // Always create a new connection if we haven't reached current maximum.
       return createConnectionWorker(
-          streamWriter.getStreamName(), streamWriter.getLocation(), streamWriter.getProtoSchema());
+          streamWriter.getStreamName(), streamWriter.getLocation(), streamWriter.getProtoSchema(),
+          streamWriter.getFullTraceId());
     } else {
       ConnectionWorker existingBestConnection =
           pickBestLoadConnection(
@@ -338,7 +333,8 @@ private ConnectionWorker createOrReuseConnectionWorker(
         return createConnectionWorker(
             streamWriter.getStreamName(),
             streamWriter.getLocation(),
-            streamWriter.getProtoSchema());
+            streamWriter.getProtoSchema(),
+            streamWriter.getFullTraceId());
       } else {
         // Stick to the original connection if all the connections are overwhelmed.
         if (existingConnectionWorker != null) {
@@ -394,7 +390,8 @@ static ConnectionWorker pickBestLoadConnection(
    * computeIfAbsent(...) which is at most once per key.
    */
   private ConnectionWorker createConnectionWorker(
-      String streamName, String location, ProtoSchema writeSchema) throws IOException {
+      String streamName, String location, ProtoSchema writeSchema,
+      String fullTraceId) throws IOException {
     if (enableTesting) {
       // Though atomic integer is super lightweight, add extra if check in case adding future logic.
       testValueCreateConnectionCount.getAndIncrement();
@@ -408,7 +405,7 @@ private ConnectionWorker createConnectionWorker(
           maxInflightBytes,
           maxRetryDuration,
           limitExceededBehavior,
-          traceId,
+          fullTraceId,
           compressorName,
           clientSettings,
           retrySettings,
@@ -494,10 +491,6 @@ int getTotalConnectionCount() {
     return connectionWorkerPool.size();
   }

-  String getTraceId() {
-    return traceId;
-  }
-
   FlowController.LimitExceededBehavior limitExceededBehavior() {
     return limitExceededBehavior;
   }
@@ -73,6 +73,11 @@ public class StreamWriter implements AutoCloseable {
    */
   private final String streamName;

+  /**
+   * The full trace id for this writer; it may or may not include the library version id.
+   */
+  private final String fullTraceId;
+
   /** Every writer has a fixed proto schema. */
   private final ProtoSchema writerSchema;

@@ -233,6 +238,7 @@ private StreamWriter(Builder builder) throws IOException {
     BigQueryWriteSettings clientSettings = getBigQueryWriteSettings(builder);
     this.requestProfilerHook =
         new RequestProfiler.RequestProfilerHook(builder.enableRequestProfiler);
+    this.fullTraceId = builder.getFullTraceId();
     if (builder.enableRequestProfiler) {
       // Request profiler is enabled on singleton level, from now on a periodical flush will be
       // started
@@ -320,7 +326,6 @@ private StreamWriter(Builder builder) throws IOException {
             builder.maxInflightBytes,
             builder.maxRetryDuration,
             builder.limitExceededBehavior,
-            builder.getFullTraceId(),
             builder.compressorName,
             client.getSettings(),
             builder.retrySettings,
@@ -359,6 +364,10 @@ static boolean isDefaultStream(String streamName) {
     return streamMatcher.find();
   }

+  String getFullTraceId() {
+    return fullTraceId;
+  }
+
   AppendRowsRequest.MissingValueInterpretation getDefaultValueInterpretation() {
     return defaultMissingValueInterpretation;
   }
@@ -401,15 +410,6 @@ static BigQueryWriteSettings getBigQueryWriteSettings(Builder builder) throws IO

   // Validate whether the fetched connection pool matched certain properties.
   private void validateFetchedConnectonPool(StreamWriter.Builder builder) {
-    String storedTraceId =
-        this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId();
-    if (!Objects.equals(storedTraceId, builder.getFullTraceId())) {
-      throw new IllegalArgumentException(
-          String.format(
-              "Trace id used for the same connection pool for the same location must be the same, "
-                  + "however stored trace id is %s, and expected trace id is %s.",
-              storedTraceId, builder.getFullTraceId()));
-    }
     FlowController.LimitExceededBehavior storedLimitExceededBehavior =
         singleConnectionOrConnectionPool.connectionWorkerPool().limitExceededBehavior();
     if (!Objects.equals(storedLimitExceededBehavior, builder.limitExceededBehavior)) {
@@ -479,7 +479,8 @@ ApiFuture<AppendRowsResponse> appendWithUniqueId(
             AppendRowsRequest.newBuilder().build(),
             /*StreamWriter=*/ this,
             /*RetrySettings=*/ null,
-            requestUniqueId);
+            requestUniqueId,
+            fullTraceId);
     requestWrapper.appendResult.setException(
         new Exceptions.StreamWriterClosedException(
             Status.fromCode(Status.Code.FAILED_PRECONDITION)
@@ -561,7 +561,6 @@ ConnectionWorkerPool createConnectionWorkerPool(
         maxBytes,
         maxRetryDuration,
         FlowController.LimitExceededBehavior.Block,
-        TEST_TRACE_ID,
         null,
         clientSettings,
         retrySettings,
@@ -85,6 +85,7 @@ public void setUp() throws Exception {
             .build());
   }

+
   @Test
   public void testMultiplexedAppendSuccess() throws Exception {
     try (ConnectionWorker connectionWorker = createMultiplexedConnectionWorker()) {
@@ -97,11 +98,13 @@ public void testMultiplexedAppendSuccess() throws Exception {
         StreamWriter.newBuilder(TEST_STREAM_1, client)
             .setWriterSchema(createProtoSchema("foo"))
             .setLocation("us")
+            .setTraceId("some_head:trace_id_1")
             .build();
     StreamWriter sw2 =
         StreamWriter.newBuilder(TEST_STREAM_2, client)
             .setWriterSchema(createProtoSchema("complicate"))
             .setLocation("us")
+            .setTraceId("some_head:trace_id_2")
             .build();
     // We do a pattern of:
     // send to stream1, string1
@@ -159,28 +162,32 @@ public void testMultiplexedAppendSuccess() throws Exception {
         case 0:
           assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
           assertThat(
-                  serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName())
+                  serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName())
               .isEqualTo("foo");
+          assertThat(serverRequest.getTraceId()).isEqualTo(sw1.getFullTraceId());
           break;
         case 1:
           // The write stream is empty until we enter multiplexing.
           assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
           // Schema is empty if not at the first request after table switch.
           assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse();
+          assertThat(serverRequest.getTraceId()).isEmpty();
           break;
         case 2:
           // Stream name is always populated after multiplexing.
           assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_2);
           // Schema is populated after table switch.
           assertThat(
-                  serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName())
+                  serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName())
               .isEqualTo("complicate");
+          assertThat(serverRequest.getTraceId()).isEqualTo(sw2.getFullTraceId());
           break;
         case 3:
           // Schema is empty if not at the first request after table switch.
           assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse();
           // Stream name is always populated after multiplexing.
           assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_2);
+          assertThat(serverRequest.getTraceId()).isEmpty();
           break;
         default: // fall out
           break;
