Skip to content

Commit

Permalink
add to integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Dec 23, 2021
1 parent cc8c4a5 commit 24643b7
Showing 1 changed file with 15 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package io.airbyte.integrations.destination.snowflake;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
Expand All @@ -22,6 +25,7 @@
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.io.IOException;
import java.nio.file.Path;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -117,10 +121,17 @@ protected List<String> resolveIdentifier(final String identifier) {
return result;
}

private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schema) throws SQLException, InterruptedException {
private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schema) throws SQLException {
return SnowflakeDatabase.getDatabase(getConfig()).bufferedResultSetQuery(
connection -> connection.createStatement()
.executeQuery(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schema, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)),
connection -> {
final ResultSet tableInfo = connection.createStatement()
.executeQuery(String.format("SHOW TABLES LIKE '%s' IN SCHEMA %s;", tableName, schema));
assertTrue(tableInfo.next());
// check that we're creating permanent tables. DBT defaults to transient tables, which have `TRANSIENT` as the value for the `kind` column.
assertEquals("TABLE", tableInfo.getString("kind"));
return connection.createStatement()
.executeQuery(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schema, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT));
},
JdbcUtils.getDefaultSourceOperations()::rowToJson);
}

Expand Down Expand Up @@ -163,7 +174,7 @@ public void testSyncWithBillionRecords(final String messagesFilename, final Stri
runSyncAndVerifyStateOutput(config, largeNumberRecords, configuredCatalog, false);
}

private <T> T parseConfig(final String path, Class<T> clazz) throws IOException {
private <T> T parseConfig(final String path, final Class<T> clazz) throws IOException {
return Jsons.deserialize(MoreResources.readResource(path), clazz);
}

Expand Down

0 comments on commit 24643b7

Please sign in to comment.