Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination Snowflake: update check method to verify permissions for staging #8781

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba",
"name": "Snowflake",
"dockerRepository": "airbyte/destination-snowflake",
"dockerImageTag": "0.3.19",
"dockerImageTag": "0.3.21",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/snowflake",
"icon": "snowflake.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@
- name: Snowflake
destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.3.20
dockerImageTag: 0.3.21
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
icon: snowflake.svg
- name: MariaDB ColumnStore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.20
LABEL io.airbyte.version=0.3.21
LABEL io.airbyte.name=airbyte/destination-snowflake
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public static Connection getConnection(final JsonNode config) throws SQLExceptio
// https://docs.snowflake.com/en/user-guide/jdbc-parameters.html#application
// identify airbyte traffic to snowflake to enable partnership & optimization opportunities
properties.put("application", "airbyte");
// Needed for JDK17 - see https://stackoverflow.com/questions/67409650/snowflake-jdbc-driver-internal-error-fail-to-retrieve-row-count-for-first-arrow
properties.put("JDBC_QUERY_RESULT_FORMAT", "JSON");

return DriverManager.getConnection(connectUrl, properties);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,49 @@
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.UUID;
import java.util.function.Consumer;

public class SnowflakeInternalStagingDestination extends AbstractJdbcDestination implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeInternalStagingDestination.class);

public SnowflakeInternalStagingDestination() {
super("", new SnowflakeSQLNameTransformer(), new SnowflakeStagingSqlOperations());
}

@Override
public AirbyteConnectionStatus check(JsonNode config) {
SnowflakeSQLNameTransformer nameTransformer = new SnowflakeSQLNameTransformer();
SnowflakeStagingSqlOperations snowflakeStagingSqlOperations = new SnowflakeStagingSqlOperations();
try (final JdbcDatabase database = getDatabase(config)) {
final String outputSchema = super.getNamingResolver().getIdentifier(config.get("schema").asText());
attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, snowflakeStagingSqlOperations);
attemptSQLCreateAndDropStages(outputSchema, database, nameTransformer, snowflakeStagingSqlOperations);
return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
} catch (final Exception e) {
LOGGER.error("Exception while checking connection: ", e);
return new AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage("Could not connect with provided configuration. \n" + e.getMessage());
}
}

private static void attemptSQLCreateAndDropStages(String outputSchema, JdbcDatabase database, SnowflakeSQLNameTransformer namingResolver, SnowflakeStagingSqlOperations sqlOperations) throws Exception {

// verify we have permissions to create/drop stage
final String outputTableName = namingResolver.getIdentifier("_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", ""));
String stageName = namingResolver.getStageName(outputSchema, outputTableName);;
sqlOperations.createStageIfNotExists(database, stageName);
sqlOperations.dropStageIfExists(database,stageName);
}

@Override
protected JdbcDatabase getDatabase(final JsonNode config) {
return SnowflakeDatabase.getDatabase(config);
Expand Down
3 changes: 3 additions & 0 deletions docs/integrations/destinations/snowflake.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ By default, Airbyte uses batches of `INSERT` commands to add data to a temporary

Internal named stages are storage location objects within a Snowflake database/schema. Because they are database objects, the same security permissions apply as with any other database objects. No need to provide additional properties for internal staging

**Operating on a stage also requires the USAGE privilege on the parent database and schema.**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add it to the script above used to create the user?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the script above already has needed statement
grant OWNERSHIP
on database identifier($airbyte_database)
to role identifier($airbyte_role);


### AWS S3

For AWS S3, you will need to create a bucket and provide credentials to access the bucket. We recommend creating a bucket that is only used for Airbyte to stage data to Snowflake. Airbyte needs read/write access to interact with this bucket.
Expand Down Expand Up @@ -194,6 +196,7 @@ Finally, you need to add read/write permissions to your bucket with that email.

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| 0.3.21 | 2021-12-15 | [#8781](https://github.com/airbytehq/airbyte/pull/8781) | Updated check method to verify permissions to create/drop stage for internal staging; compatibility fix for Java 17 |
| 0.3.20 | 2021-12-10 | [#8562](https://github.com/airbytehq/airbyte/pull/8562) | Moving classes around for better dependency management; compatibility fix for Java 17 |
| 0.3.19 | 2021-12-06 | [#8528](https://github.com/airbytehq/airbyte/pull/8528) | Set Internal Staging as default choice |
| 0.3.18 | 2021-11-26 | [#8253](https://github.com/airbytehq/airbyte/pull/8253) | Snowflake Internal Staging Support |
Expand Down
Empty file added tools/bin/ci_credentials.sh
Empty file.