Skip to content

Commit

Permalink
Feature:支持多个新 SQL 语法,统一了 string 类型为小写,时间类型时区设定为 UTC,并优化了 dbt seed 读取 …
Browse files Browse the repository at this point in the history
…CSV 的效率。
  • Loading branch information
dingxin-tech committed Nov 21, 2024
1 parent a52c5f1 commit 89db11e
Show file tree
Hide file tree
Showing 35 changed files with 948 additions and 14 deletions.
2 changes: 1 addition & 1 deletion dbt/adapters/maxcompute/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "1.8.0-alpha7"
version = "1.8.0-dev"
4 changes: 2 additions & 2 deletions dbt/adapters/maxcompute/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class MaxComputeColumn(Column):
comment: str = ""

TYPE_LABELS = {
"TEXT": "STRING",
"TEXT": "string",
}

@property
Expand Down Expand Up @@ -62,7 +62,7 @@ def from_odps_column(cls, column: TableSchema.TableColumn):

return cls(
column=column.name,
dtype=column.type.name,
dtype=column.type.name.lower(),
char_size=char_size,
numeric_precision=numeric_precision,
numeric_scale=numeric_scale,
Expand Down
4 changes: 3 additions & 1 deletion dbt/adapters/maxcompute/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from dbt.adapters.sql import SQLConnectionManager
from dbt.adapters.events.logging import AdapterLogger

from odps import ODPS
from odps import ODPS, options

from dbt.adapters.maxcompute.context import GLOBAL_SQL_HINTS
from dbt.adapters.maxcompute.wrapper import ConnectionWrapper
Expand Down Expand Up @@ -58,6 +58,8 @@ def open(cls, connection):
endpoint=credentials.endpoint,
)
o.schema = credentials.schema
# always use UTC timezone
options.local_timezone = False

try:
o.get_project().reload()
Expand Down
1 change: 1 addition & 0 deletions dbt/adapters/maxcompute/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
"odps.sql.select.output.format": "csv",
"odps.sql.submit.mode": "script",
"odps.sql.allow.cartesian": "true",
"odps.sql.timezone": "GMT",
}
39 changes: 39 additions & 0 deletions dbt/adapters/maxcompute/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import Optional, List, Dict, Any, Set, FrozenSet, Tuple

import agate
import pandas as pd
from agate import Table
from dbt.adapters.base import ConstraintSupport, available
from dbt.adapters.base.relation import InformationSchema
Expand All @@ -14,6 +15,7 @@
CapabilitySupport,
Support,
)
from dbt.adapters.contracts.connection import AdapterResponse
from dbt.adapters.contracts.macros import MacroResolverProtocol
from dbt.adapters.contracts.relation import RelationType
from dbt.adapters.protocol import AdapterConfig
Expand Down Expand Up @@ -390,3 +392,40 @@ def string_add_sql(
return f"concat('{value}',{add_to})"
else:
raise DbtRuntimeError(f'Got an unexpected location value of "{location}"')

def validate_sql(self, sql: str) -> AdapterResponse:
res = self.connections.execute(sql)
return res[0]

@available.parse_none
def load_dataframe(
self,
database: str,
schema: str,
table_name: str,
agate_table: "agate.Table",
column_override: Dict[str, str],
field_delimiter: str,
) -> None:
file_path = agate_table.original_abspath

timestamp_columns = [key for key, value in column_override.items() if value == "timestamp"]

for i, column_type in enumerate(agate_table.column_types):
if isinstance(column_type, agate.data_types.date_time.DateTime):
timestamp_columns.append(agate_table.column_names[i])

print(timestamp_columns)

pd_dataframe = pd.read_csv(
file_path, delimiter=field_delimiter, parse_dates=timestamp_columns
)

self.get_odps_client().write_table(
table_name,
pd_dataframe,
project=database,
schema=schema,
create_table=False,
create_partition=False,
)
8 changes: 6 additions & 2 deletions dbt/adapters/maxcompute/wrapper.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import time
from datetime import datetime
from datetime import datetime, date
from decimal import Decimal

from dbt.adapters.events.logging import AdapterLogger
Expand Down Expand Up @@ -37,10 +37,14 @@ def param_normalization(params):
return None
normalized_params = []
for param in params:
if isinstance(param, Decimal):
if param is None:
normalized_params.append("NULL")
elif isinstance(param, Decimal):
normalized_params.append(f"{param}BD")
elif isinstance(param, datetime):
normalized_params.append(f"TIMESTAMP'{param.strftime('%Y-%m-%d %H:%M:%S')}'")
elif isinstance(param, date):
normalized_params.append(f"DATE'{param.strftime('%Y-%m-%d')}'")
elif isinstance(param, str):
normalized_params.append(f"'{param}'")
else:
Expand Down
4 changes: 1 addition & 3 deletions dbt/include/maxcompute/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,7 @@ dbt docs: https://docs.getdbt.com/docs/contributing/building-a-new-adapter

/* {# override dbt/include/global_project/macros/relations/view/create.sql #} */
{% macro maxcompute__create_view_as(relation, sql) -%}
CREATE OR REPLACE VIEW {{ relation.render() }} AS (
{{ sql }}
);
CREATE OR REPLACE VIEW {{ relation.render() }} AS ({{ sql }});
{% endmacro %}

{% macro create_transactional_table_as(temporary, relation, sql) -%}
Expand Down
10 changes: 10 additions & 0 deletions dbt/include/maxcompute/macros/materializations/seeds/seeds.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,13 @@

{{ return(sql) }}
{% endmacro %}


{% macro maxcompute__load_csv_rows(model, agate_table) %}

{%- set column_override = model['config'].get('column_types', {}) -%}
{{ adapter.load_dataframe(model['database'], model['schema'], model['alias'],
agate_table, column_override, model['config']['delimiter']) }}

{% do persist_docs(target_relation, model) %}
{% endmacro %}
6 changes: 6 additions & 0 deletions dbt/include/maxcompute/macros/utils/any_value.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{# https://help.aliyun.com/zh/maxcompute/user-guide/any-value #}
{% macro maxcompute__any_value(expression) -%}

any_value({{ expression }})

{%- endmacro %}
3 changes: 3 additions & 0 deletions dbt/include/maxcompute/macros/utils/array_append.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{% macro maxcompute__array_append(array, new_element) -%}
concat({{ array }}, array({{ new_element }}))
{%- endmacro %}
4 changes: 4 additions & 0 deletions dbt/include/maxcompute/macros/utils/array_concat.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- 7 is the length of 'array()', I don't know how to judge if array is empty other than this method
{% macro maxcompute__array_concat(array_1, array_2) -%}
concat({{ array_1 }}, {{ array_2 }})
{%- endmacro %}
23 changes: 23 additions & 0 deletions dbt/include/maxcompute/macros/utils/array_construct.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{% macro maxcompute__array_construct(inputs, data_type) -%}
{%- if inputs|length > 0 -%}
{%- if data_type == 'string' -%}
array({{ '\"' + inputs|join('\", \"') + '\"' }})
{%- else -%}
array({{ inputs|join(', ')}})
{%- endif -%}
{%- else -%}
{%- if data_type == 'string' -%}
array()
{%- elif data_type == 'integer' or data_type == 'int'-%}
array_remove(array(1), 1)
{%- elif data_type == 'bigint' -%}
array_remove(array(1L), 1L)
{%- elif data_type == 'decimal' -%}
array_remove(array(1BD), 1BD)
{%- elif data_type == 'timestamp' -%}
array_remove(array(TIMESTAMP '2017-11-11 00:00:00'), TIMESTAMP '2017-11-11 00:00:00')
{%- else -%}
{{ exceptions.raise_compiler_error("Unsupport datatype when create empty array ~ '" ~ data_type ~ "'") }}
{%- endif -%}
{%- endif -%}
{%- endmacro %}
7 changes: 7 additions & 0 deletions dbt/include/maxcompute/macros/utils/bool_or.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@

-- The boolean function is an aggregate function.
-- When there is true in the bool type group, it returns true, otherwise it returns false.

{% macro maxcompute__bool_or(expression) -%}
max({{ expression }})
{%- endmacro %}
4 changes: 4 additions & 0 deletions dbt/include/maxcompute/macros/utils/cast.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- https://help.aliyun.com/zh/maxcompute/user-guide/cast
{% macro maxcompute__cast(field, type) %}
cast({{field}} as {{type}})
{% endmacro %}
5 changes: 5 additions & 0 deletions dbt/include/maxcompute/macros/utils/cast_bool_to_text.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- https://docs.getdbt.com/reference/dbt-jinja-functions/cross-database-macros#cast_bool_to_text
-- need to consider 'NULL'
{% macro maxcompute__cast_bool_to_text(field) -%}
tolower(cast({{ field }} as string))
{%- endmacro %}
3 changes: 3 additions & 0 deletions dbt/include/maxcompute/macros/utils/concat.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{% macro maxcompute__concat(fields) -%}
concat({{ fields|join(', ') }})
{%- endmacro %}
5 changes: 5 additions & 0 deletions dbt/include/maxcompute/macros/utils/date.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{% macro maxcompute__date(year, month, day) -%}
{%- set dt = modules.datetime.date(year, month, day) -%}
{%- set iso_8601_formatted_date = dt.strftime('%Y-%m-%d') -%}
to_date('{{ iso_8601_formatted_date }}')
{%- endmacro %}
75 changes: 75 additions & 0 deletions dbt/include/maxcompute/macros/utils/date_spine.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
{% macro get_intervals_between(start_date, end_date, datepart) -%}
{{ return(adapter.dispatch('get_intervals_between', 'dbt')(start_date, end_date, datepart)) }}
{%- endmacro %}

{% macro default__get_intervals_between(start_date, end_date, datepart) -%}
{%- call statement('get_intervals_between', fetch_result=True) %}

select {{ dbt.datediff(start_date, end_date, datepart) }}

{%- endcall -%}

{%- set value_list = load_result('get_intervals_between') -%}

{%- if value_list and value_list['data'] -%}
{%- set values = value_list['data'] | map(attribute=0) | list %}
{{ return(values[0]) }}
{%- else -%}
{{ return(1) }}
{%- endif -%}

{%- endmacro %}




{% macro date_spine(datepart, start_date, end_date) %}
{{ return(adapter.dispatch('date_spine', 'dbt')(datepart, start_date, end_date)) }}
{%- endmacro %}

{% macro default__date_spine(datepart, start_date, end_date) %}


{# call as follows:

date_spine(
"day",
"to_date('01/01/2016', 'mm/dd/yyyy')",
"dbt.dateadd(week, 1, current_date)"
) #}


with rawdata as (

{{dbt.generate_series(
dbt.get_intervals_between(start_date, end_date, datepart)
)}}

),

all_periods as (

select (
{{
dbt.dateadd(
datepart,
"row_number() over (order by 1) - 1",
start_date
)
}}
) as date_{{datepart}}
from rawdata

),

filtered as (

select *
from all_periods
where date_{{datepart}} <= {{ end_date }}

)

select * from filtered

{% endmacro %}
15 changes: 15 additions & 0 deletions dbt/include/maxcompute/macros/utils/date_trunc.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- https://help.aliyun.com/zh/maxcompute/user-guide/datetrunc
{% macro maxcompute__date_trunc(datepart, date) -%}
{%- if datepart in ['day', 'month', 'year', 'hour'] %}
datetrunc({{date}}, '{{datepart}}')
{%- elif datepart in ['minute', 'second'] -%}
{%- set diviser -%}
{%- if datepart == 'minute' -%} 60
{%- else -%} 1
{%- endif -%}
{%- endset -%}
from_unixtime(unix_timestamp({{date}}) - (unix_timestamp({{date}}) % {{diviser}}))
{%- else -%}
{{ exceptions.raise_compiler_error("macro datetrunc not support for datepart ~ '" ~ datepart ~ "'") }}
{%- endif -%}
{%- endmacro %}
15 changes: 11 additions & 4 deletions dbt/include/maxcompute/macros/utils/dateadd.sql
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
{% macro maxcompute__dateadd(datepart, interval, from_date_or_timestamp) %}
{%- if datepart in ['day', 'month', 'year'] %}
{%- if datepart in ['day', 'month', 'year', 'hour'] %}
dateadd({{ from_date_or_timestamp }}, {{ interval }}, '{{ datepart }}')
{%- elif datepart == 'hour' -%}
from_unixtime(unix_timestamp({{from_date_or_timestamp}}) + {{interval}}*3600)
{%- elif datepart == 'quarter' -%}
dateadd({{ from_date_or_timestamp }}, {{ interval }}*3, 'month')
{%- elif datepart in ['minute', 'second'] -%}
{%- set multiplier -%}
{%- if datepart == 'minute' -%} 60
{%- else -%} 1
{%- endif -%}
{%- endset -%}
from_unixtime(unix_timestamp({{from_date_or_timestamp}}) + {{interval}}*{{multiplier}})
{%- else -%}
{{ exceptions.raise_compiler_error("macro dateadd not implemented for datepart ~ '" ~ datepart ~ "' ~ on ODPS") }}
{{ exceptions.raise_compiler_error("macro dateadd not support for datepart ~ '" ~ datepart ~ "'") }}
{%- endif -%}
{% endmacro %}
Loading

0 comments on commit 89db11e

Please sign in to comment.