Fix typo in spec.json for destination AWS Datalake & fix python license plugin setup (#11811)
kattos-aws authored May 6, 2022
1 parent 6eacd18 commit 66d7609
Showing 13 changed files with 170 additions and 76 deletions.
@@ -21,7 +21,7 @@
- name: AWS Datalake
destinationDefinitionId: 99878c90-0fbd-46d3-9d98-ffde879d17fc
dockerRepository: airbyte/destination-aws-datalake
dockerImageTag: 0.1.0
dockerImageTag: 0.1.1
documentationUrl: https://docs.airbyte.io/integrations/destinations/aws-datalake
releaseStage: alpha
- name: BigQuery
@@ -188,7 +188,7 @@
supportsDBT: false
supported_destination_sync_modes:
- "append"
- dockerImage: "airbyte/destination-aws-datalake:0.1.0"
- dockerImage: "airbyte/destination-aws-datalake:0.1.1"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/aws-datalake"
connectionSpecification:
@@ -243,7 +243,7 @@
required:
- "credentials_title"
- "aws_access_key_id"
- "aws_secret_access_key_id"
- "aws_secret_access_key"
properties:
credentials_title:
type: "string"
@@ -13,5 +13,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.name=airbyte/destination-aws-datalake
28 changes: 28 additions & 0 deletions airbyte-integrations/connectors/destination-aws-datalake/README.md
@@ -140,6 +140,34 @@ To run acceptance and custom integration tests:
./gradlew :airbyte-integrations:connectors:destination-aws-datalake:integrationTest
```

#### Running the Destination Acceptance Tests

To successfully run the Destination Acceptance Tests, you need a `secrets/config.json` file with appropriate information. For example:

```json
{
"bucket_name": "your-bucket-name",
"bucket_prefix": "your-prefix",
"region": "your-region",
"aws_account_id": "111111111111",
"lakeformation_database_name": "an_lf_database",
"credentials": {
"credentials_title": "IAM User",
"aws_access_key_id": ".....",
"aws_secret_access_key": "....."
}
}
```

In the AWS account, you need to have the following elements in place:

* An IAM user with appropriate IAM permissions (notably for S3 and Athena)
* A Lake Formation database pointing to the configured S3 location (see [Creating a database](https://docs.aws.amazon.com/lake-formation/latest/dg/creating-database.html))
* An Athena workgroup named `AmazonAthenaLakeFormation` in which the IAM user is authorized to run queries
* Appropriate permissions for the user on the Lake Formation database (for example, see [Granting Database Permissions Using the Lake Formation Console and the Named Resource Method](https://docs.aws.amazon.com/lake-formation/latest/dg/granting-database-permissions.html))
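
A quick way to sanity-check these prerequisites is a short boto3 script such as the sketch below. It is only an illustration and assumes the same placeholder values as the `secrets/config.json` example above.

```python
# Pre-flight check for the acceptance-test prerequisites (illustrative sketch).
# Replace the placeholder values with the ones from secrets/config.json.
import boto3

session = boto3.Session(
    aws_access_key_id=".....",
    aws_secret_access_key=".....",
    region_name="your-region",
)

# The IAM user must be able to reach the bucket,
session.client("s3").head_bucket(Bucket="your-bucket-name")

# see the Lake Formation / Glue database,
session.client("glue").get_database(Name="an_lf_database")

# and use the Athena workgroup expected by the integration tests.
session.client("athena").get_work_group(WorkGroup="AmazonAthenaLakeFormation")

print("All acceptance-test prerequisites are reachable.")
```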



## Dependency Management

All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development.
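
As a point of reference, a minimal `setup.py` for a connector of this shape could look like the sketch below; the package name and the unpinned dependency list are illustrative assumptions, not the connector's actual file.

```python
# Illustrative setup.py sketch -- package name and dependency list are assumptions.
from setuptools import find_packages, setup

setup(
    name="destination_aws_datalake",
    description="Destination implementation for AWS Datalake.",
    author="Airbyte",
    packages=find_packages(),
    install_requires=[
        "airbyte-cdk",  # Airbyte connector development kit
        "boto3",        # AWS SDK used for S3, Glue, Lake Formation and Athena calls
        "retrying",     # retry decorator used around AWS calls
        "nanoid",       # random suffixes for S3 object keys
    ],
    extras_require={"tests": ["pytest"]},
)
```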
@@ -22,4 +22,4 @@ dependencies {

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-aws-datalake')
}
}
@@ -2,6 +2,8 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import json

import boto3
from airbyte_cdk.destinations import Destination
from botocore.exceptions import ClientError
@@ -54,7 +56,6 @@ def session(self) -> boto3.Session:

@retry(stop_max_attempt_number=10, wait_random_min=2000, wait_random_max=3000)
def head_bucket(self):
print(self._bucket_name)
self.s3_client.head_bucket(Bucket=self._bucket_name)

@retry(stop_max_attempt_number=10, wait_random_min=2000, wait_random_max=3000)
@@ -101,7 +102,6 @@ def get_table(self, txid, database_name: str, table_name: str, location: str):
else:
return None

@retry(stop_max_attempt_number=10, wait_random_min=2000, wait_random_max=3000)
def update_table(self, database, table_info, transaction_id):
self.glue_client.update_table(DatabaseName=database, TableInput=table_info, TransactionId=transaction_id)

@@ -115,6 +115,22 @@ def preprocess_type(self, property_type):
else:
return property_type

def cast_to_athena(self, str_type):
preprocessed_type = self.preprocess_type(str_type)
return self.COLUMNS_MAPPING.get(preprocessed_type, preprocessed_type)

def generate_athena_schema(self, schema):
columns = []
for (k, v) in schema.items():
athena_type = self.cast_to_athena(v["type"])
if athena_type == "object":
properties = v["properties"]
type_str = ",".join([f"{k1}:{self.cast_to_athena(v1['type'])}" for (k1, v1) in properties.items()])
columns.append({"Name": k, "Type": f"struct<{type_str}>"})
else:
columns.append({"Name": k, "Type": athena_type})
return columns

def update_table_schema(self, txid, database, table, schema):
table_info = table["Table"]
table_info_keys = list(table_info.keys())
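
The new `cast_to_athena` / `generate_athena_schema` helpers flatten one level of nested JSON-schema objects into Athena `struct<...>` column types. The standalone sketch below mirrors that logic for illustration; the `COLUMNS_MAPPING` values and the simplified `preprocess_type` are assumptions, since neither is part of this hunk.

```python
# Standalone sketch of the schema translation; mapping values are assumed.
COLUMNS_MAPPING = {"string": "string", "integer": "bigint", "number": "double", "boolean": "boolean"}

def preprocess_type(property_type):
    # Simplified stand-in: JSON schema may give a list such as ["null", "string"].
    if isinstance(property_type, list):
        non_null = [t for t in property_type if t != "null"]
        return non_null[0] if non_null else "string"
    return property_type

def cast_to_athena(str_type):
    preprocessed = preprocess_type(str_type)
    return COLUMNS_MAPPING.get(preprocessed, preprocessed)

def generate_athena_schema(schema):
    columns = []
    for name, prop in schema.items():
        athena_type = cast_to_athena(prop["type"])
        if athena_type == "object":
            inner = ",".join(f"{k}:{cast_to_athena(v['type'])}" for k, v in prop["properties"].items())
            columns.append({"Name": name, "Type": f"struct<{inner}>"})
        else:
            columns.append({"Name": name, "Type": athena_type})
    return columns

print(generate_athena_schema({
    "id": {"type": "integer"},
    "meta": {"type": "object", "properties": {"source": {"type": "string"}}},
}))
# [{'Name': 'id', 'Type': 'bigint'}, {'Name': 'meta', 'Type': 'struct<source:string>'}]
```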
@@ -137,17 +153,16 @@ def update_table_schema(self, txid, database, table, schema):
]:
table_info.pop(k)

self.logger.info("Schema = " + repr(schema))
self.logger.debug("Schema = " + repr(schema))

columns = [{"Name": k, "Type": self.COLUMNS_MAPPING.get(self.preprocess_type(v["type"]), v["type"])} for (k, v) in schema.items()]
columns = self.generate_athena_schema(schema)
if "StorageDescriptor" in table_info:
table_info["StorageDescriptor"]["Columns"] = columns
else:
table_info["StorageDescriptor"] = {"Columns": columns}
self.update_table(database, table_info, txid)
self.glue_client.update_table(DatabaseName=database, TableInput=table_info, TransactionId=txid)

@retry(stop_max_attempt_number=10, wait_random_min=2000, wait_random_max=3000)
def get_all_table_objects(self, txid, database, table):
table_objects = []

@@ -177,7 +192,6 @@ def get_all_table_objects(self, txid, database, table):
flat_list = [item for sublist in table_objects for item in sublist]
return flat_list

@retry(stop_max_attempt_number=10, wait_random_min=2000, wait_random_max=3000)
def purge_table(self, txid, database, table):
self.logger.debug(f"Going to purge table {table}")
write_ops = []
@@ -235,13 +249,16 @@ def cancel_transaction(self):
self._aws_handler.lf_client.cancel_transaction(TransactionId=self.txid)

def commit_transaction(self):
self._logger.debug("Commiting Lakeformation Transaction")
self._logger.debug(f"Commiting Lakeformation Transaction {self.txid}")
self._aws_handler.lf_client.commit_transaction(TransactionId=self.txid)

def extend_transaction(self):
self._logger.debug("Extending Lakeformation Transaction")
self._aws_handler.lf_client.extend_transaction(TransactionId=self.txid)

def describe_transaction(self):
return self._aws_handler.lf_client.describe_transaction(TransactionId=self.txid)

def __enter__(self, transaction_type="READ_AND_WRITE"):
self._logger.debug("Starting Lakeformation Transaction")
self._transaction = self._aws_handler.lf_client.start_transaction(TransactionType=transaction_type)
@@ -250,6 +267,9 @@ def __enter__(self, transaction_type="READ_AND_WRITE"):

def __exit__(self, exc_type, exc_val, exc_tb):
self._logger.debug("Exiting LakeformationTransaction context manager")
tx_desc = self.describe_transaction()
self._logger.debug(json.dumps(tx_desc, default=str))

if exc_type:
self._logger.error("Exiting LakeformationTransaction context manager due to an exception")
self._logger.error(repr(exc_type))
@@ -258,5 +278,11 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self._transaction = None
else:
self._logger.debug("Exiting LakeformationTransaction context manager due to reaching end of with block")
self.commit_transaction()
self._transaction = None
try:
self.commit_transaction()
self._transaction = None
except Exception as e:
self.cancel_transaction()
self._logger.error(f"Could not commit the transaction id = {self.txid} because of :\n{repr(e)}")
self._transaction = None
raise (e)
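
With this change `LakeformationTransaction` commits when the `with` block exits cleanly, and cancels the transaction and re-raises if the commit itself fails. A minimal usage sketch, assuming the connector package is importable as `destination_aws_datalake` and that an `AwsHandler` has already been configured:

```python
# Usage sketch of the context-manager pattern (package path is an assumption).
from destination_aws_datalake.aws import AwsHandler, LakeformationTransaction

def purge_stream(aws_handler: AwsHandler, database: str, table: str) -> None:
    # __enter__ starts a READ_AND_WRITE transaction; on a clean exit __exit__
    # commits it, cancelling and re-raising if the commit does not go through.
    with LakeformationTransaction(aws_handler) as tx:
        aws_handler.purge_table(tx.txid, database, table)
```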
@@ -45,30 +45,28 @@ def write(
raise
self.logger.debug("AWS session creation OK")

with LakeformationTransaction(aws_handler) as tx:
# creating stream writers
streams = {
s.stream.name: StreamWriter(
name=s.stream.name,
aws_handler=aws_handler,
tx=tx,
connector_config=connector_config,
schema=s.stream.json_schema["properties"],
sync_mode=s.destination_sync_mode,
)
for s in configured_catalog.streams
}

for message in input_messages:
if message.type == Type.STATE:
yield message
else:
data = message.record.data
stream = message.record.stream
streams[stream].append_message(json.dumps(data, default=str))

for stream_name, stream in streams.items():
stream.add_to_datalake()
# creating stream writers
streams = {
s.stream.name: StreamWriter(
name=s.stream.name,
aws_handler=aws_handler,
connector_config=connector_config,
schema=s.stream.json_schema["properties"],
sync_mode=s.destination_sync_mode,
)
for s in configured_catalog.streams
}

for message in input_messages:
if message.type == Type.STATE:
yield message
else:
data = message.record.data
stream = message.record.stream
streams[stream].append_message(json.dumps(data, default=str))

for stream_name, stream in streams.items():
stream.add_to_datalake()

def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
"""
@@ -56,7 +56,7 @@
"required": [
"credentials_title",
"aws_access_key_id",
"aws_secret_access_key_id"
"aws_secret_access_key"
],
"properties": {
"credentials_title": {
@@ -6,27 +6,28 @@

import nanoid
from airbyte_cdk.models import DestinationSyncMode
from retrying import retry

from .aws import AwsHandler
from .aws import AwsHandler, LakeformationTransaction


class StreamWriter:
def __init__(self, name, aws_handler: AwsHandler, tx, connector_config, schema, sync_mode):
def __init__(self, name, aws_handler: AwsHandler, connector_config, schema, sync_mode):
self._db = connector_config.lakeformation_database_name
self._bucket = connector_config.bucket_name
self._prefix = connector_config.bucket_prefix
self._table = name
self._aws_handler = aws_handler
self._tx = tx
self._schema = schema
self._sync_mode = sync_mode
self._messages = []
self._logger = aws_handler.logger

self._logger.info(f"Creating StreamWriter for {self._db}:{self._table}")
self._logger.debug(f"Creating StreamWriter for {self._db}:{self._table}")
if sync_mode == DestinationSyncMode.overwrite:
self._logger.info(f"StreamWriter mode is OVERWRITE, need to purge {self._db}:{self._table}")
self._aws_handler.purge_table(self._tx.txid, self._db, self._table)
self._logger.debug(f"StreamWriter mode is OVERWRITE, need to purge {self._db}:{self._table}")
with LakeformationTransaction(self._aws_handler) as tx:
self._aws_handler.purge_table(tx.txid, self._db, self._table)

def append_message(self, message):
self._logger.debug(f"Appending message to table {self._table}")
@@ -41,18 +42,30 @@ def generate_object_key(self, prefix=None):

return path

@retry(stop_max_attempt_number=10, wait_random_min=2000, wait_random_max=3000)
def add_to_datalake(self):
self._logger.info(f"Flushing messages to table {self._table}")
object_prefix = f"{self._prefix}/{self._table}"
object_key = self.generate_object_key(object_prefix)
self._aws_handler.put_object(object_key, self._messages)
res = self._aws_handler.head_object(object_key)

table_location = "s3://" + self._bucket + "/" + self._prefix + "/" + self._table + "/"
table = self._aws_handler.get_table(self._tx.txid, self._db, self._table, table_location)
self._aws_handler.update_table_schema(self._tx.txid, self._db, table, self._schema)

self._aws_handler.update_governed_table(
self._tx.txid, self._db, self._table, self._bucket, object_key, res["ETag"], res["ContentLength"]
)
with LakeformationTransaction(self._aws_handler) as tx:
self._logger.debug(f"Flushing messages to table {self._table}")
object_prefix = f"{self._prefix}/{self._table}"
table_location = "s3://" + self._bucket + "/" + self._prefix + "/" + self._table + "/"

table = self._aws_handler.get_table(tx.txid, self._db, self._table, table_location)
self._aws_handler.update_table_schema(tx.txid, self._db, table, self._schema)

if len(self._messages) > 0:
try:
self._logger.debug(f"There are {len(self._messages)} messages to flush for {self._table}")
self._logger.debug(f"10 first messages >>> {repr(self._messages[0:10])} <<<")
object_key = self.generate_object_key(object_prefix)
self._aws_handler.put_object(object_key, self._messages)
res = self._aws_handler.head_object(object_key)
self._aws_handler.update_governed_table(
tx.txid, self._db, self._table, self._bucket, object_key, res["ETag"], res["ContentLength"]
)
self._logger.debug(f"Table {self._table} was updated")
except Exception as e:
self._logger.error(f"An exception was raised:\n{repr(e)}")
raise (e)
else:
self._logger.debug(f"There was no message to flush for {self._table}")
self._messages = []
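
After this refactor a `StreamWriter` no longer receives a shared transaction from the caller: the overwrite purge and `add_to_datalake` each open their own `LakeformationTransaction`. A rough sketch of the calling pattern, assuming the class lives in a `stream_writer` module and that the handler and parsed connector config are built elsewhere:

```python
# Rough usage sketch (module path and config objects are assumptions).
import json

from airbyte_cdk.models import DestinationSyncMode
from destination_aws_datalake.stream_writer import StreamWriter

def write_one_stream(aws_handler, connector_config) -> None:
    writer = StreamWriter(
        name="users",
        aws_handler=aws_handler,            # AwsHandler built from the destination config
        connector_config=connector_config,  # parsed config: bucket, prefix, LF database, ...
        schema={"id": {"type": "integer"}, "email": {"type": "string"}},
        sync_mode=DestinationSyncMode.append,
    )
    writer.append_message(json.dumps({"id": 1, "email": "a@example.com"}))
    writer.add_to_datalake()  # opens its own LakeformationTransaction internally
```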
@@ -29,6 +29,7 @@ public class AthenaHelper {
private static final Logger LOGGER = LoggerFactory.getLogger(AthenaHelper.class);

public AthenaHelper(AwsCredentials credentials, Region region, String outputBucket, String workGroup) {
LOGGER.debug(String.format("region = %s, outputBucket = %s, workGroup = %s", region, outputBucket, workGroup));
var credProvider = StaticCredentialsProvider.create(credentials);
this.athenaClient = AthenaClient.builder().region(region).credentialsProvider(credProvider).build();
this.outputBucket = outputBucket;
@@ -83,7 +84,6 @@ public void waitForQueryToComplete(String queryExecutionId) throws InterruptedException
// Sleep an amount of time before retrying again
Thread.sleep(1000);
}
System.out.println("The current status is: " + queryState);
}
}

@@ -115,6 +115,7 @@ public GetQueryResultsIterable runQuery(String database, String query) throws InterruptedException
try {
waitForQueryToComplete(execId);
} catch (RuntimeException e) {
e.printStackTrace();
LOGGER.info("Athena query failed once. Retrying.");
retryCount++;
continue;