diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
index b8adb20eee45..50d7bdb143b6 100644
--- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
+++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
@@ -21,7 +21,7 @@
 - name: AWS Datalake
   destinationDefinitionId: 99878c90-0fbd-46d3-9d98-ffde879d17fc
   dockerRepository: airbyte/destination-aws-datalake
-  dockerImageTag: 0.1.0
+  dockerImageTag: 0.1.1
   documentationUrl: https://docs.airbyte.io/integrations/destinations/aws-datalake
   releaseStage: alpha
 - name: BigQuery
diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml
index 8bbfcf28e7e7..1940432fba84 100644
--- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml
+++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml
@@ -188,7 +188,7 @@
   supportsDBT: false
   supported_destination_sync_modes:
   - "append"
-- dockerImage: "airbyte/destination-aws-datalake:0.1.0"
+- dockerImage: "airbyte/destination-aws-datalake:0.1.1"
   spec:
     documentationUrl: "https://docs.airbyte.io/integrations/destinations/aws-datalake"
     connectionSpecification:
@@ -243,7 +243,7 @@
             required:
             - "credentials_title"
            - "aws_access_key_id"
-            - "aws_secret_access_key_id"
+            - "aws_secret_access_key"
            properties:
              credentials_title:
                type: "string"
diff --git a/airbyte-integrations/connectors/destination-aws-datalake/Dockerfile b/airbyte-integrations/connectors/destination-aws-datalake/Dockerfile
index 12ec2897fd09..ebe2844b7244 100644
--- a/airbyte-integrations/connectors/destination-aws-datalake/Dockerfile
+++ b/airbyte-integrations/connectors/destination-aws-datalake/Dockerfile
@@ -13,5 +13,5 @@ RUN pip install .
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.1.0
+LABEL io.airbyte.version=0.1.1
 LABEL io.airbyte.name=airbyte/destination-aws-datalake
diff --git a/airbyte-integrations/connectors/destination-aws-datalake/README.md b/airbyte-integrations/connectors/destination-aws-datalake/README.md
index a89b68c3acf2..81a16e03a8ad 100644
--- a/airbyte-integrations/connectors/destination-aws-datalake/README.md
+++ b/airbyte-integrations/connectors/destination-aws-datalake/README.md
@@ -140,6 +140,34 @@ To run acceptance and custom integration tests:
 ./gradlew :airbyte-integrations:connectors:destination-aws-datalake:integrationTest
 ```
 
+#### Running the Destination Acceptance Tests
+
+To successfully run the Destination Acceptance Tests, you need a `secrets/config.json` file with appropriate information. For example:
+
+```json
+{
+  "bucket_name": "your-bucket-name",
+  "bucket_prefix": "your-prefix",
+  "region": "your-region",
+  "aws_account_id": "111111111111",
+  "lakeformation_database_name": "an_lf_database",
+  "credentials": {
+    "credentials_title": "IAM User",
+    "aws_access_key_id": ".....",
+    "aws_secret_access_key": "....."
+  }
+}
+```
+
+In the AWS account, you need to have the following elements in place:
+
+* An IAM user with the appropriate IAM permissions (notably S3 and Athena)
+* A Lake Formation database pointing to the configured S3 location (see: [Creating a database](https://docs.aws.amazon.com/lake-formation/latest/dg/creating-database.html))
+* An Athena workgroup named `AmazonAthenaLakeFormation` in which the IAM user has the proper authorizations
+* Appropriate permissions for that IAM user on the Lake Formation database, so the tests can read and write it (for example, see: [Granting Database Permissions Using the Lake Formation Console and the Named Resource Method](https://docs.aws.amazon.com/lake-formation/latest/dg/granting-database-permissions.html))
+
+
+
 ## Dependency Management
 
 All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development.
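Note: the Lake Formation prerequisites listed above can also be provisioned with a short boto3 script. The sketch below is illustrative only — the database name, bucket, prefix and IAM user ARN are placeholders, not values the connector or its tests assume:

```python
import boto3

# Placeholders - replace with your own values.
DATABASE_NAME = "an_lf_database"
BUCKET_NAME = "your-bucket-name"
IAM_USER_ARN = "arn:aws:iam::111111111111:user/your-test-user"

glue = boto3.client("glue")
lakeformation = boto3.client("lakeformation")

# Create a Glue/Lake Formation database pointing at the S3 location the connector writes to.
glue.create_database(
    DatabaseInput={
        "Name": DATABASE_NAME,
        "LocationUri": f"s3://{BUCKET_NAME}/your-prefix/",
    }
)

# Grant the test IAM user permissions on the database (named-resource method).
lakeformation.grant_permissions(
    Principal={"DataLakePrincipalIdentifier": IAM_USER_ARN},
    Resource={"Database": {"Name": DATABASE_NAME}},
    Permissions=["ALL"],
)
```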
diff --git a/airbyte-integrations/connectors/destination-aws-datalake/build.gradle b/airbyte-integrations/connectors/destination-aws-datalake/build.gradle
index b39e2774c5b4..8a47d52ac042 100644
--- a/airbyte-integrations/connectors/destination-aws-datalake/build.gradle
+++ b/airbyte-integrations/connectors/destination-aws-datalake/build.gradle
@@ -22,4 +22,4 @@ dependencies {
 
     integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
     integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-aws-datalake')
-}
\ No newline at end of file
+}
diff --git a/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/aws.py b/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/aws.py
index f39174021818..d2ed9ed25d49 100644
--- a/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/aws.py
+++ b/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/aws.py
@@ -2,6 +2,8 @@
 # Copyright (c) 2021 Airbyte, Inc., all rights reserved.
 #
 
+import json
+
 import boto3
 from airbyte_cdk.destinations import Destination
 from botocore.exceptions import ClientError
@@ -54,7 +56,6 @@ def session(self) -> boto3.Session:
 
     @retry(stop_max_attempt_number=10, wait_random_min=2000, wait_random_max=3000)
     def head_bucket(self):
-        print(self._bucket_name)
         self.s3_client.head_bucket(Bucket=self._bucket_name)
 
     @retry(stop_max_attempt_number=10, wait_random_min=2000, wait_random_max=3000)
@@ -101,7 +102,6 @@ def get_table(self, txid, database_name: str, table_name: str, location: str):
         else:
             return None
 
-    @retry(stop_max_attempt_number=10, wait_random_min=2000, wait_random_max=3000)
     def update_table(self, database, table_info, transaction_id):
         self.glue_client.update_table(DatabaseName=database, TableInput=table_info, TransactionId=transaction_id)
 
@@ -115,6 +115,22 @@ def preprocess_type(self, property_type):
         else:
             return property_type
 
+    def cast_to_athena(self, str_type):
+        preprocessed_type = self.preprocess_type(str_type)
+        return self.COLUMNS_MAPPING.get(preprocessed_type, preprocessed_type)
+
+    def generate_athena_schema(self, schema):
+        columns = []
+        for (k, v) in schema.items():
+            athena_type = self.cast_to_athena(v["type"])
+            if athena_type == "object":
+                properties = v["properties"]
+                type_str = ",".join([f"{k1}:{self.cast_to_athena(v1['type'])}" for (k1, v1) in properties.items()])
+                columns.append({"Name": k, "Type": f"struct<{type_str}>"})
+            else:
+                columns.append({"Name": k, "Type": athena_type})
+        return columns
+
     def update_table_schema(self, txid, database, table, schema):
         table_info = table["Table"]
         table_info_keys = list(table_info.keys())
@@ -137,9 +153,9 @@
         ]:
             table_info.pop(k)
 
-        self.logger.info("Schema = " + repr(schema))
+        self.logger.debug("Schema = " + repr(schema))
 
-        columns = [{"Name": k, "Type": self.COLUMNS_MAPPING.get(self.preprocess_type(v["type"]), v["type"])} for (k, v) in schema.items()]
+        columns = self.generate_athena_schema(schema)
         if "StorageDescriptor" in table_info:
             table_info["StorageDescriptor"]["Columns"] = columns
         else:
@@ -147,7 +163,6 @@
         self.update_table(database, table_info, txid)
         self.glue_client.update_table(DatabaseName=database, TableInput=table_info, TransactionId=txid)
 
-    @retry(stop_max_attempt_number=10, wait_random_min=2000, wait_random_max=3000)
     def get_all_table_objects(self, txid, database, table):
         table_objects = []
@@ -177,7 +192,6 @@
         flat_list = [item for sublist in table_objects for item in sublist]
         return flat_list
 
-    @retry(stop_max_attempt_number=10, wait_random_min=2000, wait_random_max=3000)
    def purge_table(self, txid, database, table):
        self.logger.debug(f"Going to purge table {table}")
        write_ops = []
@@ -235,13 +249,16 @@
         self._aws_handler.lf_client.cancel_transaction(TransactionId=self.txid)
 
     def commit_transaction(self):
-        self._logger.debug("Commiting Lakeformation Transaction")
+        self._logger.debug(f"Committing Lakeformation Transaction {self.txid}")
         self._aws_handler.lf_client.commit_transaction(TransactionId=self.txid)
 
     def extend_transaction(self):
         self._logger.debug("Extending Lakeformation Transaction")
         self._aws_handler.lf_client.extend_transaction(TransactionId=self.txid)
 
+    def describe_transaction(self):
+        return self._aws_handler.lf_client.describe_transaction(TransactionId=self.txid)
+
     def __enter__(self, transaction_type="READ_AND_WRITE"):
self._logger.debug("Starting Lakeformation Transaction") self._transaction = self._aws_handler.lf_client.start_transaction(TransactionType=transaction_type) @@ -250,6 +267,9 @@ def __enter__(self, transaction_type="READ_AND_WRITE"): def __exit__(self, exc_type, exc_val, exc_tb): self._logger.debug("Exiting LakeformationTransaction context manager") + tx_desc = self.describe_transaction() + self._logger.debug(json.dumps(tx_desc, default=str)) + if exc_type: self._logger.error("Exiting LakeformationTransaction context manager due to an exception") self._logger.error(repr(exc_type)) @@ -258,5 +278,11 @@ def __exit__(self, exc_type, exc_val, exc_tb): self._transaction = None else: self._logger.debug("Exiting LakeformationTransaction context manager due to reaching end of with block") - self.commit_transaction() - self._transaction = None + try: + self.commit_transaction() + self._transaction = None + except Exception as e: + self.cancel_transaction() + self._logger.error(f"Could not commit the transaction id = {self.txid} because of :\n{repr(e)}") + self._transaction = None + raise (e) diff --git a/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/destination.py b/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/destination.py index fc5151c18b18..e3872055efcd 100644 --- a/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/destination.py +++ b/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/destination.py @@ -45,30 +45,28 @@ def write( raise self.logger.debug("AWS session creation OK") - with LakeformationTransaction(aws_handler) as tx: - # creating stream writers - streams = { - s.stream.name: StreamWriter( - name=s.stream.name, - aws_handler=aws_handler, - tx=tx, - connector_config=connector_config, - schema=s.stream.json_schema["properties"], - sync_mode=s.destination_sync_mode, - ) - for s in configured_catalog.streams - } - - for message in input_messages: - if message.type == Type.STATE: - yield message - else: - data = message.record.data - stream = message.record.stream - streams[stream].append_message(json.dumps(data, default=str)) - - for stream_name, stream in streams.items(): - stream.add_to_datalake() + # creating stream writers + streams = { + s.stream.name: StreamWriter( + name=s.stream.name, + aws_handler=aws_handler, + connector_config=connector_config, + schema=s.stream.json_schema["properties"], + sync_mode=s.destination_sync_mode, + ) + for s in configured_catalog.streams + } + + for message in input_messages: + if message.type == Type.STATE: + yield message + else: + data = message.record.data + stream = message.record.stream + streams[stream].append_message(json.dumps(data, default=str)) + + for stream_name, stream in streams.items(): + stream.add_to_datalake() def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: """ diff --git a/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/spec.json b/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/spec.json index 72b4f9d37b9d..b5b876c41a6e 100644 --- a/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/spec.json +++ b/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/spec.json @@ -56,7 +56,7 @@ "required": [ "credentials_title", "aws_access_key_id", - "aws_secret_access_key_id" + "aws_secret_access_key" ], "properties": { 
"credentials_title": { diff --git a/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/stream_writer.py b/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/stream_writer.py index 96c95bbd8b2e..7407a053b754 100644 --- a/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/stream_writer.py +++ b/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/stream_writer.py @@ -6,27 +6,28 @@ import nanoid from airbyte_cdk.models import DestinationSyncMode +from retrying import retry -from .aws import AwsHandler +from .aws import AwsHandler, LakeformationTransaction class StreamWriter: - def __init__(self, name, aws_handler: AwsHandler, tx, connector_config, schema, sync_mode): + def __init__(self, name, aws_handler: AwsHandler, connector_config, schema, sync_mode): self._db = connector_config.lakeformation_database_name self._bucket = connector_config.bucket_name self._prefix = connector_config.bucket_prefix self._table = name self._aws_handler = aws_handler - self._tx = tx self._schema = schema self._sync_mode = sync_mode self._messages = [] self._logger = aws_handler.logger - self._logger.info(f"Creating StreamWriter for {self._db}:{self._table}") + self._logger.debug(f"Creating StreamWriter for {self._db}:{self._table}") if sync_mode == DestinationSyncMode.overwrite: - self._logger.info(f"StreamWriter mode is OVERWRITE, need to purge {self._db}:{self._table}") - self._aws_handler.purge_table(self._tx.txid, self._db, self._table) + self._logger.debug(f"StreamWriter mode is OVERWRITE, need to purge {self._db}:{self._table}") + with LakeformationTransaction(self._aws_handler) as tx: + self._aws_handler.purge_table(tx.txid, self._db, self._table) def append_message(self, message): self._logger.debug(f"Appending message to table {self._table}") @@ -41,18 +42,30 @@ def generate_object_key(self, prefix=None): return path + @retry(stop_max_attempt_number=10, wait_random_min=2000, wait_random_max=3000) def add_to_datalake(self): - self._logger.info(f"Flushing messages to table {self._table}") - object_prefix = f"{self._prefix}/{self._table}" - object_key = self.generate_object_key(object_prefix) - self._aws_handler.put_object(object_key, self._messages) - res = self._aws_handler.head_object(object_key) - - table_location = "s3://" + self._bucket + "/" + self._prefix + "/" + self._table + "/" - table = self._aws_handler.get_table(self._tx.txid, self._db, self._table, table_location) - self._aws_handler.update_table_schema(self._tx.txid, self._db, table, self._schema) - - self._aws_handler.update_governed_table( - self._tx.txid, self._db, self._table, self._bucket, object_key, res["ETag"], res["ContentLength"] - ) + with LakeformationTransaction(self._aws_handler) as tx: + self._logger.debug(f"Flushing messages to table {self._table}") + object_prefix = f"{self._prefix}/{self._table}" + table_location = "s3://" + self._bucket + "/" + self._prefix + "/" + self._table + "/" + + table = self._aws_handler.get_table(tx.txid, self._db, self._table, table_location) + self._aws_handler.update_table_schema(tx.txid, self._db, table, self._schema) + + if len(self._messages) > 0: + try: + self._logger.debug(f"There are {len(self._messages)} messages to flush for {self._table}") + self._logger.debug(f"10 first messages >>> {repr(self._messages[0:10])} <<<") + object_key = self.generate_object_key(object_prefix) + self._aws_handler.put_object(object_key, self._messages) + res = 
diff --git a/airbyte-integrations/connectors/destination-aws-datalake/src/main/java/AthenaHelper.java b/airbyte-integrations/connectors/destination-aws-datalake/src/main/java/AthenaHelper.java
index d94bfd9b6338..c3dd70e91735 100644
--- a/airbyte-integrations/connectors/destination-aws-datalake/src/main/java/AthenaHelper.java
+++ b/airbyte-integrations/connectors/destination-aws-datalake/src/main/java/AthenaHelper.java
@@ -29,6 +29,7 @@ public class AthenaHelper {
   private static final Logger LOGGER = LoggerFactory.getLogger(AthenaHelper.class);
 
   public AthenaHelper(AwsCredentials credentials, Region region, String outputBucket, String workGroup) {
+    LOGGER.debug(String.format("region = %s, outputBucket = %s, workGroup = %s", region, outputBucket, workGroup));
     var credProvider = StaticCredentialsProvider.create(credentials);
     this.athenaClient = AthenaClient.builder().region(region).credentialsProvider(credProvider).build();
     this.outputBucket = outputBucket;
@@ -83,7 +84,6 @@ public void waitForQueryToComplete(String queryExecutionId) throws InterruptedEx
         // Sleep an amount of time before retrying again
         Thread.sleep(1000);
       }
-      System.out.println("The current status is: " + queryState);
     }
   }
 
@@ -115,6 +115,7 @@ public GetQueryResultsIterable runQuery(String database, String query) throws In
       try {
        waitForQueryToComplete(execId);
      } catch (RuntimeException e) {
+        e.printStackTrace();
        LOGGER.info("Athena query failed once. Retrying.");
        retryCount++;
        continue;
diff --git a/airbyte-integrations/connectors/destination-aws-datalake/src/test-integration/java/io/airbyte/integrations/destination/aws_datalake/AwsDatalakeDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-aws-datalake/src/test-integration/java/io/airbyte/integrations/destination/aws_datalake/AwsDatalakeDestinationAcceptanceTest.java
index d131a2ed525b..2ea0b57e2614 100644
--- a/airbyte-integrations/connectors/destination-aws-datalake/src/test-integration/java/io/airbyte/integrations/destination/aws_datalake/AwsDatalakeDestinationAcceptanceTest.java
+++ b/airbyte-integrations/connectors/destination-aws-datalake/src/test-integration/java/io/airbyte/integrations/destination/aws_datalake/AwsDatalakeDestinationAcceptanceTest.java
@@ -4,6 +4,8 @@
 
 package io.airbyte.integrations.destination.aws_datalake;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
@@ -76,16 +78,8 @@ protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv,
                                            String namespace,
                                            JsonNode streamSchema)
       throws IOException, InterruptedException {
-    // TODO Implement this method to retrieve records which written to the destination by the connector.
-    // Records returned from this method will be compared against records provided to the connector
-    // to verify they were written correctly
-    LOGGER.info(String.format(">>>>>>>>>> namespace = %s, streamName = %s", namespace, streamName));
-    // 2. Read from database:table (SELECT *)
     String query = String.format("SELECT * FROM \"%s\".\"%s\"", config.getDatabaseName(), streamName);
-    LOGGER.info(String.format(">>>>>>>>>> query = %s", query));
     GetQueryResultsIterable results = athenaHelper.runQuery(config.getDatabaseName(), query);
-    // 3. return the rows as a list of JsonNodes
-
     return parseResults(results);
   }
 
@@ -96,8 +90,6 @@ protected List<JsonNode> parseResults(GetQueryResultsIterable queryResults) {
     for (GetQueryResultsResponse result : queryResults) {
       List<ColumnInfo> columnInfoList = result.resultSet().resultSetMetadata().columnInfo();
       Iterator<Row> results = result.resultSet().rows().iterator();
-      // processRow(results, columnInfoList);
-
-      // first row has column names
       Row colNamesRow = results.next();
       while (results.hasNext()) {
         Map<String, Object> jsonMap = Maps.newHashMap();
@@ -107,8 +99,7 @@
         while (colInfoIterator.hasNext() && datum.hasNext()) {
           ColumnInfo colInfo = colInfoIterator.next();
           Datum value = datum.next();
-          LOGGER.info(String.format("key = %s, value = %s, type = %s", colInfo.name(), value.varCharValue(), colInfo.type()));
-          Object typedFieldValue = getTypedFieldValue(colInfo.type(), value.varCharValue());
+          Object typedFieldValue = getTypedFieldValue(colInfo, value);
           if (typedFieldValue != null) {
             jsonMap.put(colInfo.name(), typedFieldValue);
           }
@@ -119,16 +110,24 @@
     return processedResults;
   }
 
-  private static Object getTypedFieldValue(String typeName, String varCharValue) {
+  private static Object getTypedFieldValue(ColumnInfo colInfo, Datum value) {
+    var typeName = colInfo.type();
+    var varCharValue = value.varCharValue();
+
     if (varCharValue == null)
       return null;
-    return switch (typeName) {
+    var returnType = switch (typeName) {
       case "real", "double", "float" -> Double.parseDouble(varCharValue);
       case "varchar" -> varCharValue;
       case "boolean" -> Boolean.parseBoolean(varCharValue);
      case "integer" -> Integer.parseInt(varCharValue);
+      case "row" -> varCharValue;
      default -> null;
    };
+    if (returnType == null) {
+      LOGGER.warn(String.format("Unsupported type = %s", typeName));
+    }
+    return returnType;
   }
 
   @Override
@@ -150,13 +149,38 @@ protected void setup(TestDestinationEnv testEnv) throws IOException {
     this.config = AwsDatalakeDestinationConfig.getAwsDatalakeDestinationConfig(configJson);
 
+    Region region = Region.of(config.getRegion());
+
     AwsBasicCredentials awsCreds = AwsBasicCredentials.create(config.getAccessKeyId(), config.getSecretAccessKey());
 
-    athenaHelper = new AthenaHelper(awsCreds, Region.US_EAST_1, String.format("s3://%s/airbyte_athena/", config.getBucketName()),
-        "AmazonAthenaLakeFormationPreview");
-    glueHelper = new GlueHelper(awsCreds, Region.US_EAST_1);
+    athenaHelper = new AthenaHelper(awsCreds, region, String.format("s3://%s/airbyte_athena/", config.getBucketName()),
+        "AmazonAthenaLakeFormation");
+    glueHelper = new GlueHelper(awsCreds, region);
     glueHelper.purgeDatabase(config.getDatabaseName());
   }
 
+  private String toAthenaObject(JsonNode value) {
+    StringBuilder sb = new StringBuilder("\"{");
+    List<String> elements = new ArrayList<>();
+    var it = value.fields();
+    while (it.hasNext()) {
+      Map.Entry<String, JsonNode> f = it.next();
+      final String k = f.getKey();
+      final String v = f.getValue().asText();
+      elements.add(String.format("%s=%s", k, v));
+    }
+    sb.append(String.join(",", elements));
+    sb.append("}\"");
+    return sb.toString();
+  }
+
+  protected void assertSameValue(final String key, final JsonNode expectedValue, final JsonNode actualValue) {
+    if (expectedValue.isObject()) {
+      assertEquals(toAthenaObject(expectedValue), actualValue.toString());
+    } else {
+      assertEquals(expectedValue, actualValue);
+    }
+  }
+
   @Override
   protected void tearDown(TestDestinationEnv testEnv) {
     // TODO Implement this method to run any cleanup actions needed after every test case
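For a quick manual check of what the acceptance test reads back, the same `SELECT *` against the governed table can be issued with boto3 — an illustrative sketch only, assuming the database, bucket and workgroup from the test configuration above (the stream name is a placeholder):

```python
import time

import boto3

athena = boto3.client("athena", region_name="your-region")

# Values mirror secrets/config.json - placeholders here.
DATABASE = "an_lf_database"
OUTPUT = "s3://your-bucket-name/airbyte_athena/"

qid = athena.start_query_execution(
    QueryString='SELECT * FROM "an_lf_database"."your_stream_name"',
    QueryExecutionContext={"Database": DATABASE},
    WorkGroup="AmazonAthenaLakeFormation",
    ResultConfiguration={"OutputLocation": OUTPUT},
)["QueryExecutionId"]

# Poll until the query finishes, then print the raw rows.
while True:
    state = athena.get_query_execution(QueryExecutionId=qid)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(1)

for row in athena.get_query_results(QueryExecutionId=qid)["ResultSet"]["Rows"]:
    print([col.get("VarCharValue") for col in row["Data"]])
```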
diff --git a/build.gradle b/build.gradle
index 00930de35bec..ff8918dcf8e9 100644
--- a/build.gradle
+++ b/build.gradle
@@ -350,6 +350,9 @@ subprojects {
     if (pythonFormatTask != null) {
         apply plugin: "com.github.hierynomus.license"
+        license {
+            header rootProject.file("LICENSE_SHORT")
+        }
         task licenseFormatPython(type: com.hierynomus.gradle.license.tasks.LicenseFormat) {
             header = createPythonLicenseWith(rootProject.file('LICENSE_SHORT'))
             source = fileTree(dir: projectDir)
diff --git a/docs/integrations/destinations/aws-datalake.md b/docs/integrations/destinations/aws-datalake.md
index 64f121693a9d..0fa59745a562 100644
--- a/docs/integrations/destinations/aws-datalake.md
+++ b/docs/integrations/destinations/aws-datalake.md
@@ -99,4 +99,5 @@ following information to configure the destination:
 
 ## Changelog
 
+| 0.1.1 | 2022-04-20 | [\#11811](https://github.com/airbytehq/airbyte/pull/11811) | Fix name of required param in specification |
 | 0.1.0 | 2022-03-29 | [\#10760](https://github.com/airbytehq/airbyte/pull/10760) | Initial release |
\ No newline at end of file