From ac351a53ea2d73b678e97e8822089e4f6afd7d37 Mon Sep 17 00:00:00 2001
From: Luke Miller
Date: Thu, 9 Nov 2023 15:56:43 +0100
Subject: [PATCH] incorporate feedback + lint
---
README.md | 305 +++++++++++++++--------------
dbt/adapters/athena/connections.py | 6 +-
dbt/adapters/athena/impl.py | 42 ++--
3 files changed, 177 insertions(+), 176 deletions(-)
diff --git a/README.md b/README.md
index 2c892fa9..87bb3157 100644
--- a/README.md
+++ b/README.md
@@ -18,52 +18,52 @@
-* [Features](#features)
- * [Quick Start](#quick-start)
- * [Installation](#installation)
- * [Prerequisites](#prerequisites)
- * [Credentials](#credentials)
- * [Configuring your profile](#configuring-your-profile)
- * [Additional information](#additional-information)
- * [Models](#models)
- * [Table Configuration](#table-configuration)
- * [Table location](#table-location)
- * [Incremental models](#incremental-models)
- * [On schema change](#on-schema-change)
- * [Iceberg](#iceberg)
- * [Highly available table](#highly-available-table-ha)
- * [Known issues](#ha-known-issues)
- * [Snapshots](#snapshots)
- * [Timestamp strategy](#timestamp-strategy)
- * [Check strategy](#check-strategy)
- * [Hard-deletes](#hard-deletes)
- * [AWS Lakeformation integration](#aws-lakeformation-integration)
- * [Working example](#working-example)
- * [Known issues](#snapshots-known-issues)
- * [Contracts](#contracts)
- * [Contributing](#contributing)
- * [Contributors ✨](#contributors-)
+- [Features](#features)
+ - [Quick Start](#quick-start)
+ - [Installation](#installation)
+ - [Prerequisites](#prerequisites)
+ - [Credentials](#credentials)
+ - [Configuring your profile](#configuring-your-profile)
+ - [Additional information](#additional-information)
+ - [Models](#models)
+ - [Table Configuration](#table-configuration)
+ - [Table location](#table-location)
+ - [Incremental models](#incremental-models)
+ - [On schema change](#on-schema-change)
+ - [Iceberg](#iceberg)
+ - [Highly available table (HA)](#highly-available-table-ha)
+ - [HA Known issues](#ha-known-issues)
+ - [Snapshots](#snapshots)
+ - [Timestamp strategy](#timestamp-strategy)
+ - [Check strategy](#check-strategy)
+ - [Hard-deletes](#hard-deletes)
+ - [AWS Lakeformation integration](#aws-lakeformation-integration)
+ - [Working example](#working-example)
+ - [Snapshots Known issues](#snapshots-known-issues)
+ - [Contracts](#contracts)
+ - [Contributing](#contributing)
+ - [Contributors ✨](#contributors-)
# Features
-* Supports dbt version `1.6.*`
-* Support for Python
-* Supports [seeds][seeds]
-* Correctly detects views and their columns
-* Supports [table materialization][table]
- * [Iceberg tables][athena-iceberg] are supported **only with Athena Engine v3** and **a unique table location**
+- Supports dbt version `1.6.*`
+- Support for Python
+- Supports [seeds][seeds]
+- Correctly detects views and their columns
+- Supports [table materialization][table]
+ - [Iceberg tables][athena-iceberg] are supported **only with Athena Engine v3** and **a unique table location**
(see table location section below)
- * Hive tables are supported by both Athena engines.
-* Supports [incremental models][incremental]
- * On Iceberg tables :
- * Supports the use of `unique_key` only with the `merge` strategy
- * Supports the `append` strategy
- * On Hive tables :
- * Supports two incremental update strategies: `insert_overwrite` and `append`
- * Does **not** support the use of `unique_key`
-* Supports [snapshots][snapshots]
-* Does not support [Python models][python-models]
+ - Hive tables are supported by both Athena engines.
+- Supports [incremental models][incremental]
+ - On Iceberg tables:
+ - Supports the use of `unique_key` only with the `merge` strategy
+ - Supports the `append` strategy
+ - On Hive tables:
+ - Supports two incremental update strategies: `insert_overwrite` and `append`
+ - Does **not** support the use of `unique_key`
+- Supports [snapshots][snapshots]
+- Does not support [Python models][python-models]
[seeds]: https://docs.getdbt.com/docs/building-a-dbt-project/seeds
@@ -81,8 +81,8 @@
### Installation
-* `pip install dbt-athena-community`
-* Or `pip install git+https://github.com/dbt-athena/dbt-athena.git`
+- `pip install dbt-athena-community`
+- Or `pip install git+https://github.com/dbt-athena/dbt-athena.git`
### Prerequisites
@@ -97,8 +97,8 @@ WITH DBPROPERTIES ('creator'='Foo Bar', 'email'='foo@bar.com');
Notes:
-* Take note of your AWS region code (e.g. `us-west-2` or `eu-west-2`, etc.).
-* You can also use [AWS Glue](https://docs.aws.amazon.com/athena/latest/ug/glue-athena.html) to create and manage Athena
+- Take note of your AWS region code (e.g. `us-west-2` or `eu-west-2`, etc.).
+- You can also use [AWS Glue](https://docs.aws.amazon.com/athena/latest/ug/glue-athena.html) to create and manage Athena
databases.
### Credentials
@@ -108,32 +108,33 @@ be [determined automatically](https://boto3.amazonaws.com/v1/documentation/api/l
on `aws cli`/`boto3` conventions.
You can either:
-* configure `aws_access_key_id` and `aws_secret_access_key`
-* configure `aws_profile_name` to match a profile defined in your AWS credentials file
+- configure `aws_access_key_id` and `aws_secret_access_key`
+- configure `aws_profile_name` to match a profile defined in your AWS credentials file
Check out the dbt profile configuration below for details.
### Configuring your profile
A dbt profile can be configured to run against AWS Athena using the following configuration:
-| Option | Description | Required? | Example |
-| --------------------- | ------------------------------------------------------------------------------------- | --------- | ------------------------------------------ |
-| s3_staging_dir | S3 location to store Athena query results and metadata | Required | `s3://bucket/dbt/` |
-| s3_data_dir | Prefix for storing tables, if different from the connection's `s3_staging_dir` | Optional | `s3://bucket2/dbt/` |
-| s3_data_naming | How to generate table paths in `s3_data_dir` | Optional | `schema_table_unique` |
-| s3_tmp_table_dir | Prefix for storing temporary tables, if different from the connection's `s3_data_dir` | Optional | `s3://bucket3/dbt/` |
-| region_name | AWS region of your Athena instance | Required | `eu-west-1` |
-| schema | Specify the schema (Athena database) to build models into (lowercase **only**) | Required | `dbt` |
-| database | Specify the database (Data catalog) to build models into (lowercase **only**) | Required | `awsdatacatalog` |
-| poll_interval | Interval in seconds to use for polling the status of query results in Athena | Optional | `5` |
-| debug_query_state | Flag if debug message with Athena query state is needed | Optional | `false` |
-| aws_access_key_id | Access key ID of the user performing requests. | Optional | `AKIAIOSFODNN7EXAMPLE` |
-| aws_secret_access_key | Secret access key of the user performing requests | Optional | `wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY` |
-| aws_profile_name | Profile to use from your AWS shared credentials file. | Optional | `my-profile` |
-| work_group | Identifier of Athena workgroup | Optional | `my-custom-workgroup` |
-| num_retries | Number of times to retry a failing query | Optional | `3` |
-| seed_s3_upload_args | Dictionary containing boto3 ExtraArgs when uploading to S3 | Optional | `{"ACL": "bucket-owner-full-control"}` |
-| lf_tags_database | Default LF tags for new database if it's created by dbt | Optional | `tag_key: tag_value` |
+| Option | Description | Required? | Example |
+| --------------------- | ---------------------------------------------------------------------------------------- | --------- | ------------------------------------------ |
+| s3_staging_dir | S3 location to store Athena query results and metadata | Required | `s3://bucket/dbt/` |
+| s3_data_dir | Prefix for storing tables, if different from the connection's `s3_staging_dir` | Optional | `s3://bucket2/dbt/` |
+| s3_data_naming | How to generate table paths in `s3_data_dir` | Optional | `schema_table_unique` |
+| s3_tmp_table_dir | Prefix for storing temporary tables, if different from the connection's `s3_data_dir` | Optional | `s3://bucket3/dbt/` |
+| region_name | AWS region of your Athena instance | Required | `eu-west-1` |
+| schema | Specify the schema (Athena database) to build models into (lowercase **only**) | Required | `dbt` |
+| database | Specify the database (Data catalog) to build models into (lowercase **only**) | Required | `awsdatacatalog` |
+| poll_interval | Interval in seconds to use for polling the status of query results in Athena | Optional | `5` |
+| debug_query_state | Flag if debug message with Athena query state is needed | Optional | `false` |
+| aws_access_key_id | Access key ID of the user performing requests. | Optional | `AKIAIOSFODNN7EXAMPLE` |
+| aws_secret_access_key | Secret access key of the user performing requests | Optional | `wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY` |
+| aws_profile_name | Profile to use from your AWS shared credentials file. | Optional | `my-profile` |
+| work_group | Identifier of Athena workgroup | Optional | `my-custom-workgroup` |
+| num_retries | Number of times to retry a failing query | Optional | `3` |
+| num_boto3_retries | Number of times to retry boto3 requests (e.g. deleting S3 files for materialized tables) | Optional | `5` |
+| seed_s3_upload_args | Dictionary containing boto3 ExtraArgs when uploading to S3 | Optional | `{"ACL": "bucket-owner-full-control"}` |
+| lf_tags_database | Default LF tags for new database if it's created by dbt | Optional | `tag_key: tag_value` |
**Example profiles.yml entry:**
@@ -158,61 +159,61 @@ athena:
### Additional information
-* `threads` is supported
-* `database` and `catalog` can be used interchangeably
+- `threads` is supported
+- `database` and `catalog` can be used interchangeably
## Models
### Table Configuration
-* `external_location` (`default=none`)
- * If set, the full S3 path in which the table will be saved.
- * Does not work with Iceberg table or Hive table with `ha` set to true.
-* `partitioned_by` (`default=none`)
- * An array list of columns by which the table will be partitioned
- * Limited to creation of 100 partitions (*currently*)
-* `bucketed_by` (`default=none`)
- * An array list of columns to bucket data, ignored if using Iceberg
-* `bucket_count` (`default=none`)
- * The number of buckets for bucketing your data, ignored if using Iceberg
-* `table_type` (`default='hive'`)
- * The type of table
- * Supports `hive` or `iceberg`
-* `ha` (`default=false`)
- * If the table should be built using the high-availability method. This option is only available for Hive tables
+- `external_location` (`default=none`)
+ - If set, the full S3 path in which the table will be saved.
+ - Does not work with Iceberg tables or Hive tables with `ha` set to `true`.
+- `partitioned_by` (`default=none`)
+ - A list of columns by which the table will be partitioned
+ - Limited to creation of 100 partitions (*currently*)
+- `bucketed_by` (`default=none`)
+ - A list of columns to bucket data by, ignored if using Iceberg
+- `bucket_count` (`default=none`)
+ - The number of buckets for bucketing your data, ignored if using Iceberg
+- `table_type` (`default='hive'`)
+ - The type of table
+ - Supports `hive` or `iceberg`
+- `ha` (`default=false`)
+ - If the table should be built using the high-availability method. This option is only available for Hive tables
since it is the default behaviour for Iceberg tables (see the section [below](#highly-available-table-ha))
-* `format` (`default='parquet'`)
- * The data format for the table
- * Supports `ORC`, `PARQUET`, `AVRO`, `JSON`, `TEXTFILE`
-* `write_compression` (`default=none`)
- * The compression type to use for any storage format that allows compression to be specified. To see which options are
+- `format` (`default='parquet'`)
+ - The data format for the table
+ - Supports `ORC`, `PARQUET`, `AVRO`, `JSON`, `TEXTFILE`
+- `write_compression` (`default=none`)
+ - The compression type to use for any storage format that allows compression to be specified. To see which options are
available, check out [CREATE TABLE AS][create-table-as]
-* `field_delimiter` (`default=none`)
- * Custom field delimiter, for when format is set to `TEXTFILE`
-* `table_properties`: table properties to add to the table, valid for Iceberg only
-* `native_drop`: Relation drop operations will be performed with SQL, not direct Glue API calls. No S3 calls will be
+- `field_delimiter` (`default=none`)
+ - Custom field delimiter, for when format is set to `TEXTFILE`
+- `table_properties`: table properties to add to the table, valid for Iceberg only
+- `native_drop`: Relation drop operations will be performed with SQL, not direct Glue API calls. No S3 calls will be
made to manage data in S3. Data in S3 will only be cleared up for Iceberg
tables [see AWS docs](https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-managing-tables.html). Note that
Iceberg DROP TABLE operations may timeout if they take longer than 60 seconds.
-* `seed_by_insert` (`default=false`)
- * default behaviour uploads seed data to S3. This flag will create seeds using an SQL insert statement
- * large seed files cannot use `seed_by_insert`, as the SQL insert statement would
+- `seed_by_insert` (`default=false`)
+ - The default behaviour uploads seed data to S3. This flag will create seeds using an SQL insert statement
+ - Large seed files cannot use `seed_by_insert`, as the SQL insert statement would
exceed [the Athena limit of 262144 bytes](https://docs.aws.amazon.com/athena/latest/ug/service-limits.html)
-* `force_batch` (`default=false`)
- * Skip creating the table as ctas and run the operation directly in batch insert mode.
- * This is particularly useful when the standard table creation process fails due to partition limitations,
+- `force_batch` (`default=false`)
+ - Skip creating the table as a CTAS and run the operation directly in batch insert mode.
+ - This is particularly useful when the standard table creation process fails due to partition limitations,
allowing you to work with temporary tables and persist the dataset more efficiently.
-* `lf_tags_config` (`default=none`)
- * [AWS lakeformation](#aws-lakeformation-integration) tags to associate with the table and columns
- * `enabled` (`default=False`) whether LF tags management is enabled for a model
- * `tags` dictionary with tags and their values to assign for the model
- * `tags_columns` dictionary with a tag key, value and list of columns they must be assigned to
- * `lf_inherited_tags` (`default=none`)
- * List of Lake Formation tag keys that are intended to be inherited from the database level and thus shouldn't be
+- `lf_tags_config` (`default=none`)
+ - [AWS lakeformation](#aws-lakeformation-integration) tags to associate with the table and columns
+ - `enabled` (`default=False`) whether LF tags management is enabled for a model
+ - `tags` dictionary with tags and their values to assign for the model
+ - `tags_columns` dictionary with a tag key, value and list of columns they must be assigned to
+ - `lf_inherited_tags` (`default=none`)
+ - List of Lake Formation tag keys that are intended to be inherited from the database level and thus shouldn't be
removed during association of those defined in `lf_tags_config`
- * i.e., the default behavior of `lf_tags_config` is to be exhaustive and first remove any pre-existing tags from
+ - i.e., the default behavior of `lf_tags_config` is to be exhaustive and first remove any pre-existing tags from
tables and columns before associating the ones currently defined for a given model
- * This breaks tag inheritance as inherited tags appear on tables and columns like those associated directly
+ - This breaks tag inheritance, since inherited tags appear on tables and columns just like directly associated ones
```sql
{{
@@ -240,7 +241,7 @@ athena:
}}
```
-* format for `dbt_project.yml`:
+- Format for `dbt_project.yml`:
```yaml
+lf_tags_config:
@@ -254,9 +255,9 @@ athena:
inherited_tags: [ tag1, tag2 ]
```
-* `lf_grants` (`default=none`)
- * lakeformation grants config for data_cell filters
- * format:
+- `lf_grants` (`default=none`)
+ - lakeformation grants config for data_cell filters
+ - format:
```python
lf_grants={
@@ -274,19 +275,19 @@ athena:
> Notes:
>
-> * `lf_tags` and `lf_tags_columns` configs support only attaching lf tags to corresponding resources.
+> - `lf_tags` and `lf_tags_columns` configs support only attaching lf tags to corresponding resources.
> We recommend managing LF Tags permissions somewhere outside dbt. For example, you may use
> [terraform](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_permissions) or
> [aws cdk](https://docs.aws.amazon.com/cdk/api/v1/docs/aws-lakeformation-readme.html) for such purpose.
-> * `data_cell_filters` management can't be automated outside dbt because the filter can't be attached to the table
+> - `data_cell_filters` management can't be automated outside dbt because the filter can't be attached to the table
> which doesn't exist. Once you `enable` this config, dbt will set all filters and their permissions during every
> dbt run. This approach keeps the row-level security configuration up to date after every dbt run and applies
> changes if they occur: dropping, creating, or updating filters and their permissions.
-> * Any tags listed in `lf_inherited_tags` should be strictly inherited from the database level and never overridden at
+> - Any tags listed in `lf_inherited_tags` should be strictly inherited from the database level and never overridden at
the table and column level
-> * Currently `dbt-athena` does not differentiate between an inherited tag association and an override of same it made
+> - Currently `dbt-athena` does not differentiate between an inherited tag association and an override of that tag it made
> previously
-> * e.g. If an inherited tag is overridden by an `lf_tags_config` value in one DBT run, and that override is removed
+> - e.g. if an inherited tag is overridden by an `lf_tags_config` value in one dbt run, and that override is removed
prior to a subsequent run, the prior override will linger and no longer be encoded anywhere (neither in e.g. Terraform,
where the inherited value is configured, nor in the dbt project where the override previously existed but is now
gone)
@@ -303,11 +304,11 @@ The location in which a table is saved is determined by:
Here are all the options available for `s3_data_naming`:
-* `unique`: `{s3_data_dir}/{uuid4()}/`
-* `table`: `{s3_data_dir}/{table}/`
-* `table_unique`: `{s3_data_dir}/{table}/{uuid4()}/`
-* `schema_table`: `{s3_data_dir}/{schema}/{table}/`
-* `s3_data_naming=schema_table_unique`: `{s3_data_dir}/{schema}/{table}/{uuid4()}/`
+- `unique`: `{s3_data_dir}/{uuid4()}/`
+- `table`: `{s3_data_dir}/{table}/`
+- `table_unique`: `{s3_data_dir}/{table}/{uuid4()}/`
+- `schema_table`: `{s3_data_dir}/{schema}/{table}/`
+- `schema_table_unique`: `{s3_data_dir}/{schema}/{table}/{uuid4()}/`
It's possible to set `s3_data_naming` globally in the target profile, override the value in the table config,
or set it for groups of models in `dbt_project.yml`, as shown in the sketch below.
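For illustration, a minimal `dbt_project.yml` fragment could look like the following sketch (the `my_project` and
`marts` names are placeholders):

```yaml
models:
  my_project:
    # Default naming for all models in the project
    +s3_data_naming: schema_table
    marts:
      # Override for one group of models: append a unique suffix so locations are never reused
      +s3_data_naming: schema_table_unique
```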
@@ -321,12 +322,12 @@ Support for [incremental models](https://docs.getdbt.com/docs/build/incremental-
These strategies are supported:
-* `insert_overwrite` (default): The insert overwrite strategy deletes the overlapping partitions from the destination
+- `insert_overwrite` (default): The insert overwrite strategy deletes the overlapping partitions from the destination
table, and then inserts the new records from the source. This strategy depends on the `partitioned_by` keyword! If no
partitions are defined, dbt will fall back to the `append` strategy.
-* `append`: Insert new records without updating, deleting or overwriting any existing data. There might be duplicate
+- `append`: Insert new records without updating, deleting or overwriting any existing data. There might be duplicate
data (e.g. great for log or historical data).
-* `merge`: Conditionally updates, deletes, or inserts rows into an Iceberg table. Used in combination with `unique_key`.
+- `merge`: Conditionally updates, deletes, or inserts rows into an Iceberg table. Used in combination with `unique_key`.
Only available when using Iceberg.
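As an illustration, a minimal incremental model using the default strategy might look like the sketch below (the
source reference and `date_column` are placeholders):

```sql
{{
  config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partitioned_by=['date_column']
  )
}}

select *
from {{ source('my_source', 'events') }}
{% if is_incremental() %}
  -- only process partitions newer than what is already in the target table
  where date_column > (select max(date_column) from {{ this }})
{% endif %}
```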
### On schema change
@@ -334,10 +335,10 @@ These strategies are supported:
`on_schema_change` is an option to reflect changes of schema in incremental models.
The following options are supported:
-* `ignore` (default)
-* `fail`
-* `append_new_columns`
-* `sync_all_columns`
+- `ignore` (default)
+- `fail`
+- `append_new_columns`
+- `sync_all_columns`
For details, please refer
to [dbt docs](https://docs.getdbt.com/docs/build/incremental-models#what-if-the-columns-of-my-incremental-model-change).
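For example, to add new columns automatically on incremental runs, a model could be configured as follows:

```sql
{{
  config(
    materialized='incremental',
    on_schema_change='append_new_columns'
  )
}}
```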
@@ -375,16 +376,16 @@ Iceberg supports several table formats for data : `PARQUET`, `AVRO` and `ORC`.
It is possible to use Iceberg in an incremental fashion, specifically two strategies are supported:
-* `append`: New records are appended to the table, this can lead to duplicates.
-* `merge`: Performs an upsert (and optional delete), where new records are added and existing records are updated. Only
+- `append`: New records are appended to the table, this can lead to duplicates.
+- `merge`: Performs an upsert (and optional delete), where new records are added and existing records are updated. Only
available with Athena engine version 3.
- * `unique_key` **(required)**: columns that define a unique record in the source and target tables.
- * `incremental_predicates` (optional): SQL conditions that enable custom join clauses in the merge statement. This can
+ - `unique_key` **(required)**: columns that define a unique record in the source and target tables.
+ - `incremental_predicates` (optional): SQL conditions that enable custom join clauses in the merge statement. This can
be useful for improving performance via predicate pushdown on the target table.
- * `delete_condition` (optional): SQL condition used to identify records that should be deleted.
- * `update_condition` (optional): SQL condition used to identify records that should be updated.
- * `insert_condition` (optional): SQL condition used to identify records that should be inserted.
- * `incremental_predicates`, `delete_condition`, `update_condition` and `insert_condition` can include any column of
+ - `delete_condition` (optional): SQL condition used to identify records that should be deleted.
+ - `update_condition` (optional): SQL condition used to identify records that should be updated.
+ - `insert_condition` (optional): SQL condition used to identify records that should be inserted.
+ - `incremental_predicates`, `delete_condition`, `update_condition` and `insert_condition` can include any column of
the incremental table (`src`) or the final table (`target`).
Column names must be prefixed by either `src` or `target` to prevent a `Column is ambiguous` error.
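For illustration, a merge-based Iceberg model could be configured as in the sketch below (model, column names and
conditions are placeholders):

```sql
{{
  config(
    materialized='incremental',
    table_type='iceberg',
    incremental_strategy='merge',
    unique_key='user_id',
    delete_condition="src.status = 'deleted'"
  )
}}

select user_id, status, address, updated_at
from {{ ref('stg_users') }}
```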
@@ -494,12 +495,12 @@ By default, the materialization keeps the last 4 table versions, you can change
#### HA Known issues
-* When swapping from a table with partitions to a table without (and the other way around), there could be a little
+- When swapping from a table with partitions to a table without (and the other way around), there could be a little
downtime.
If high performance is needed, consider bucketing instead of partitions
-* By default, Glue "duplicates" the versions internally, so the last two versions of a table point to the same location
-* It's recommended to have `versions_to_keep` >= 4, as this will avoid having the older location removed
-* The macro `athena__end_of_time` needs to be overwritten by the user if using Athena engine v3 since it requires a
+- By default, Glue "duplicates" the versions internally, so the last two versions of a table point to the same location
+- It's recommended to have `versions_to_keep` >= 4, as this will avoid having the older location removed
+- The macro `athena__end_of_time` needs to be overwritten by the user if using Athena engine v3 since it requires a
precision parameter for timestamps
## Snapshots
@@ -525,16 +526,16 @@ the [docs](https://docs.getdbt.com/docs/build/snapshots#hard-deletes-opt-in) to
The adapter implements AWS Lakeformation tags management in the following way:
-* you can enable or disable lf-tags management via [config](#table-configuration) (disabled by default)
-* once you enable the feature, lf-tags will be updated on every dbt run
-* first, all lf-tags for columns are removed to avoid inheritance issues
-* then all redundant lf-tags are removed from table and actual tags from config are applied
-* finally, lf-tags for columns are applied
+- You can enable or disable lf-tags management via [config](#table-configuration) (disabled by default)
+- Once you enable the feature, lf-tags will be updated on every dbt run
+- First, all lf-tags for columns are removed to avoid inheritance issues
+- Then, all redundant lf-tags are removed from the table and the tags from the config are applied
+- Finally, lf-tags for columns are applied
It's important to understand the following points:
-* dbt does not manage lf-tags for database
-* dbt does not manage lakeformation permissions
+- dbt does not manage lf-tags for databases
+- dbt does not manage lakeformation permissions
That's why you should handle this yourself, either manually or with automation tools such as Terraform or AWS CDK.
You may find the following links useful to manage that:
@@ -638,27 +639,27 @@ from {{ ref('model') }} {% endsnapshot %}
### Snapshots Known issues
-* Incremental Iceberg models - Sync all columns on schema change can't remove columns used as partitioning.
+- Incremental Iceberg models: sync all columns on schema change can't remove columns used for partitioning.
The only way, from a dbt perspective, is to do a full-refresh of the incremental model.
-* Tables, schemas and database should only be lowercase
+- Tables, schemas and databases should only be lowercase
-* In order to avoid potential conflicts, make sure [`dbt-athena-adapter`](https://github.com/Tomme/dbt-athena) is not
+- In order to avoid potential conflicts, make sure [`dbt-athena-adapter`](https://github.com/Tomme/dbt-athena) is not
installed in the target environment.
See for more details.
-* Snapshot does not support dropping columns from the source table. If you drop a column make sure to drop the column
+- Snapshot does not support dropping columns from the source table. If you drop a column make sure to drop the column
from the snapshot as well. Another workaround is to NULL the column in the snapshot definition to preserve history
## Contracts
The adapter partly supports contract definition.
-* Concerning the `data_type`, it is supported but needs to be adjusted for complex types. They must be specified
+- Concerning the `data_type`, it is supported but needs to be adjusted for complex types. They must be specified
entirely (for instance `array`) even though they won't be checked. Indeed, as dbt recommends, we only compare
the broader type (array, map, int, varchar). The complete definition is used in order to check that the data types
defined in Athena are valid (pre-flight check).
-* the adapter does not support the constraints since no constraints don't exist in Athena.
+- The adapter does not support constraints, since constraints do not exist in Athena.
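As a sketch only, a contracted model definition in a properties file could look like the example below (the model
name, columns, and the exact spelling of the complex type are illustrative assumptions):

```yaml
version: 2

models:
  - name: my_contracted_model
    config:
      contract:
        enforced: true
    columns:
      - name: id
        data_type: integer
      # Complex types must be written out in full, even though only the broader
      # type (here: array) is compared. The exact spelling is an assumption.
      - name: tags
        data_type: array(string)
```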
## Contributing
diff --git a/dbt/adapters/athena/connections.py b/dbt/adapters/athena/connections.py
index bf127b7d..f12a5c47 100644
--- a/dbt/adapters/athena/connections.py
+++ b/dbt/adapters/athena/connections.py
@@ -76,8 +76,8 @@ def unique_field(self) -> str:
return f"athena-{hashlib.md5(self.s3_staging_dir.encode()).hexdigest()}"
@property
- def get_effective_num_retries(self) -> int:
- return self.num_boto3_retries if self.num_boto3_retries is not None else self.num_retries
+ def effective_num_retries(self) -> int:
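+        """Return num_boto3_retries if it is set, otherwise fall back to num_retries."""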
+        return self.num_boto3_retries if self.num_boto3_retries is not None else self.num_retries
def _connection_keys(self) -> Tuple[str, ...]:
return (
@@ -240,7 +240,7 @@ def open(cls, connection: Connection) -> Connection:
attempt=creds.num_retries + 1,
exceptions=("ThrottlingException", "TooManyRequestsException", "InternalServerException"),
),
- config=get_boto3_config(num_retries=creds.get_effective_num_retries),
+ config=get_boto3_config(num_retries=creds.effective_num_retries),
)
connection.state = ConnectionState.OPEN
diff --git a/dbt/adapters/athena/impl.py b/dbt/adapters/athena/impl.py
index ffefb842..063afd85 100755
--- a/dbt/adapters/athena/impl.py
+++ b/dbt/adapters/athena/impl.py
@@ -159,7 +159,7 @@ def add_lf_tags_to_database(self, relation: AthenaRelation) -> None:
lf_client = client.session.client(
"lakeformation",
client.region_name,
- config=get_boto3_config(num_retries=creds.get_effective_num_retries),
+ config=get_boto3_config(num_retries=creds.effective_num_retries),
)
manager = LfTagsManager(lf_client, relation, config)
manager.process_lf_tags_database()
@@ -177,7 +177,7 @@ def add_lf_tags(self, relation: AthenaRelation, lf_tags_config: Dict[str, Any])
lf_client = client.session.client(
"lakeformation",
client.region_name,
- config=get_boto3_config(num_retries=creds.get_effective_num_retries),
+ config=get_boto3_config(num_retries=creds.effective_num_retries),
)
manager = LfTagsManager(lf_client, relation, config)
manager.process_lf_tags()
@@ -195,7 +195,7 @@ def apply_lf_grants(self, relation: AthenaRelation, lf_grants_config: Dict[str,
lf = client.session.client(
"lakeformation",
region_name=client.region_name,
- config=get_boto3_config(num_retries=creds.get_effective_num_retries),
+ config=get_boto3_config(num_retries=creds.effective_num_retries),
)
catalog = self._get_data_catalog(relation.database)
catalog_id = get_catalog_id(catalog)
@@ -213,7 +213,7 @@ def is_work_group_output_location_enforced(self) -> bool:
athena_client = client.session.client(
"athena",
region_name=client.region_name,
- config=get_boto3_config(num_retries=creds.get_effective_num_retries),
+ config=get_boto3_config(num_retries=creds.effective_num_retries),
)
if creds.work_group:
@@ -313,7 +313,7 @@ def get_glue_table(self, relation: AthenaRelation) -> Optional[GetTableResponseT
glue_client = client.session.client(
"glue",
region_name=client.region_name,
- config=get_boto3_config(num_retries=creds.get_effective_num_retries),
+ config=get_boto3_config(num_retries=creds.effective_num_retries),
)
try:
@@ -373,7 +373,7 @@ def clean_up_partitions(self, relation: AthenaRelation, where_condition: str) ->
glue_client = client.session.client(
"glue",
region_name=client.region_name,
- config=get_boto3_config(num_retries=creds.get_effective_num_retries),
+ config=get_boto3_config(num_retries=creds.effective_num_retries),
)
paginator = glue_client.get_paginator("get_partitions")
partition_params = {
@@ -426,7 +426,7 @@ def upload_seed_to_s3(
s3_client = client.session.client(
"s3",
region_name=client.region_name,
- config=get_boto3_config(num_retries=creds.get_effective_num_retries),
+ config=get_boto3_config(num_retries=creds.effective_num_retries),
)
# This ensures cross-platform support, tempfile.NamedTemporaryFile does not
tmpfile = os.path.join(tempfile.gettempdir(), os.urandom(24).hex())
@@ -451,7 +451,7 @@ def delete_from_s3(self, s3_path: str) -> None:
s3_resource = client.session.resource(
"s3",
region_name=client.region_name,
- config=get_boto3_config(num_retries=creds.get_effective_num_retries),
+ config=get_boto3_config(num_retries=creds.effective_num_retries),
)
s3_bucket = s3_resource.Bucket(bucket_name)
LOGGER.debug(f"Deleting table data: path='{s3_path}', bucket='{bucket_name}', prefix='{prefix}'")
@@ -494,7 +494,7 @@ def _s3_path_exists(self, s3_bucket: str, s3_prefix: str) -> bool:
s3_client = client.session.client(
"s3",
region_name=client.region_name,
- config=get_boto3_config(num_retries=creds.get_effective_num_retries),
+ config=get_boto3_config(num_retries=creds.effective_num_retries),
)
response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix)
return True if "Contents" in response else False
@@ -586,7 +586,7 @@ def _get_one_catalog(
glue_client = client.session.client(
"glue",
region_name=client.region_name,
- config=get_boto3_config(num_retries=creds.get_effective_num_retries),
+ config=get_boto3_config(num_retries=creds.effective_num_retries),
)
catalog = []
@@ -612,7 +612,7 @@ def _get_one_catalog(
athena_client = client.session.client(
"athena",
region_name=client.region_name,
- config=get_boto3_config(num_retries=creds.get_effective_num_retries),
+ config=get_boto3_config(num_retries=creds.effective_num_retries),
)
catalog = []
@@ -660,7 +660,7 @@ def _get_data_catalog(self, database: str) -> Optional[DataCatalogTypeDef]:
sts = client.session.client(
"sts",
region_name=client.region_name,
- config=get_boto3_config(num_retries=creds.get_effective_num_retries),
+ config=get_boto3_config(num_retries=creds.effective_num_retries),
)
catalog_id = sts.get_caller_identity()["Account"]
return {"Name": database, "Type": "GLUE", "Parameters": {"catalog-id": catalog_id}}
@@ -668,7 +668,7 @@ def _get_data_catalog(self, database: str) -> Optional[DataCatalogTypeDef]:
athena = client.session.client(
"athena",
region_name=client.region_name,
- config=get_boto3_config(num_retries=creds.get_effective_num_retries),
+ config=get_boto3_config(num_retries=creds.effective_num_retries),
)
return athena.get_data_catalog(Name=database)["DataCatalog"]
return None
@@ -687,7 +687,7 @@ def list_relations_without_caching(self, schema_relation: AthenaRelation) -> Lis
glue_client = client.session.client(
"glue",
region_name=client.region_name,
- config=get_boto3_config(num_retries=creds.get_effective_num_retries),
+ config=get_boto3_config(num_retries=creds.effective_num_retries),
)
paginator = glue_client.get_paginator("get_tables")
@@ -769,7 +769,7 @@ def swap_table(self, src_relation: AthenaRelation, target_relation: AthenaRelati
glue_client = client.session.client(
"glue",
region_name=client.region_name,
- config=get_boto3_config(num_retries=creds.get_effective_num_retries),
+ config=get_boto3_config(num_retries=creds.effective_num_retries),
)
src_table = glue_client.get_table(
@@ -854,7 +854,7 @@ def _get_glue_table_versions_to_expire(self, relation: AthenaRelation, to_keep:
glue_client = client.session.client(
"glue",
region_name=client.region_name,
- config=get_boto3_config(num_retries=creds.get_effective_num_retries),
+ config=get_boto3_config(num_retries=creds.effective_num_retries),
)
paginator = glue_client.get_paginator("get_table_versions")
@@ -884,7 +884,7 @@ def expire_glue_table_versions(
glue_client = client.session.client(
"glue",
region_name=client.region_name,
- config=get_boto3_config(num_retries=creds.get_effective_num_retries),
+ config=get_boto3_config(num_retries=creds.effective_num_retries),
)
versions_to_delete = self._get_glue_table_versions_to_expire(relation, to_keep)
@@ -945,7 +945,7 @@ def persist_docs_to_glue(
glue_client = client.session.client(
"glue",
region_name=client.region_name,
- config=get_boto3_config(num_retries=creds.get_effective_num_retries),
+ config=get_boto3_config(num_retries=creds.effective_num_retries),
)
# By default, there is no need to update Glue Table
@@ -1033,7 +1033,7 @@ def list_schemas(self, database: str) -> List[str]:
glue_client = client.session.client(
"glue",
region_name=client.region_name,
- config=get_boto3_config(num_retries=creds.get_effective_num_retries),
+ config=get_boto3_config(num_retries=creds.effective_num_retries),
)
paginator = glue_client.get_paginator("get_databases")
@@ -1064,7 +1064,7 @@ def get_columns_in_relation(self, relation: AthenaRelation) -> List[AthenaColumn
glue_client = client.session.client(
"glue",
region_name=client.region_name,
- config=get_boto3_config(num_retries=creds.get_effective_num_retries),
+ config=get_boto3_config(num_retries=creds.effective_num_retries),
)
try:
@@ -1106,7 +1106,7 @@ def delete_from_glue_catalog(self, relation: AthenaRelation) -> None:
glue_client = client.session.client(
"glue",
region_name=client.region_name,
- config=get_boto3_config(num_retries=creds.get_effective_num_retries),
+ config=get_boto3_config(num_retries=creds.effective_num_retries),
)
try: