Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 New Source: Big Query #4457

Merged
merged 69 commits into from
Jul 22, 2021
Merged
Show file tree
Hide file tree
Changes from 58 commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
286e327
unfinished jdbcsource separation
DoNotPanicUA Jun 11, 2021
6a62eaa
creation AbstactRelation
DoNotPanicUA Jun 11, 2021
0aaf904
Migrate StateManager to new abstract level (JdbcSource -> RelationalS…
DoNotPanicUA Jun 14, 2021
6dcedf5
fix imports
DoNotPanicUA Jun 14, 2021
6d8e976
move configs to Database level + fix MySql source
DoNotPanicUA Jun 14, 2021
042c527
make in line jdbc source with a new impl
DoNotPanicUA Jun 15, 2021
147e166
Fix ScaffoldJavaJdbcSource template
DoNotPanicUA Jun 15, 2021
246528a
rename `AbstractField` to `CommonField`. Now it
DoNotPanicUA Jun 17, 2021
e8d29c9
format
DoNotPanicUA Jun 17, 2021
111abae
rename generated files in line with their location
DoNotPanicUA Jun 17, 2021
3bc83b2
bonus renaming
DoNotPanicUA Jun 17, 2021
18e86d8
move utility methods specific for jdbc source to a proper module
DoNotPanicUA Jun 18, 2021
67f1e6a
internal review update
DoNotPanicUA Jun 22, 2021
42eb2c5
BigQueryDatabase impl without row transformation
DoNotPanicUA Jun 23, 2021
9326d35
add Static method for BigQueryDatabase instancing
DoNotPanicUA Jun 23, 2021
c7d2eff
remove data type parameter limitation + rename class parameters
DoNotPanicUA Jun 23, 2021
c78a441
Merge remote-tracking branch 'origin/aleonets/4024-abstract-source' i…
DoNotPanicUA Jun 23, 2021
06f5d13
Move DataTypeUtils from jdbs to common + impl basic types BigQueryUtils
DoNotPanicUA Jun 23, 2021
d16f883
Merge remote-tracking branch 'origin/master' into aleonets/4024-abstr…
DoNotPanicUA Jun 23, 2021
75e7c99
make DB2 in line with new relational abstract classes
DoNotPanicUA Jun 23, 2021
3dc5383
add missing import
DoNotPanicUA Jun 23, 2021
fd60d14
cover all biqquery classes + add type transformation method from Stan…
DoNotPanicUA Jun 29, 2021
b3e0801
Merge remote-tracking branch 'origin/master' into 4024-abstract-source
DoNotPanicUA Jun 30, 2021
5421313
close unused connections
DoNotPanicUA Jun 30, 2021
22356d0
Merge branch 'master' into aleonets/1876-source-bigquery
heade Jul 1, 2021
32b53ee
Merge remote-tracking branch 'origin/aleonets/4024-abstract-source' i…
DoNotPanicUA Jul 1, 2021
8c706b4
Merge remote-tracking branch 'origin/aleonets/1876-source-bigquery' i…
heade Jul 1, 2021
aa38921
add table list extract method
DoNotPanicUA Jul 1, 2021
5d31771
Merge remote-tracking branch 'origin/aleonets/1876-source-bigquery' i…
heade Jul 1, 2021
4fb2f24
bigquery source connector
heade Jul 1, 2021
f4d6aa0
return all tables for a whole project instead of a dataset
DoNotPanicUA Jul 1, 2021
7f76db9
impl incremental fetch
DoNotPanicUA Jul 1, 2021
0495b35
bigquery source connector
heade Jul 2, 2021
d764c16
bigquery source connector
heade Jul 2, 2021
33a447e
remove unnecessary databaseid
DoNotPanicUA Jul 5, 2021
e114a18
add primitive type filtering
DoNotPanicUA Jul 5, 2021
c27a744
Merge remote-tracking branch 'origin/master' into aleonets/1876-sourc…
DoNotPanicUA Jul 5, 2021
74f8350
add temporary workaround for test database.
DoNotPanicUA Jul 6, 2021
2a5703e
add dataset location
DoNotPanicUA Jul 7, 2021
ae5f059
fix table info retrieving
DoNotPanicUA Jul 7, 2021
904f054
handle dataset config
DoNotPanicUA Jul 8, 2021
094fa82
Add working comprehensive test without data cases
DoNotPanicUA Jul 8, 2021
32bd999
minor changes in the source processing
DoNotPanicUA Jul 9, 2021
5541017
acceptance tests; discover method fix
heade Jul 9, 2021
667018b
Merge remote-tracking branch 'origin/aleonets/1876-source-bigquery' i…
heade Jul 9, 2021
4e8910f
discover method fix
heade Jul 9, 2021
36693ed
first comprehensinve test
DoNotPanicUA Jul 9, 2021
5468d54
Merge branch 'aleonets/1876-source-bigquery' of https://github.com/ai…
DoNotPanicUA Jul 9, 2021
8dc3f44
Comprehensive tests for the BigQuery source + database timeout config
DoNotPanicUA Jul 11, 2021
194af3f
bigquery acceptance tests fix; formatting
heade Jul 12, 2021
62b3f89
fix incremental sync using date, datetime, time and timestamp types
DoNotPanicUA Jul 13, 2021
954995a
Implement source checks: basic and dataset
DoNotPanicUA Jul 13, 2021
d90f96c
Merge remote-tracking branch 'origin/master' into aleonets/1876-sourc…
DoNotPanicUA Jul 13, 2021
e258c82
format
DoNotPanicUA Jul 13, 2021
6107f30
revert: airbyte_protocol.by
DoNotPanicUA Jul 13, 2021
f95c1af
Merge remote-tracking branch 'origin/master' into aleonets/1876-sourc…
DoNotPanicUA Jul 14, 2021
bfa5cf3
internal review update
DoNotPanicUA Jul 14, 2021
d64ce0b
Add possibility to get list of comprehensive tests in a Markdown tabl…
DoNotPanicUA Jul 14, 2021
fd33eed
Merge branch 'master' into aleonets/1876-source-bigquery
heade Jul 15, 2021
6a58540
Update airbyte-integrations/connectors/source-bigquery/src/main/resou…
DoNotPanicUA Jul 16, 2021
d6053f9
review update
DoNotPanicUA Jul 16, 2021
e545247
Implement processing for arrays and structures
DoNotPanicUA Jul 16, 2021
2fd9199
format
DoNotPanicUA Jul 16, 2021
45c7f0c
Merge remote-tracking branch 'origin/master' into aleonets/1876-sourc…
DoNotPanicUA Jul 16, 2021
ebce19c
Merge remote-tracking branch 'origin/aleonets/1876-source-bigquery' i…
heade Jul 20, 2021
46f5b3e
added bigquery secrets
heade Jul 20, 2021
e493468
added bigquery secrets
heade Jul 20, 2021
05067c3
spec fix
heade Jul 22, 2021
449a0b5
test configs fix
heade Jul 22, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions airbyte-db/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,12 @@ dependencies {

testImplementation 'org.apache.commons:commons-lang3:3.11'
testImplementation "org.testcontainers:postgresql:1.15.1"

// Big Query
implementation platform('com.google.cloud:libraries-bom:20.6.0')
implementation('com.google.cloud:google-cloud-bigquery:1.133.1')

// Lombok
implementation 'org.projectlombok:lombok:1.18.20'
annotationProcessor('org.projectlombok:lombok:1.18.20')
}
34 changes: 34 additions & 0 deletions airbyte-db/src/main/java/io/airbyte/db/DataTypeSupplier.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.db;

import java.sql.SQLException;

@FunctionalInterface
public interface DataTypeSupplier<DataType> {

DataType apply() throws SQLException;

}
63 changes: 63 additions & 0 deletions airbyte-db/src/main/java/io/airbyte/db/DataTypeUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.db;

import java.sql.Date;
import java.sql.SQLException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.function.Function;

public class DataTypeUtils {

public static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); // Quoted "Z" to indicate UTC, no timezone offset

public static <T> T returnNullIfInvalid(DataTypeSupplier<T> valueProducer) {
return returnNullIfInvalid(valueProducer, ignored -> true);
}

public static <T> T returnNullIfInvalid(DataTypeSupplier<T> valueProducer, Function<T, Boolean> isValidFn) {
// Some edge case values (e.g: Infinity, NaN) have no java or JSON equivalent, and will throw an
// exception when parsed. We want to parse those
// values as null.
// This method reduces error handling boilerplate.
try {
T value = valueProducer.apply();
return isValidFn.apply(value) ? value : null;
} catch (SQLException e) {
return null;
yaroslav-hrytsaienko marked this conversation as resolved.
Show resolved Hide resolved
}
}

public static String toISO8601String(long epochMillis) {
return DATE_FORMAT.format(Date.from(Instant.ofEpochMilli(epochMillis)));
}

public static String toISO8601String(java.util.Date date) {
return DATE_FORMAT.format(date);
}

}
5 changes: 5 additions & 0 deletions airbyte-db/src/main/java/io/airbyte/db/Databases.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package io.airbyte.db;

import io.airbyte.commons.lang.Exceptions;
import io.airbyte.db.bigquery.BigQueryDatabase;
import io.airbyte.db.jdbc.DefaultJdbcDatabase;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration;
Expand Down Expand Up @@ -157,4 +158,8 @@ private static BasicDataSource createBasicDataSource(final String username,
return connectionPool;
}

public static BigQueryDatabase createBigQueryDatabase(final String projectId, final String jsonCreds) {
return new BigQueryDatabase(projectId, jsonCreds);
}

}
5 changes: 2 additions & 3 deletions airbyte-db/src/main/java/io/airbyte/db/SqlDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,16 @@
package io.airbyte.db;

import com.fasterxml.jackson.databind.JsonNode;
import java.sql.SQLException;
import java.util.stream.Stream;

public abstract class SqlDatabase implements AutoCloseable {

private JsonNode sourceConfig;
private JsonNode databaseConfig;

public abstract void execute(String sql) throws SQLException;
public abstract void execute(String sql) throws Exception;

public abstract Stream<JsonNode> query(String sql, String... params) throws SQLException;
public abstract Stream<JsonNode> query(String sql, String... params) throws Exception;

public JsonNode getSourceConfig() {
return sourceConfig;
Expand Down
225 changes: 225 additions & 0 deletions airbyte-db/src/main/java/io/airbyte/db/bigquery/BigQueryDatabase.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.db.bigquery;

import static java.util.Objects.isNull;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.api.gax.retrying.RetrySettings;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.QueryParameterValue;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.Table;
import com.google.common.base.Charsets;
import com.google.common.collect.Streams;
import io.airbyte.db.SqlDatabase;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.bp.Duration;

public class BigQueryDatabase extends SqlDatabase {

private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDatabase.class);

private final BigQuery bigQuery;

public BigQueryDatabase(String projectId, String jsonCreds) {
try {
BigQueryOptions.Builder bigQueryBuilder = BigQueryOptions.newBuilder();
ServiceAccountCredentials credentials = null;
if (jsonCreds != null && !jsonCreds.isEmpty()) {
credentials = ServiceAccountCredentials
.fromStream(new ByteArrayInputStream(jsonCreds.getBytes(Charsets.UTF_8)));
}
bigQuery = bigQueryBuilder
.setProjectId(projectId)
.setCredentials(!isNull(credentials) ? credentials : ServiceAccountCredentials.getApplicationDefault())
.setRetrySettings(RetrySettings
.newBuilder()
.setMaxAttempts(10)
.setRetryDelayMultiplier(1.5)
.setTotalTimeout(Duration.ofMinutes(60))
.build())
.build()
.getService();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public void execute(String sql) throws SQLException {
final ImmutablePair<Job, String> result = executeQuery(bigQuery, getQueryConfig(sql, Collections.emptyList()));
if (result.getLeft() == null) {
throw new SQLException("BigQuery request is failed with error: " + result.getRight() + ". SQL: " + sql);
}
LOGGER.info("BigQuery successfully finished execution SQL: " + sql);
}

public Stream<JsonNode> query(String sql) throws Exception {
return query(sql, Collections.emptyList());
}

public Stream<JsonNode> query(String sql, QueryParameterValue... params) throws Exception {
return query(sql, (params == null ? Collections.emptyList() : Arrays.asList(params)));
}

@Override
public Stream<JsonNode> query(String sql, String... params) throws Exception {
List<QueryParameterValue> parameterValueList;
if (params == null)
parameterValueList = Collections.emptyList();
else
parameterValueList = Arrays.stream(params).map(param -> QueryParameterValue.newBuilder().setValue(param).setType(
StandardSQLTypeName.STRING).build()).collect(Collectors.toList());

return query(sql, parameterValueList);
}

public Stream<JsonNode> query(String sql, List<QueryParameterValue> params) throws Exception {
final ImmutablePair<Job, String> result = executeQuery(bigQuery, getQueryConfig(sql, params));

if (result.getLeft() != null) {
FieldList fieldList = result.getLeft().getQueryResults().getSchema().getFields();
return Streams.stream(result.getLeft().getQueryResults().iterateAll())
.map(fieldValues -> BigQueryUtils.rowToJson(fieldValues, fieldList));
} else
throw new Exception(
"Failed to execute query " + sql + (params != null && !params.isEmpty() ? " with params " + params : "") + ". Error: " + result.getRight());
}

@Override
public void close() throws Exception {
/**
* The BigQuery doesn't require connection close. It will be done automatically.
*/
}

public QueryJobConfiguration getQueryConfig(String sql, List<QueryParameterValue> params) {
return QueryJobConfiguration
.newBuilder(sql)
.setUseLegacySql(false)
.setPositionalParameters(params)
.build();
}

public ImmutablePair<Job, String> executeQuery(BigQuery bigquery, QueryJobConfiguration queryConfig) {
final JobId jobId = JobId.of(UUID.randomUUID().toString());
final Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
return executeQuery(queryJob);
}

/**
* Returns full information about all tables from entire project
*
* @param projectId BigQuery project id
* @return List of BigQuery tables
*/
public List<Table> getProjectTables(String projectId) {
List<Table> tableList = new ArrayList<>();
bigQuery.listDatasets(projectId)
.iterateAll()
.forEach(dataset -> bigQuery.listTables(dataset.getDatasetId())
.iterateAll()
.forEach(table -> tableList.add(bigQuery.getTable(table.getTableId()))));
return tableList;
}

/**
* Returns full information about all tables from specific Dataset
*
* @param projectId BigQuery project id
* @param datasetId BigQuery dataset id
* @return List of BigQuery tables
*/
public List<Table> getDatasetTables(String projectId, String datasetId) {
List<Table> tableList = new ArrayList<>();
bigQuery.listDatasets(projectId)
.iterateAll()
.forEach(dataset -> bigQuery.listTables(dataset.getDatasetId())
.iterateAll()
.forEach(table -> {
if (table.getTableId().getDataset().equalsIgnoreCase(datasetId))
DoNotPanicUA marked this conversation as resolved.
Show resolved Hide resolved
tableList.add(bigQuery.getTable(table.getTableId()));
}));
return tableList;
}

public BigQuery getBigQuery() {
return bigQuery;
}

public void cleanDataSet(String dataSetId) {
// allows deletion of a dataset that has contents
final BigQuery.DatasetDeleteOption option = BigQuery.DatasetDeleteOption.deleteContents();

final boolean success = bigQuery.delete(dataSetId, option);
if (success) {
LOGGER.info("BQ Dataset " + dataSetId + " deleted...");
} else {
LOGGER.info("BQ Dataset cleanup for " + dataSetId + " failed!");
}
}

private ImmutablePair<Job, String> executeQuery(Job queryJob) {
final Job completedJob = waitForQuery(queryJob);
if (completedJob == null) {
throw new RuntimeException("Job no longer exists");
} else if (completedJob.getStatus().getError() != null) {
// You can also look at queryJob.getStatus().getExecutionErrors() for all
// errors, not just the latest one.
return ImmutablePair.of(null, (completedJob.getStatus().getError().toString()));
}

return ImmutablePair.of(completedJob, null);
}

private Job waitForQuery(Job queryJob) {
try {
return queryJob.waitFor();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

}
Loading