Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(build system): consolidate YAML/SQL serialization #525

Open
wants to merge 37 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
b23c0f5
feat: add ibis-yaml compiler
hussainsultan Feb 8, 2025
3e05b85
feat: add letsql RemoteTable op
hussainsultan Feb 8, 2025
9885629
wip: SourceStorage doesnt have profile_name in con
hussainsultan Feb 8, 2025
a4fb8c8
feat: add SourceStorage cache serde
hussainsultan Feb 8, 2025
dc630ec
feat: add BuildManager
hussainsultan Feb 15, 2025
7743050
refactor: IbisYamlCompiler with state
hussainsultan Feb 15, 2025
ae7a275
feat: add sql plan generation for yaml serialization
hussainsultan Feb 16, 2025
27b3be6
feat: add seperate schema definitions in yaml
hussainsultan Feb 16, 2025
ee4c12e
feat: add RowNumber support
hussainsultan Feb 16, 2025
f29a93e
feat: add schema for expr output in yaml
hussainsultan Feb 16, 2025
dabd66e
refactor: refactor user facing class -> BuildManager
hussainsultan Feb 17, 2025
6c12bf4
chore: change imports for rebase
hussainsultan Feb 17, 2025
dc34584
feat: add Profiles
dlovell Feb 16, 2025
686548c
feat: use Profile in YamlExpressionCompiler
hussainsultan Feb 17, 2025
7772102
feat: add Read support for yaml roundtrip
hussainsultan Feb 18, 2025
050d818
chore: unmark xfail tests for deferred reads
hussainsultan Feb 19, 2025
d976247
wip
hussainsultan Feb 19, 2025
b136897
feat: add Read op to sql serialization
hussainsultan Feb 19, 2025
f3614c8
refactor: raise error when UDFs are not of proper input type
hussainsultan Feb 19, 2025
05d8488
refactor: remove memtable parquet serialization
hussainsultan Feb 19, 2025
b2ac060
feat: split sql in its own files
hussainsultan Feb 20, 2025
b4cfe5b
feat: add walk_nodes, find_all_sources to graph_utils
dlovell Feb 20, 2025
a2f3dcb
wip: handle multiple Reads in RemoteTable
hussainsultan Feb 20, 2025
36e5d28
chore: rebase main with xorq name change
hussainsultan Feb 20, 2025
2d5e6c8
wip: split out sql files and use walk_nodes to find opaque ops
hussainsultan Feb 20, 2025
c5f97c6
feat: split deferred reads into its own yaml
hussainsultan Feb 21, 2025
4a5685b
fix: Read op should have frozen read_kwargs
hussainsultan Feb 21, 2025
bdd1582
refactor: refactor YamlExpressionCompiler so it does not have any state
hussainsultan Feb 21, 2025
359d239
feat: add BuildConfig
hussainsultan Feb 21, 2025
a66fb59
fix: handle Namespace in tables
hussainsultan Feb 22, 2025
bdb4067
fix: remove checks if values present
hussainsultan Feb 22, 2025
981eadd
fix: kwargs_tuple in profile should be a key,value pair in yaml
hussainsultan Feb 22, 2025
44ec5ed
refactor: pull out udf.py and make common module
hussainsultan Feb 22, 2025
e8cab22
feat: add metadata.json
hussainsultan Feb 22, 2025
f218d8b
refactor: TranslationContext in translate
hussainsultan Feb 22, 2025
5ba6857
fix(tests): expr_hash is not stable due to pins
hussainsultan Feb 23, 2025
6972299
fix: make compiler test dynamic
hussainsultan Feb 23, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions examples/yaml_roundrip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import xorq as xo
from xorq.common.utils.defer_utils import deferred_read_parquet
from xorq.expr.relations import into_backend
from xorq.ibis_yaml.compiler import BuildManager


pg = xo.postgres.connect_examples()
db = xo.duckdb.connect()

batting = pg.table("batting")

backend = xo.duckdb.connect()
awards_players = deferred_read_parquet(
backend,
xo.config.options.pins.get_path("awards_players"),
table_name="award_players",
)
left = batting.filter(batting.yearID == 2015)
right = awards_players.filter(awards_players.lgID == "NL").drop("yearID", "lgID")
expr = left.join(
into_backend(right, pg, "pg-filtered-table"), ["playerID"], how="semi"
)[["yearID", "stint"]]

build_manager = BuildManager("builds")
expr_hash = build_manager.compile_expr(expr)

roundtrip_expr = build_manager.load_expr(expr_hash)
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ dependencies = [
"sqlglot==25.20.2",
"toolz>=0.11",
"typing-extensions>=4.3.0",
"hypothesis>=6.124.9",
"pyyaml>=6.0.2",
"cloudpickle>=3.1.1",
]
requires-python = ">=3.10"
authors = [
Expand Down
52 changes: 52 additions & 0 deletions python/letsql/common/utils/graph_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import xorq.expr.relations as rel


def walk_nodes(node_types, expr):
def process_node(op):
match op:
case rel.RemoteTable():
yield op
yield from walk_nodes(
node_types,
op.remote_expr,
)
case rel.CachedNode():
yield op
yield from walk_nodes(
node_types,
op.parent,
)
case _:
yield from op.find(node_types)

def inner(rest, seen):
if not rest:
return seen
op = rest.pop()
seen.add(op)
new = process_node(op)
rest.update(set(new).difference(seen))
return inner(rest, seen)

rest = process_node(expr.op())
return inner(set(rest), set())


def find_all_sources(expr):
import xorq.vendor.ibis.expr.operations as ops

node_types = (
ops.DatabaseTable,
ops.SQLQueryResult,
rel.CachedNode,
rel.Read,
rel.RemoteTable,
# ExprScalarUDF has an expr we need to get to
# FlightOperator has a dynamically generated connection: it should be passed a Profile instead
)
nodes = walk_nodes(node_types, expr)
sources = tuple(
source
for (source, _) in set((node.source, node.source._profile) for node in nodes)
)
return sources
3 changes: 2 additions & 1 deletion python/xorq/common/utils/graph_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ def inner(rest, seen):
rest.update(set(new).difference(seen))
return inner(rest, seen)

rest = process_node(expr.op())
initial_op = expr.op() if hasattr(expr, "op") else expr
rest = process_node(initial_op)
return inner(set(rest), set())


Expand Down
Empty file.
77 changes: 77 additions & 0 deletions python/xorq/ibis_yaml/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import functools
from typing import Any

import attr

import xorq.vendor.ibis.expr.datatypes as dt
import xorq.vendor.ibis.expr.types as ir
from xorq.ibis_yaml.utils import freeze
from xorq.vendor.ibis.common.collections import FrozenOrderedDict


FROM_YAML_HANDLERS: dict[str, Any] = {}


class SchemaRegistry:
def __init__(self):
self.schemas = {}
self.counter = 0

def register_schema(self, schema):
frozen_schema = freeze(
{name: _translate_type(dtype) for name, dtype in schema.items()}
)

for schema_id, existing_schema in self.schemas.items():
if existing_schema == frozen_schema:
return schema_id

schema_id = f"schema_{self.counter}"
self.schemas[schema_id] = frozen_schema
self.counter += 1
return schema_id

def _register_expr_schema(self, expr: ir.Expr) -> str:
if hasattr(expr, "schema"):
schema = expr.schema()
return self.register_schema(schema)
return None


@attr.s(frozen=True)
class TranslationContext:
schema_registry: SchemaRegistry = attr.ib(factory=SchemaRegistry)
profiles: FrozenOrderedDict = attr.ib(factory=FrozenOrderedDict)
definitions: FrozenOrderedDict = attr.ib(factory=lambda: freeze({"schemas": {}}))

def update_definitions(self, new_definitions: FrozenOrderedDict):
return attr.evolve(self, definitions=new_definitions)


def register_from_yaml_handler(*op_names: str):
def decorator(func):
for name in op_names:
FROM_YAML_HANDLERS[name] = func
return func

return decorator


@functools.cache
@functools.singledispatch
def translate_from_yaml(yaml_dict: dict, context: TranslationContext) -> Any:
op_type = yaml_dict["op"]
if op_type not in FROM_YAML_HANDLERS:
raise NotImplementedError(f"No handler for operation {op_type}")
return FROM_YAML_HANDLERS[op_type](yaml_dict, context)


@functools.cache
@functools.singledispatch
def translate_to_yaml(op: Any, context: TranslationContext) -> dict:
raise NotImplementedError(f"No translation rule for {type(op)}")


@functools.singledispatch
def _translate_type(dtype: dt.DataType) -> dict:
return freeze({"name": type(dtype).__name__, "nullable": dtype.nullable})
Loading
Loading