Feature: source freshness (#1240) #1272
Conversation
Force-pushed from 55e642a to 1775a6b
Force-pushed from dceef4d to f839e64
Really nice work! I gave this a spin and hit some weird edge cases around validation, but overall, I'm super into this.
Let me try it on some other dbs over the next day or two. I'm really impressed with how nice the sources.json file came out. This is going to be great.
core/dbt/graph/selector.py
Outdated
yield node

def get_nodes_from_spec(self, graph, spec):
def get_nodes_from_spec(self, graph, spec, sources=False):
is this sources arg used?
no, good catch. it was briefly, for a version that never survived rebasing.
{% macro default__collect_freshness(source, loaded_at_field) %}
{% call statement('check_schema_exists', fetch_result=True, auto_begin=False) -%}
with source_snapshot as (
I bet any performance difference would be minimal, but I think we can do this with:
select
    max({{ loaded_at_field }}) as max_loaded_at,
    max({{ current_timestamp() }}) as snapshotted_at
from {{ source }}
Some databases (postgres and its derivatives) don't play nice with CTEs, so doing this as a single select is probably the move, all else being equal.
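The rendered single-select query the reviewer is suggesting can be sketched in plain Python string templating (the function name and signature here are illustrative, not dbt's actual API — the real query is rendered through Jinja macros):

```python
# Illustrative sketch only: render the suggested single-select freshness
# query for a source table, with no CTE. `render_freshness_sql` is a
# made-up helper for this example.
def render_freshness_sql(source, loaded_at_field, current_timestamp="now()"):
    return (
        "select\n"
        f"    max({loaded_at_field}) as max_loaded_at,\n"
        f"    max({current_timestamp}) as snapshotted_at\n"
        f"from {source}"
    )
```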
help=argparse.SUPPRESS,
)

subs = p.add_subparsers(title="Available sub-commands")
🙏 🙏 🙏 🙏 🙏 🙏
@@ -68,3 +68,7 @@
{%- endcall %}
{{ return(load_result('check_schema_exists').table) }}
{%- endmacro %}

{% macro snowflake__current_timestamp() -%}
CURRENT_TIMESTAMP()
this is going to return a timestamp in some abysmally useless timezone, like PST. We can grab a UTC timestamp like this:
convert_timestamp('UTC', current_timestamp())
Does this macro need to return a utc timestamp, or is the local time conversion handled in python?
As long as there is a timezone attached, dbt will do the right thing. If there's no timezone we assume UTC. I'll update this macro though, UTC is useful.
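The assume-UTC rule described here can be sketched in plain Python (an illustration of the stated behavior, not dbt's actual code):

```python
from datetime import datetime, timezone

def normalize_to_utc(ts: datetime) -> datetime:
    # Naive timestamps (no tzinfo attached) are assumed to already be UTC;
    # timezone-aware timestamps are converted to UTC.
    if ts.tzinfo is None:
        return ts.replace(tzinfo=timezone.utc)
    return ts.astimezone(timezone.utc)
```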
def execute(self, compiled_node, manifest):
    # given a Source, calculate its freshness.
    freshness = self.adapter.calculate_freshness(
I couldn't track down exactly where this happens, but I think we try to calculate freshness for sources that don't have a freshness key specified. With this schema.yml:
version: 2
sources:
  - name: snowplow
    loader: snowpipe
    tables:
      - name: event
        sql_table_name: raw.snowplow.event
        loaded_at_field: collector_tstamp
I see:
21:27:32 | 1 of 1 START freshness of snowplow.event............................. [RUN]
Unhandled error while executing source.dbt_metrics.snowplow.event
'warn_after'
21:27:34 | 1 of 1 ERROR......................................................... [ERROR in 1.74s]
Similarly, it looks like warn_after and error_after are both required by the code in here, but I think each is optional per the contract? It's totally reasonable to specify one of these and not the other.
I'll poke around a bit, we should not be getting that node back from the selector at all.
Requiring both warn_after and error_after was my understanding of the initial feature but it's easy enough to change.
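The "each threshold is optional" behavior being requested could look something like this sketch (names and units are illustrative, not the actual dbt implementation):

```python
def freshness_status(age_seconds, warn_after=None, error_after=None):
    # Either threshold may be omitted; only the ones provided are checked,
    # so specifying warn_after without error_after (or vice versa) works.
    if error_after is not None and age_seconds > error_after:
        return "error"
    if warn_after is not None and age_seconds > warn_after:
        return "warn"
    return "pass"
```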
Ok, I fixed this by checking if self.freshness is truthy instead of not-None, which handles the empty dict case, as it should. It also handles the absence of either one of the values.
Now you'll get the "WARNING: Nothing to do. Try checking your model configs and model specification args" you'd expect.
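The difference between the not-None check and the truthiness check can be shown in a couple of lines (illustrative names, not dbt's exact code):

```python
def has_freshness_not_none(freshness):
    # Buggy version: an empty dict {} is not None, so it passes this check
    # and dbt would try to calculate freshness with no config at all.
    return freshness is not None

def has_freshness(freshness):
    # Fixed version: truthiness treats {} and None alike, and a dict with
    # only warn_after or only error_after still counts as configured.
    return bool(freshness)
```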
Move some of RunManager into tasks
Move compile_node work into compilation
Move manifest work into the GraphLoader
Move the rest into the runners
Implement freshness calculations for sources
command: 'dbt source snapshot-freshness'
support for 4 adapters (no presto)
Integration tests
break up main.py's argument parsing
Pass the manifest along to freshness calculation
Results support for freshness
New freshness result contracts
Fix source result printing
Result contract cleanup
safe_run supports alternate result types
Fix tests to support changes in results
PR feedback:
- snowflake macro changed to always return utc
- no cte in collect_freshness
- remove extra optional arg
- fix the has_freshness check to examine if there is anything in freshness
- support error_after without warn_after and vice-versa
- snowflake: convert_timestamp -> convert_timezone
Update sources to be Relations
- update contracts
- add create_from_source
- add create_from_source calls
- fix tests
PR feedback
create_from_source forces quotes
default source schema/table from source/table names
snowflake quoting nonsense
also fix output: pass -> PASS
make seeding test 017 take 1m instead of 3m by using csv instead of sql
- source tweaks for the docs site
Force-pushed from e76ead0 to 5e8ab9c
Ok, this is working super well. Two cosmetic requests, then we'll be good to ship this. Given the current output:
print source name on pass/warn/error lines
distinguish 'error' vs 'error stale'
Force-pushed from 6406c62 to f83a713
Force-pushed from f83a713 to 0bd5999
Snowflake tests taking forever triggered the 1h timeout again, merging anyway!
Implements #1240
Implement the dbt source snapshot-freshness command.

This required some restructuring of how dbt handles results, in order to push the individual result data up to the task level. There are a lot of places in dbt's task execution where we have specific expectations about types that this breaks (because sources aren't really like nodes). This PR mostly papers over that by merging RunManagers into Tasks (in particular, into RunnableTasks) and making them responsible for everything. safe_run still makes more use of mutable state than I really love, but fixing it seems like it could be a real rabbit hole.

Fields in the database that have no timezone are assumed to be UTC. I think on snowflake you can do wild things like set the type CURRENT_TIMESTAMP will return (to be specific, you can specify what TIMESTAMP is). I am not clear on whether you can set it to return the local time without a timezone based on these docs.