Skip to content
This repository has been archived by the owner on Nov 6, 2023. It is now read-only.

Commit

Permalink
feat(pg): lineage between schemas (#233)
Browse files Browse the repository at this point in the history
  • Loading branch information
Vixtir authored Oct 3, 2023
1 parent e8ef0b6 commit e84382f
Show file tree
Hide file tree
Showing 4 changed files with 259 additions and 125 deletions.
46 changes: 40 additions & 6 deletions odd_collector/adapters/postgresql/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
from odd_models.models import DataEntityList
from oddrn_generator import PostgresqlGenerator

from odd_collector.adapters.postgresql.models import Table
from odd_collector.domain.plugin import PostgreSQLPlugin

from .logger import logger
from .mappers.database import map_database
from .mappers.tables import map_tables
from .mappers.schemas import map_schema
from .mappers.tables import map_tables
from .repository import ConnectionParams, PostgreSQLRepository


Expand All @@ -29,29 +31,61 @@ def get_data_entity_list(self) -> DataEntityList:
with PostgreSQLRepository(
ConnectionParams.from_config(self.config), self.config.schemas_filter
) as repo:
table_entities: list[DataEntity] = []
schema_entities: list[DataEntity] = []

all_table_entities: dict[str, DataEntity] = {}

tables = repo.get_tables()
schemas = repo.get_schemas()

self.generator.set_oddrn_paths(**{"databases": self.config.database})

tables_by_schema = defaultdict(list)
for table in tables:
tables_by_schema[table.table_schema].append(table)

for schema in schemas:
tables_per_schema = tables_by_schema.get(schema.schema_name, [])
tables_per_schema: list[Table] = tables_by_schema.get(
schema.schema_name, []
)
table_entities_tmp = map_tables(self.generator, tables_per_schema)
schema_entities.append(
map_schema(self.generator, schema, table_entities_tmp)
map_schema(
self.generator, schema, list(table_entities_tmp.values())
)
)
table_entities.extend(table_entities_tmp)
all_table_entities |= table_entities_tmp

database_entity = map_database(
self.generator, self.config.database, schema_entities
)

create_lineage(tables, all_table_entities)

return DataEntityList(
data_source_oddrn=self.get_data_source_oddrn(),
items=[*table_entities, *schema_entities, database_entity],
items=[*all_table_entities.values(), *schema_entities, database_entity],
)


def create_lineage(tables: list[Table], data_entities: dict[str, DataEntity]) -> None:
    """Fill ``data_transformer.inputs`` of view entities from their dependencies.

    Only tables whose ``table_type`` is ``"v"`` or ``"m"`` are processed
    (presumably the PostgreSQL relkind codes for views and materialized
    views -- confirm against the repository query).  For each such table the
    entity stored in ``data_entities`` under ``as_dependency.uid`` is looked
    up, and the ``oddrn`` of every dependency that is also present in
    ``data_entities`` is appended to that entity's ``data_transformer.inputs``
    unless it is already listed.

    Args:
        tables: Tables to inspect; non-view tables are ignored.
        data_entities: Mapped entities keyed by dependency uid.  The entities
            are mutated in place.

    Returns:
        None.  A failure while processing one view is logged as a warning and
        does not stop the remaining views from being processed.
    """
    for view in tables:
        if view.table_type not in ("v", "m"):
            continue

        try:
            depending_entity = data_entities.get(view.as_dependency.uid)

            # The view may have no mapped entity.  Handle that explicitly
            # instead of relying on an AttributeError being swallowed by the
            # broad handler below.
            if depending_entity is None:
                logger.warning(
                    f"Lineage skipped for {view.table_name}: no mapped data entity"
                )
                continue

            transformer = depending_entity.data_transformer
            if transformer is None:
                continue

            for dependency in view.dependencies:
                dependency_entity = data_entities.get(dependency.uid)
                if not dependency_entity:
                    # Dependency is not among the mapped entities.
                    continue

                # Keep inputs free of duplicates.
                if dependency_entity.oddrn not in transformer.inputs:
                    transformer.inputs.append(dependency_entity.oddrn)
        except Exception as e:
            # Best effort: lineage problems for one view must not break the
            # whole collection run.
            logger.warning(f"Error creating lineage for {view.table_name} {e=}")
15 changes: 4 additions & 11 deletions odd_collector/adapters/postgresql/mappers/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ def map_table(generator: PostgresqlGenerator, table: Table):
def map_tables(
generator: PostgresqlGenerator,
tables: list[Table],
) -> list[DataEntity]:
data_entities: dict[str, tuple[Table, DataEntity]] = {}
) -> dict[str, DataEntity]:
data_entities: dict[str, DataEntity] = {}

for table in tables:
logger.debug(f"Map table {table.table_name} {table.table_type}")
Expand All @@ -49,13 +49,6 @@ def map_tables(
)
continue

data_entities[table.as_dependency.uid] = table, entity
data_entities[table.as_dependency.uid] = entity

for table, data_entity in data_entities.values():
for dependency in table.dependencies:
if dependency.uid in data_entities and data_entity.data_transformer:
data_entity.data_transformer.inputs.append(
data_entities[dependency.uid][1].oddrn
)

return [entity for _, entity in data_entities.values()]
return data_entities
2 changes: 1 addition & 1 deletion tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def find_by_type(
def find_by_name(data_entity_list: DataEntityList, name: str) -> DataEntity:
    """Return the first item of ``data_entity_list.items`` named ``name``.

    Names are compared case-insensitively: both the item's name and ``name``
    are lower-cased before comparison.

    Raises:
        StopIteration: If no item matches.
    """
    wanted = name.lower()
    return next(
        data_entity
        for data_entity in data_entity_list.items
        if data_entity.name.lower() == wanted
    )
Loading

0 comments on commit e84382f

Please sign in to comment.