
Fix metrics weekly handling (#2277)
* Fix metrics weekly

* remove print statements

* remove commented code
ravenac95 authored Oct 1, 2024
1 parent 53bb576 commit a4b84ed
Showing 4 changed files with 40 additions and 240 deletions.
6 changes: 3 additions & 3 deletions warehouse/metrics_tools/lib/factories/definition.py
@@ -5,14 +5,14 @@
 from enum import Enum
 
 import sqlglot
-from metrics_tools.dialect.translate import (CustomFuncHandler,
-                                             CustomFuncRegistry)
-from metrics_tools.evaluator import FunctionsTransformer
 from sqlglot import exp
 from sqlglot.optimizer.qualify import qualify
 from sqlmesh.core.macros import MacroEvaluator
 from sqlmesh.utils.date import TimeLike
 
+from metrics_tools.dialect.translate import CustomFuncHandler, CustomFuncRegistry
+from metrics_tools.evaluator import FunctionsTransformer
+
 CURR_DIR = os.path.dirname(__file__)
 QUERIES_DIR = os.path.abspath(
     os.path.join(CURR_DIR, "../../../metrics_mesh/oso_metrics")
17 changes: 13 additions & 4 deletions warehouse/metrics_tools/lib/factories/factory.py
@@ -114,6 +114,16 @@ def generate_models_from_query(
         metrics_start,
     ]
 
+    kind_common = {"batch_size": 1, "batch_concurrency": 1}
+
+    # Due to how the schedulers work for sqlmesh we actually can't batch if
+    # we're using a weekly cron for a time aggregation. In order to have
+    # this work we just adjust the start/end time for the
+    # metrics_start/metrics_end and also give a large enough batch time to
+    # fit a few weeks. This ensures there's no missing data.
+    if cron == "@weekly":
+        kind_common = {"batch_size": 21, "lookback": 7}
+
     if ref["entity_type"] == "artifact":
         GeneratedModel.create(
             func=generated_entity,
@@ -124,8 +134,7 @@ def generate_models_from_query(
             kind={
                 "name": ModelKindName.INCREMENTAL_BY_TIME_RANGE,
                 "time_column": "metrics_sample_date",
-                "batch_size": 1,
-                "batch_concurrency": 1,
+                **kind_common,
             },
             dialect="clickhouse",
             columns=columns,
@@ -150,7 +159,7 @@ def generate_models_from_query(
             kind={
                 "name": ModelKindName.INCREMENTAL_BY_TIME_RANGE,
                 "time_column": "metrics_sample_date",
-                "batch_size": 1,
+                **kind_common,
             },
             dialect="clickhouse",
             columns=columns,
@@ -174,7 +183,7 @@ def generate_models_from_query(
             kind={
                 "name": ModelKindName.INCREMENTAL_BY_TIME_RANGE,
                 "time_column": "metrics_sample_date",
-                "batch_size": 1,
+                **kind_common,
             },
             dialect="clickhouse",
             columns=columns,
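
Note: the effect of the kind_common merge above can be sketched in plain Python as below. kind_settings is an illustrative helper, not a function from the repo, and the bare string stands in for the ModelKindName enum member used in the real kind dicts.

from typing import Any

def kind_settings(cron: str) -> dict[str, Any]:
    """Sketch: the kind dict each GeneratedModel.create call ends up with."""
    # Default path: process one interval per batch, one batch at a time.
    kind_common: dict[str, Any] = {"batch_size": 1, "batch_concurrency": 1}
    if cron == "@weekly":
        # Weekly time aggregations can't run one interval per batch, so the
        # batch is widened to roughly three weeks of intervals, with a
        # one-week lookback so partially computed week buckets get redone.
        kind_common = {"batch_size": 21, "lookback": 7}
    return {
        "name": "INCREMENTAL_BY_TIME_RANGE",  # ModelKindName enum in the real code
        "time_column": "metrics_sample_date",
        **kind_common,
    }

print(kind_settings("@daily"))   # {..., 'batch_size': 1, 'batch_concurrency': 1}
print(kind_settings("@weekly"))  # {..., 'batch_size': 21, 'lookback': 7}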
26 changes: 24 additions & 2 deletions warehouse/metrics_tools/lib/factories/macros.py
@@ -94,9 +94,16 @@ def metrics_start(evaluator: MacroEvaluator, _data_type: t.Optional[str] = None)
     by taking the end_ds provided by sqlmesh and calculating a
     trailing interval back {window} intervals of unit {unit}.
     """
-    if evaluator.locals.get("time_aggregation"):
+    time_aggregation_interval = evaluator.locals.get("time_aggregation")
+    if time_aggregation_interval:
+        start_date = t.cast(
+            exp.Expression,
+            evaluator.transform(
+                parse_one("STR_TO_DATE(@start_ds, '%Y-%m-%d')", dialect="clickhouse")
+            ),
+        )
         return evaluator.transform(
-            parse_one("STR_TO_DATE(@start_ds, '%Y-%m-%d')", dialect="clickhouse")
+            time_aggregation_bucket(evaluator, start_date, time_aggregation_interval)
         )
     else:
         return evaluator.transform(
@@ -109,6 +116,21 @@ def metrics_start(evaluator: MacroEvaluator, _data_type: t.Optional[str] = None)
 
 def metrics_end(evaluator: MacroEvaluator, _data_type: t.Optional[str] = None):
     """This has different semantic meanings depending on the mode of the metric query"""
+
+    time_aggregation_interval = evaluator.locals.get("time_aggregation")
+    if time_aggregation_interval:
+        end_date = t.cast(
+            exp.Expression,
+            evaluator.transform(
+                parse_one(
+                    f"STR_TO_DATE(@end_ds, '%Y-%m-%d') + INTERVAL 1 {time_aggregation_interval}",
+                    dialect="clickhouse",
+                )
+            ),
+        )
+        return evaluator.transform(
+            time_aggregation_bucket(evaluator, end_date, time_aggregation_interval)
+        )
     return evaluator.transform(
         parse_one("STR_TO_DATE(@end_ds, '%Y-%m-%d')", dialect="clickhouse")
     )
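
Note: the combined effect of the two macros for a weekly aggregation is to widen the run window to whole buckets: metrics_start rounds the raw start down to its bucket boundary, and metrics_end adds one interval before rounding down. Below is a minimal plain-Python sketch of that widening, assuming time_aggregation_bucket rounds a date down to the start of its bucket (weeks starting Monday here); bucket_start and weekly_window are illustrative names, not functions from the codebase.

from datetime import date, timedelta

def bucket_start(d: date) -> date:
    # Round down to the Monday of d's week, mirroring what the SQL emitted
    # by time_aggregation_bucket is assumed to do to the date expression.
    return d - timedelta(days=d.weekday())

def weekly_window(start_ds: date, end_ds: date) -> tuple[date, date]:
    # metrics_start: round the raw start down so a partial first week is
    # computed from its true bucket boundary.
    start = bucket_start(start_ds)
    # metrics_end: add one interval (the INTERVAL 1 {unit} above) before
    # rounding, so a partial final week extends to the next bucket boundary
    # instead of being cut short.
    end = bucket_start(end_ds + timedelta(weeks=1))
    return start, end

# A run covering Wed 2024-10-02 through Thu 2024-10-10 widens to whole weeks:
print(weekly_window(date(2024, 10, 2), date(2024, 10, 10)))
# (datetime.date(2024, 9, 30), datetime.date(2024, 10, 14))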
231 changes: 0 additions & 231 deletions warehouse/metrics_tools/models.py
@@ -22,237 +22,6 @@
 logger = logging.getLogger(__name__)
 
-
-# class model(registry_decorator):
-#     """Specifies a function is a python based model."""
-
-#     registry_name = "python_models"
-#     _dialect: DialectType = None
-
-#     def __init__(self, name: t.Optional[str] = None, is_sql: bool = False, **kwargs: t.Any) -> None:
-#         if not is_sql and "columns" not in kwargs:
-#             raise ConfigError("Python model must define column schema.")
-
-#         self.name = name or ""
-#         self.is_sql = is_sql
-#         self.kwargs = kwargs
-
-#         # Make sure that argument values are expressions in order to pass validation in ModelMeta.
-#         calls = self.kwargs.pop("audits", [])
-#         self.kwargs["audits"] = [
-#             (
-#                 (call, {})
-#                 if isinstance(call, str)
-#                 else (
-#                     call[0],
-#                     {
-#                         arg_key: exp.convert(
-#                             tuple(arg_value) if isinstance(arg_value, list) else arg_value
-#                         )
-#                         for arg_key, arg_value in call[1].items()
-#                     },
-#                 )
-#             )
-#             for call in calls
-#         ]
-
-#         if "default_catalog" in kwargs:
-#             raise ConfigError("`default_catalog` cannot be set on a per-model basis.")
-
-#         self.columns = {
-#             column_name: (
-#                 column_type
-#                 if isinstance(column_type, exp.DataType)
-#                 else exp.DataType.build(str(column_type))
-#             )
-#             for column_name, column_type in self.kwargs.pop("columns", {}).items()
-#         }
-
-#     def model(
-#         self,
-#         *,
-#         module_path: Path,
-#         path: Path,
-#         defaults: t.Optional[t.Dict[str, t.Any]] = None,
-#         macros: t.Optional[MacroRegistry] = None,
-#         jinja_macros: t.Optional[JinjaMacroRegistry] = None,
-#         dialect: t.Optional[str] = None,
-#         time_column_format: str = c.DEFAULT_TIME_COLUMN_FORMAT,
-#         physical_schema_override: t.Optional[t.Dict[str, str]] = None,
-#         project: str = "",
-#         default_catalog: t.Optional[str] = None,
-#         variables: t.Optional[t.Dict[str, t.Any]] = None,
-#         infer_names: t.Optional[bool] = False,
-#     ) -> Model:
-#         """Get the model registered by this function."""
-#         env: t.Dict[str, t.Any] = {}
-#         entrypoint = self.func.__name__
-
-#         if not self.name and infer_names:
-#             self.name = get_model_name(Path(inspect.getfile(self.func)))
-
-#         if not self.name:
-#             raise ConfigError("Python model must have a name.")
-
-#         kind = self.kwargs.get("kind", None)
-#         if kind is not None:
-#             if isinstance(kind, _ModelKind):
-#                 logger.warning(
-#                     f"""Python model "{self.name}"'s `kind` argument was passed a SQLMesh `{type(kind).__name__}` object. This may result in unexpected behavior - provide a dictionary instead."""
-#                 )
-#             elif isinstance(kind, dict):
-#                 if "name" not in kind or not isinstance(kind.get("name"), ModelKindName):
-#                     raise ConfigError(
-#                         f"""Python model "{self.name}"'s `kind` dictionary must contain a `name` key with a valid ModelKindName enum value."""
-#                     )
-
-#         build_env(self.func, env=env, name=entrypoint, path=module_path)
-
-#         common_kwargs = dict(
-#             defaults=defaults,
-#             path=path,
-#             time_column_format=time_column_format,
-#             python_env=serialize_env(env, path=module_path),
-#             physical_schema_override=physical_schema_override,
-#             project=project,
-#             default_catalog=default_catalog,
-#             variables=variables,
-#             **self.kwargs,
-#         )
-
-#         dialect = common_kwargs.pop("dialect", dialect)
-#         for key in ("pre_statements", "post_statements"):
-#             statements = common_kwargs.get(key)
-#             if statements:
-#                 common_kwargs[key] = [
-#                     parse_one(s, dialect=dialect) if isinstance(s, str) else s for s in statements
-#                 ]
-
-#         if self.is_sql:
-#             query = MacroFunc(this=exp.Anonymous(this=entrypoint))
-#             if self.columns:
-#                 common_kwargs["columns"] = self.columns
-#             return create_sql_model(
-#                 self.name, query, module_path=module_path, dialect=dialect, **common_kwargs
-#             )
-
-#         return create_python_model(
-#             self.name,
-#             entrypoint,
-#             module_path=module_path,
-#             macros=macros,
-#             jinja_macros=jinja_macros,
-#             columns=self.columns,
-#             dialect=dialect,
-#             **common_kwargs,
-#         )
-
-
-# class metrics_model(model):
-#     """This is a hack upon hacks. We abuse the sqlmesh serialization system
-#     to create a new type of model. That we can forcibly inject into sqlmesh
-#     state"""
-
-#     def __init__(self, name: t.Optional[str], inject_env: t.Dict[str, t.Any]):
-#         super().__init__(name=name, is_sql=True)
-#         self._inject_env = inject_env
-
-#     def model(
-#         self,
-#         *,
-#         module_path: Path,
-#         path: Path,
-#         defaults: t.Optional[t.Dict[str, t.Any]] = None,
-#         macros: t.Optional[MacroRegistry] = None,
-#         jinja_macros: t.Optional[JinjaMacroRegistry] = None,
-#         dialect: t.Optional[str] = None,
-#         time_column_format: str = c.DEFAULT_TIME_COLUMN_FORMAT,
-#         physical_schema_override: t.Optional[t.Dict[str, str]] = None,
-#         project: str = "",
-#         default_catalog: t.Optional[str] = None,
-#         variables: t.Optional[t.Dict[str, t.Any]] = None,
-#         infer_names: t.Optional[bool] = False,
-#     ) -> Model:
-#         """Get the model registered by this function."""
-#         env: t.Dict[str, t.Any] = {}
-#         entrypoint = self.func.__name__
-
-#         if not self.name and infer_names:
-#             self.name = get_model_name(Path(inspect.getfile(self.func)))
-
-#         if not self.name:
-#             raise ConfigError("Python model must have a name.")
-
-#         kind = self.kwargs.get("kind", None)
-#         if kind is not None:
-#             if isinstance(kind, _ModelKind):
-#                 logger.warning(
-#                     f"""Python model "{self.name}"'s `kind` argument was passed a SQLMesh `{type(kind).__name__}` object. This may result in unexpected behavior - provide a dictionary instead."""
-#                 )
-#             elif isinstance(kind, dict):
-#                 if "name" not in kind or not isinstance(
-#                     kind.get("name"), ModelKindName
-#                 ):
-#                     raise ConfigError(
-#                         f"""Python model "{self.name}"'s `kind` dictionary must contain a `name` key with a valid ModelKindName enum value."""
-#                     )
-
-#         self.build_env(env=env, name=entrypoint, path=module_path)
-
-#         common_kwargs: t.Dict[str, t.Any] = dict(
-#             defaults=defaults,
-#             path=path,
-#             time_column_format=time_column_format,
-#             python_env=serialize_env(env, path=module_path),
-#             physical_schema_override=physical_schema_override,
-#             project=project,
-#             default_catalog=default_catalog,
-#             variables=variables,
-#             **self.kwargs,
-#         )
-
-#         dialect = common_kwargs.pop("dialect", dialect)
-#         for key in ("pre_statements", "post_statements"):
-#             statements = common_kwargs.get(key)
-#             if statements:
-#                 common_kwargs[key] = [
-#                     parse_one(s, dialect=dialect) if isinstance(s, str) else s
-#                     for s in statements
-#                 ]
-
-#         if self.is_sql:
-#             query = MacroFunc(this=exp.Anonymous(this=entrypoint))
-#             if self.columns:
-#                 common_kwargs["columns"] = self.columns
-#             return create_sql_model(
-#                 self.name,
-#                 query,
-#                 module_path=module_path,
-#                 dialect=dialect,
-#                 **common_kwargs,
-#             )
-
-#         return create_python_model(
-#             self.name,
-#             entrypoint,
-#             module_path=module_path,
-#             macros=macros,
-#             jinja_macros=jinja_macros,
-#             columns=self.columns,
-#             dialect=dialect,
-#             **common_kwargs,
-#         )
-
-# python_env = _python_env(
-#     [*pre_statements, query, *post_statements, *audit_expressions],
-#     jinja_macro_references,
-#     module_path,
-#     macros or macro.get_registry(),
-#     variables=variables,
-#     used_variables=used_variables,
-#     path=path,
-# )
-
 
 class GeneratedModel:
     @classmethod
     def create(
