Skip to content

Commit

Permalink
Destination Snowflake: update check method to verify permissions for …
Browse files Browse the repository at this point in the history
…staging (airbytehq#8781)

* Destination Snowflake update check method to verify permission for stages

* fix for jdk 17

* fix for jdk 17

* fix with ci secrets

* fix with ci secrets

* removed snowflake secrets from ci_credentials.sh

* bump version

Co-authored-by: vmaltsev <[email protected]>
  • Loading branch information
2 people authored and schlattk committed Jan 4, 2022
1 parent b1c7a11 commit 4246b19
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba",
"name": "Snowflake",
"dockerRepository": "airbyte/destination-snowflake",
"dockerImageTag": "0.3.19",
"dockerImageTag": "0.3.21",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/snowflake",
"icon": "snowflake.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@
- name: Snowflake
destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.3.20
dockerImageTag: 0.3.21
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
icon: snowflake.svg
- name: MariaDB ColumnStore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.20
LABEL io.airbyte.version=0.3.21
LABEL io.airbyte.name=airbyte/destination-snowflake
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public static Connection getConnection(final JsonNode config) throws SQLExceptio
// https://docs.snowflake.com/en/user-guide/jdbc-parameters.html#application
// identify airbyte traffic to snowflake to enable partnership & optimization opportunities
properties.put("application", "airbyte");
// Needed for JDK17 - see https://stackoverflow.com/questions/67409650/snowflake-jdbc-driver-internal-error-fail-to-retrieve-row-count-for-first-arrow
properties.put("JDBC_QUERY_RESULT_FORMAT", "JSON");

return DriverManager.getConnection(connectUrl, properties);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,49 @@
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.UUID;
import java.util.function.Consumer;

public class SnowflakeInternalStagingDestination extends AbstractJdbcDestination implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeInternalStagingDestination.class);

public SnowflakeInternalStagingDestination() {
super("", new SnowflakeSQLNameTransformer(), new SnowflakeStagingSqlOperations());
}

@Override
public AirbyteConnectionStatus check(JsonNode config) {
SnowflakeSQLNameTransformer nameTransformer = new SnowflakeSQLNameTransformer();
SnowflakeStagingSqlOperations snowflakeStagingSqlOperations = new SnowflakeStagingSqlOperations();
try (final JdbcDatabase database = getDatabase(config)) {
final String outputSchema = super.getNamingResolver().getIdentifier(config.get("schema").asText());
attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, snowflakeStagingSqlOperations);
attemptSQLCreateAndDropStages(outputSchema, database, nameTransformer, snowflakeStagingSqlOperations);
return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
} catch (final Exception e) {
LOGGER.error("Exception while checking connection: ", e);
return new AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage("Could not connect with provided configuration. \n" + e.getMessage());
}
}

private static void attemptSQLCreateAndDropStages(String outputSchema, JdbcDatabase database, SnowflakeSQLNameTransformer namingResolver, SnowflakeStagingSqlOperations sqlOperations) throws Exception {

// verify we have permissions to create/drop stage
final String outputTableName = namingResolver.getIdentifier("_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", ""));
String stageName = namingResolver.getStageName(outputSchema, outputTableName);;
sqlOperations.createStageIfNotExists(database, stageName);
sqlOperations.dropStageIfExists(database,stageName);
}

@Override
protected JdbcDatabase getDatabase(final JsonNode config) {
return SnowflakeDatabase.getDatabase(config);
Expand Down
3 changes: 3 additions & 0 deletions docs/integrations/destinations/snowflake.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ By default, Airbyte uses batches of `INSERT` commands to add data to a temporary

Internal named stages are storage location objects within a Snowflake database/schema. Because they are database objects, the same security permissions apply as with any other database objects. No need to provide additional properties for internal staging

**Operating on a stage also requires the USAGE privilege on the parent database and schema.**

### AWS S3

For AWS S3, you will need to create a bucket and provide credentials to access the bucket. We recommend creating a bucket that is only used for Airbyte to stage data to Snowflake. Airbyte needs read/write access to interact with this bucket.
Expand Down Expand Up @@ -194,6 +196,7 @@ Finally, you need to add read/write permissions to your bucket with that email.

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| 0.3.21 | 2021-12-15 | [#8781](https://github.com/airbytehq/airbyte/pull/8781) | Updated check method to verify permissions to create/drop stage for internal staging; compatibility fix for Java 17 |
| 0.3.20 | 2021-12-10 | [#8562](https://github.com/airbytehq/airbyte/pull/8562) | Moving classes around for better dependency management; compatibility fix for Java 17 |
| 0.3.19 | 2021-12-06 | [#8528](https://github.com/airbytehq/airbyte/pull/8528) | Set Internal Staging as default choice |
| 0.3.18 | 2021-11-26 | [#8253](https://github.com/airbytehq/airbyte/pull/8253) | Snowflake Internal Staging Support |
Expand Down
Empty file added tools/bin/ci_credentials.sh
Empty file.

0 comments on commit 4246b19

Please sign in to comment.