Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: an attempt to solve test failure in nightly build #2330

Merged
merged 1 commit into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -371,22 +371,24 @@ static BigQueryWriteSettings getBigQueryWriteSettings(Builder builder) throws IO

// Validate whether the fetched connection pool matched certain properties.
private void validateFetchedConnectonPool(StreamWriter.Builder builder) {
String paramsValidatedFailed = "";
if (!Objects.equals(
this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId(),
builder.traceId)) {
paramsValidatedFailed = "Trace id";
} else if (!Objects.equals(
this.singleConnectionOrConnectionPool.connectionWorkerPool().limitExceededBehavior(),
builder.limitExceededBehavior)) {
paramsValidatedFailed = "Limit Exceeds Behavior";
}

if (!paramsValidatedFailed.isEmpty()) {
String storedTraceId =
this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId();
if (!Objects.equals(storedTraceId, builder.traceId)) {
throw new IllegalArgumentException(
String.format(
"%s used for the same connection pool for the same location must be the same!",
paramsValidatedFailed));
"Trace id used for the same connection pool for the same location must be the same, "
+ "however stored trace id is %s, and expected trace id is %s.",
storedTraceId, builder.traceId));
}
FlowController.LimitExceededBehavior storedLimitExceededBehavior =
singleConnectionOrConnectionPool.connectionWorkerPool().limitExceededBehavior();
if (!Objects.equals(storedLimitExceededBehavior, builder.limitExceededBehavior)) {
throw new IllegalArgumentException(
String.format(
"Limit exceeded behavior setting used for the same connection pool for the same "
+ "location must be the same, however stored value is %s, and expected "
+ "value is %s.",
storedLimitExceededBehavior, builder.limitExceededBehavior));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public class ITBigQueryWriteManualClientTest {
private static final String TABLE = "testtable";
private static final String TABLE2 = "complicatedtable";

private static final String TEST_TRACE_ID = "DATAFLOW:job_id";

private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset";

private static BigQueryWriteClient client;
Expand Down Expand Up @@ -928,6 +930,7 @@ public void testStreamWriterWithDefaultValue() throws ExecutionException, Interr
ProtoSchemaConverter.convert(SimpleTypeForDefaultValue.getDescriptor()))
.setDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE)
.setEnableConnectionPool(true)
.setTraceId(TEST_TRACE_ID)
.build()) {
// 1. row has both fields set.
SimpleTypeForDefaultValue simpleTypeForDefaultValue1 =
Expand Down Expand Up @@ -1534,16 +1537,19 @@ public void testMultiplexingMixedLocation()
StreamWriter.newBuilder(defaultStream1)
.setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor()))
.setEnableConnectionPool(true)
.setTraceId(TEST_TRACE_ID)
.build();
StreamWriter streamWriter2 =
StreamWriter.newBuilder(defaultStream2)
.setWriterSchema(ProtoSchemaConverter.convert(ComplicateType.getDescriptor()))
.setEnableConnectionPool(true)
.setTraceId(TEST_TRACE_ID)
.build();
StreamWriter streamWriter3 =
StreamWriter.newBuilder(defaultStream3)
.setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor()))
.setEnableConnectionPool(true)
.setTraceId(TEST_TRACE_ID)
.build();
ApiFuture<AppendRowsResponse> response1 =
streamWriter1.append(CreateProtoRows(new String[] {"aaa"}));
Expand All @@ -1557,6 +1563,9 @@ public void testMultiplexingMixedLocation()
assertEquals("us", streamWriter1.getLocation());
assertEquals("us", streamWriter2.getLocation());
assertEquals("eu", streamWriter3.getLocation());
streamWriter1.close();
streamWriter2.close();
streamWriter3.close();
}

@Test
Expand Down
Loading