-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathcheck_run_results.py
104 lines (77 loc) · 3.83 KB
/
check_run_results.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# mypy: disable-error-code="union-attr"
from typing import TYPE_CHECKING, Literal, Optional
from pydantic import Field
from dbt_bouncer.check_base import BaseCheck
if TYPE_CHECKING:
from dbt_bouncer.artifact_parsers.parsers_run_results import DbtBouncerRunResultBase
class CheckRunResultsMaxExecutionTime(BaseCheck):
"""Each result can take a maximum duration (seconds).
Parameters:
max_execution_time_seconds (float): The maximum execution time (seconds) allowed for a node.
Receives:
run_result (DbtBouncerRunResult): The DbtBouncerRunResult object to check.
Other Parameters:
exclude (Optional[str]): Regex pattern to match the resource path. Resource paths that match the pattern will not be checked.
include (Optional[str]): Regex pattern to match the resource path. Only resource paths that match the pattern will be checked.
severity (Optional[Literal["error", "warn"]]): Severity level of the check. Default: `error`.
Example(s):
```yaml
run_results_checks:
- name: check_run_results_max_execution_time
max_execution_time_seconds: 60
```
```yaml
run_results_checks:
- name: check_run_results_max_execution_time
include: ^models/staging # Not a good idea, here for demonstration purposes only
max_execution_time_seconds: 10
```
"""
max_execution_time_seconds: float
name: Literal["check_run_results_max_execution_time"]
run_result: Optional["DbtBouncerRunResultBase"] = Field(default=None)
def execute(self) -> None:
"""Execute the check."""
assert self.run_result.execution_time <= self.max_execution_time_seconds, (
f"`{self.run_result.unique_id.split('.')[-1]}` has an execution time ({self.run_result.execution_time} greater than permitted ({self.max_execution_time_seconds}s)."
)
class CheckRunResultsMaxGigabytesBilled(BaseCheck):
"""Each result can have a maximum number of gigabytes billed.
!!! note
Note that this check only works for the `dbt-bigquery` adapter.
Parameters:
max_gigabytes_billed (float): The maximum number of gigabytes billed.
run_result (DbtBouncerRunResult): The DbtBouncerRunResult object to check.
Other Parameters:
exclude (Optional[str]): Regex pattern to match the resource path. Resource paths that match the pattern will not be checked.
include (Optional[str]): Regex pattern to match the resource path. Only resource paths that match the pattern will be checked.
severity (Optional[Literal["error", "warn"]]): Severity level of the check. Default: `error`.
Raises: # noqa:DOC502
KeyError: If the `dbt-bigquery` adapter is not used.
Example(s):
```yaml
run_results_checks:
- name: check_run_results_max_gigabytes_billed
max_gigabytes_billed: 100
exclude: ^seeds
```
"""
max_gigabytes_billed: float
name: Literal["check_run_results_max_gigabytes_billed"]
run_result: Optional["DbtBouncerRunResultBase"] = Field(default=None)
def execute(self) -> None:
"""Execute the check.
Raises:
RuntimeError: If running with adapter other than `dbt-bigquery`.
"""
try:
gigabytes_billed = self.run_result.adapter_response["bytes_billed"] / (
1000**3
)
except KeyError as e:
raise RuntimeError(
"`bytes_billed` not found in adapter response. Are you using the `dbt-bigquery` adapter?",
) from e
assert gigabytes_billed < self.max_gigabytes_billed, (
f"`{self.run_result.unique_id.split('.')[-2]}` results in ({gigabytes_billed} billed bytes, this is greater than permitted ({self.max_gigabytes_billed})."
)