Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: Add e2e test for default value use cases for default stream and exclusive stream #2285

Merged
merged 9 commits into from
Oct 25, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import com.google.api.core.ApiFuture;
import com.google.cloud.ServiceOptions;
import com.google.cloud.bigquery.*;
import com.google.cloud.bigquery.Field.Mode;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.storage.test.Test.*;
import com.google.cloud.bigquery.storage.v1.*;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetAlreadyExists;
import com.google.cloud.bigquery.storage.v1.Exceptions.OffsetOutOfRange;
Expand All @@ -43,6 +45,10 @@
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.text.ParseException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -63,14 +69,19 @@ public class ITBigQueryWriteManualClientTest {
private static final String DATASET_EU = RemoteBigQueryHelper.generateDatasetName();
private static final String TABLE = "testtable";
private static final String TABLE2 = "complicatedtable";

private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset";

private static BigQueryWriteClient client;
private static TableInfo tableInfo;
private static TableInfo tableInfo2;

private static TableInfo tableInfoEU;

private static TableDefinition defaultValueTableDefinition;
private static String tableId;
private static String tableId2;

private static String tableIdEU;
private static BigQuery bigquery;

Expand Down Expand Up @@ -126,6 +137,24 @@ public static void beforeClass() throws IOException {
.build(),
innerTypeFieldBuilder.setMode(Field.Mode.NULLABLE).build())))
.build();

defaultValueTableDefinition =
StandardTableDefinition.of(
Schema.of(
com.google.cloud.bigquery.Field.newBuilder(
"foo_with_default", LegacySQLTypeName.STRING)
.setDefaultValueExpression("'default_value_for_test'")
.setMode(Field.Mode.NULLABLE)
.build(),
com.google.cloud.bigquery.Field.newBuilder(
"bar_without_default", LegacySQLTypeName.STRING)
.setMode(Mode.NULLABLE)
.build(),
com.google.cloud.bigquery.Field.newBuilder(
"date_with_default_to_current", LegacySQLTypeName.TIMESTAMP)
.setDefaultValueExpression("CURRENT_TIMESTAMP()")
.setMode(Mode.NULLABLE)
.build()));
bigquery.create(tableInfo);
bigquery.create(tableInfo2);
tableId =
Expand Down Expand Up @@ -706,7 +735,12 @@ public void testJsonStreamWriterWithDefaultStream()
assertEquals(2, currentRow.get(3).getRepeatedValue().size());
assertEquals("Yg==", currentRow.get(3).getRepeatedValue().get(1).getStringValue());
assertEquals(
Timestamp.valueOf("2022-02-06 07:24:47.84").getTime() * 1000,
Timestamp.valueOf("2022-02-06 07:24:47.84")
.toLocalDateTime()
.atZone(ZoneId.of("UTC"))
.toInstant()
.toEpochMilli()
* 1000,
currentRow.get(4).getTimestampValue()); // timestamp long of "2022-02-06 07:24:47.84"
assertEquals("bbb", iter.next().get(0).getStringValue());
assertEquals("ccc", iter.next().get(0).getStringValue());
Expand All @@ -718,6 +752,110 @@ public void testJsonStreamWriterWithDefaultStream()
}
}

@Test
public void testJsonDefaultStreamOnTableWithDefaultValue_SchemaNotGiven()
throws IOException, InterruptedException, ExecutionException,
Descriptors.DescriptorValidationException, ParseException {
String tableName = "defaultStreamDefaultValue";
String defaultTableId =
String.format(
"projects/%s/datasets/%s/tables/%s",
ServiceOptions.getDefaultProjectId(), DATASET, tableName);
tableInfo =
TableInfo.newBuilder(TableId.of(DATASET, tableName), defaultValueTableDefinition).build();
bigquery.create(tableInfo);
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(defaultTableId, client)
.setDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE)
.build()) {
testJsonStreamWriterForDefaultValue(jsonStreamWriter);
}
}

@Test
public void testJsonExclusiveStreamOnTableWithDefaultValue_GiveTableSchema()
throws IOException, InterruptedException, ExecutionException,
Descriptors.DescriptorValidationException, ParseException {
String tableName = "exclusiveStreamDefaultValue";
String exclusiveTableId =
String.format(
"projects/%s/datasets/%s/tables/%s",
ServiceOptions.getDefaultProjectId(), DATASET, tableName);
tableInfo =
TableInfo.newBuilder(TableId.of(DATASET, tableName), defaultValueTableDefinition).build();
bigquery.create(tableInfo);
WriteStream writeStream =
client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(exclusiveTableId)
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(exclusiveTableId, writeStream.getTableSchema())
.setDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE)
.build()) {
testJsonStreamWriterForDefaultValue(jsonStreamWriter);
}
}

private void testJsonStreamWriterForDefaultValue(JsonStreamWriter jsonStreamWriter)
throws DescriptorValidationException, IOException, ExecutionException, InterruptedException,
ParseException {
// 1. row has both fields set.
JSONArray jsonArr1 = new JSONArray();
JSONObject row1 = new JSONObject();
row1.put("foo_with_default", "aaa");
row1.put("bar_without_default", "a");
row1.put("date_with_default_to_current", "2022-02-02 01:02:03");
jsonArr1.put(row1);
// 2. row with the column with default value unset
JSONObject row2 = new JSONObject();
row2.put("bar_without_default", "a");
jsonArr1.put(row2);
// 3. both value not set
JSONObject row3 = new JSONObject();
jsonArr1.put(row3);

// Start insertion and validation.
ApiFuture<AppendRowsResponse> response1 = jsonStreamWriter.append(jsonArr1, -1);
response1.get();
TableResult result =
bigquery.listTableData(tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
Iterator<FieldValueList> iter = result.getValues().iterator();

FieldValueList currentRow = iter.next();
assertEquals("aaa", currentRow.get(0).getStringValue());
assertEquals("a", currentRow.get(1).getStringValue());
assertEquals(
Timestamp.valueOf("2022-02-02 01:02:03")
.toLocalDateTime()
.atZone(ZoneId.of("UTC"))
.toInstant()
.toEpochMilli(),
Double.valueOf(currentRow.get(2).getStringValue()).longValue() * 1000);

currentRow = iter.next();
assertEquals("default_value_for_test", currentRow.get(0).getStringValue());
assertFalse(currentRow.get(2).getStringValue().isEmpty());
assertEquals("a", currentRow.get(1).getStringValue());
// Check whether the recorded value is up to date enough.
Instant parsedInstant =
Instant.ofEpochSecond(Double.valueOf(currentRow.get(2).getStringValue()).longValue());
assertTrue(parsedInstant.isAfter(Instant.now().minus(1, ChronoUnit.HOURS)));

currentRow = iter.next();
assertEquals("default_value_for_test", currentRow.get(0).getStringValue());
assertEquals(null, currentRow.get(1).getValue());
assertFalse(currentRow.get(2).getStringValue().isEmpty());
// Check whether the recorded value is up to date enough.
parsedInstant =
Instant.ofEpochSecond(Double.valueOf(currentRow.get(2).getStringValue()).longValue());
assertTrue(parsedInstant.isAfter(Instant.now().minus(1, ChronoUnit.HOURS)));

assertEquals(false, iter.hasNext());
}

// This test runs about 1 min.
@Test
public void testJsonStreamWriterWithMessagesOver10M()
Expand Down