From e84382f7e69894a54dabbbb2ff9a9c3ec1237881 Mon Sep 17 00:00:00 2001 From: Pavel Makarichev Date: Tue, 3 Oct 2023 16:12:34 +0300 Subject: [PATCH] feat(pg): lineage between schemas (#233) --- odd_collector/adapters/postgresql/adapter.py | 46 ++- .../adapters/postgresql/mappers/tables.py | 15 +- tests/integration/helpers.py | 2 +- tests/integration/test_postgres.py | 321 ++++++++++++------ 4 files changed, 259 insertions(+), 125 deletions(-) diff --git a/odd_collector/adapters/postgresql/adapter.py b/odd_collector/adapters/postgresql/adapter.py index da9dd7d5..bfd03c67 100644 --- a/odd_collector/adapters/postgresql/adapter.py +++ b/odd_collector/adapters/postgresql/adapter.py @@ -5,11 +5,13 @@ from odd_models.models import DataEntityList from oddrn_generator import PostgresqlGenerator +from odd_collector.adapters.postgresql.models import Table from odd_collector.domain.plugin import PostgreSQLPlugin +from .logger import logger from .mappers.database import map_database -from .mappers.tables import map_tables from .mappers.schemas import map_schema +from .mappers.tables import map_tables from .repository import ConnectionParams, PostgreSQLRepository @@ -29,10 +31,13 @@ def get_data_entity_list(self) -> DataEntityList: with PostgreSQLRepository( ConnectionParams.from_config(self.config), self.config.schemas_filter ) as repo: - table_entities: list[DataEntity] = [] schema_entities: list[DataEntity] = [] + + all_table_entities: dict[str, DataEntity] = {} + tables = repo.get_tables() schemas = repo.get_schemas() + self.generator.set_oddrn_paths(**{"databases": self.config.database}) tables_by_schema = defaultdict(list) @@ -40,18 +45,47 @@ def get_data_entity_list(self) -> DataEntityList: tables_by_schema[table.table_schema].append(table) for schema in schemas: - tables_per_schema = tables_by_schema.get(schema.schema_name, []) + tables_per_schema: list[Table] = tables_by_schema.get( + schema.schema_name, [] + ) table_entities_tmp = map_tables(self.generator, tables_per_schema) schema_entities.append( - map_schema(self.generator, schema, table_entities_tmp) + map_schema( + self.generator, schema, list(table_entities_tmp.values()) + ) ) - table_entities.extend(table_entities_tmp) + all_table_entities |= table_entities_tmp database_entity = map_database( self.generator, self.config.database, schema_entities ) + create_lineage(tables, all_table_entities) + return DataEntityList( data_source_oddrn=self.get_data_source_oddrn(), - items=[*table_entities, *schema_entities, database_entity], + items=[*all_table_entities.values(), *schema_entities, database_entity], ) + + +def create_lineage(tables: list[Table], data_entities: dict[str, DataEntity]) -> None: + views = [table for table in tables if table.table_type in ("v", "m")] + + for view in views: + try: + depending_entity = data_entities.get(view.as_dependency.uid) + + if depending_entity.data_transformer is None: + continue + + for dependency in view.dependencies: + if dependency_entity := data_entities.get(dependency.uid): + if ( + dependency_entity.oddrn + not in depending_entity.data_transformer.inputs + ): + depending_entity.data_transformer.inputs.append( + dependency_entity.oddrn + ) + except Exception as e: + logger.warning(f"Error creating lineage for {view.table_name} {e=}") diff --git a/odd_collector/adapters/postgresql/mappers/tables.py b/odd_collector/adapters/postgresql/mappers/tables.py index 7a74d381..49ea1533 100644 --- a/odd_collector/adapters/postgresql/mappers/tables.py +++ b/odd_collector/adapters/postgresql/mappers/tables.py @@ -33,8 +33,8 @@ def map_table(generator: PostgresqlGenerator, table: Table): def map_tables( generator: PostgresqlGenerator, tables: list[Table], -) -> list[DataEntity]: - data_entities: dict[str, tuple[Table, DataEntity]] = {} +) -> dict[str, DataEntity]: + data_entities: dict[str, DataEntity] = {} for table in tables: logger.debug(f"Map table {table.table_name} {table.table_type}") @@ -49,13 +49,6 @@ def map_tables( ) continue - data_entities[table.as_dependency.uid] = table, entity + data_entities[table.as_dependency.uid] = entity - for table, data_entity in data_entities.values(): - for dependency in table.dependencies: - if dependency.uid in data_entities and data_entity.data_transformer: - data_entity.data_transformer.inputs.append( - data_entities[dependency.uid][1].oddrn - ) - - return [entity for _, entity in data_entities.values()] + return data_entities diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index fcedc62c..98947d79 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -15,7 +15,7 @@ def find_by_type( def find_by_name(data_entity_list: DataEntityList, name: str) -> DataEntity: return next( filter( - lambda data_entity: data_entity.name == name, + lambda data_entity: data_entity.name.lower() == name.lower(), data_entity_list.items, ) ) diff --git a/tests/integration/test_postgres.py b/tests/integration/test_postgres.py index de4e91d9..8461d506 100644 --- a/tests/integration/test_postgres.py +++ b/tests/integration/test_postgres.py @@ -1,78 +1,83 @@ +import odd_models import pytest import sqlalchemy -from funcy import filter, first -from odd_models import DataEntity -from odd_models.models import DataEntityType from pydantic import SecretStr from testcontainers.postgres import PostgresContainer -from tests.integration.helpers import find_by_type - -create_enum = """CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy');""" -create_tables = """CREATE TABLE IF NOT EXISTS TABLE_ONE ( - code char(5) CONSTRAINT firstkey PRIMARY KEY, - title varchar(40) NOT NULL, - did integer NOT NULL, - date_prod date, - kind varchar(10), - len interval hour to minute, - status mood -)""" - -create_view = """ -CREATE VIEW VIEW_ONE AS -SELECT * -FROM TABLE_ONE -""" - -create_second_view = """ -CREATE VIEW VIEW_TWO AS -SELECT t.code, v.title -FROM TABLE_ONE t, VIEW_ONE v -""" - -create_schema = """ -CREATE SCHEMA IF NOT EXISTS other_schema; -""" - -create_view_three = """ -CREATE VIEW other_schema.VIEW_THREE AS -SELECT * -FROM TABLE_ONE -""" - -create_view_four = """ -CREATE VIEW VIEW_FOUR AS -SELECT v1.code, v3.title -FROM VIEW_ONE v1, other_schema.VIEW_THREE v3 -""" - -create_materialized_view = """ -CREATE MATERIALIZED VIEW materialized_view AS -SELECT * -FROM TABLE_ONE -""" - from odd_collector.adapters.postgresql.adapter import Adapter from odd_collector.domain.plugin import PostgreSQLPlugin - - -@pytest.mark.integration -def test_postgres(): +from tests.integration.helpers import find_by_name, find_by_type + + +def create_primary_schema(connection: sqlalchemy.engine.Connection): + create_enum = """CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy');""" + create_tables = """CREATE TABLE IF NOT EXISTS TABLE_ONE ( + code char(5) CONSTRAINT firstkey PRIMARY KEY, + title varchar(40) NOT NULL, + did integer NOT NULL, + date_prod date, + kind varchar(10), + len interval hour to minute, + status mood + )""" + + create_view = """ + CREATE VIEW VIEW_ONE AS + SELECT * + FROM TABLE_ONE + """ + + create_second_view = """ + CREATE VIEW VIEW_TWO AS + SELECT t.code, v.title + FROM TABLE_ONE t, VIEW_ONE v + """ + + connection.exec_driver_sql(create_enum) + connection.exec_driver_sql(create_tables) + connection.exec_driver_sql(create_view) + connection.exec_driver_sql(create_second_view) + + +def create_other_schema(connection: sqlalchemy.engine.Connection): + create_schema = """ + CREATE SCHEMA IF NOT EXISTS other_schema; + """ + + create_view_three = """ + CREATE VIEW other_schema.VIEW_THREE AS + SELECT * + FROM TABLE_ONE + """ + + create_view_four = """ + CREATE VIEW other_schema.VIEW_FOUR AS + SELECT v1.code, v3.title + FROM VIEW_ONE v1, other_schema.VIEW_THREE v3 + """ + + create_materialized_view = """ + CREATE MATERIALIZED VIEW other_schema.materialized_view AS + SELECT * + FROM TABLE_ONE + """ + + connection.exec_driver_sql(create_schema) + connection.exec_driver_sql(create_view_three) + connection.exec_driver_sql(create_view_four) + connection.exec_driver_sql(create_materialized_view) + + +@pytest.fixture(scope="module") +def data_entity_list() -> odd_models.DataEntityList: with PostgresContainer( "postgres:14.7", password="postgres", user="postgres" - ) as postgres: - engine = sqlalchemy.create_engine(postgres.get_connection_url()) + ) as container: + engine = sqlalchemy.create_engine(container.get_connection_url()) with engine.connect() as connection: - connection.exec_driver_sql(create_enum) - connection.exec_driver_sql(create_tables) - connection.exec_driver_sql(create_view) - connection.exec_driver_sql(create_second_view) - connection.exec_driver_sql(create_schema) - connection.exec_driver_sql(create_view_three) - connection.exec_driver_sql(create_view_four) - connection.exec_driver_sql(create_materialized_view) + create_primary_schema(connection) + create_other_schema(connection) config = PostgreSQLPlugin( type="postgresql", @@ -80,48 +85,150 @@ def test_postgres(): database="test", password=SecretStr("postgres"), user="postgres", - host=postgres.get_container_host_ip(), - port=postgres.get_exposed_port(5432), + host=container.get_container_host_ip(), + port=int(container.get_exposed_port(5432)), ) - data_entities = Adapter(config).get_data_entity_list() - database_services: list[DataEntity] = find_by_type( - data_entities, DataEntityType.DATABASE_SERVICE - ) - assert len(database_services) == 1 - database_service = database_services[0] - assert len(database_service.data_entity_group.entities_list) == 6 - - tables = find_by_type(data_entities, DataEntityType.TABLE) - assert len(tables) == 1 - table = tables[0] - assert len(table.dataset.field_list) == 7 - - views = find_by_type(data_entities, DataEntityType.VIEW) - assert len(views) == 5 - view_one = first(filter(lambda x: x.name == "view_one", views)) - assert len(view_one.dataset.field_list) == 7 - - view_two = first(filter(lambda x: x.name == "view_two", views)) - assert len(view_two.dataset.field_list) == 2 - depends = view_two.data_transformer.inputs - assert len(depends) == 2 - - assert table.oddrn in depends - assert view_one.oddrn in depends - - view_three = first(filter(lambda x: x.name == "view_three", views)) - depends = view_three.data_transformer.inputs - assert table.oddrn in depends - - view_four = first(filter(lambda x: x.name == "view_four", views)) - depends = view_four.data_transformer.inputs - assert view_one.oddrn in depends - assert view_three.oddrn in depends - - mat_view = first(filter(lambda x: x.name == "materialized_view", views)) - depends = mat_view.data_transformer.inputs - assert len(mat_view.dataset.field_list) == 7 - assert table.oddrn in depends - - assert data_entities.json() + return Adapter(config).get_data_entity_list() + + +def test_decoding_to_json(data_entity_list: odd_models.DataEntityList): + assert data_entity_list.json() + + +def test_data_base_service(data_entity_list: odd_models.DataEntityList): + database_services: list[odd_models.DataEntity] = find_by_type( + data_entity_list, odd_models.DataEntityType.DATABASE_SERVICE + ) + assert len(database_services) == 3 + + database = find_by_name(data_entity_list, "test") + public_schema = find_by_name(data_entity_list, "public") + other_schema = find_by_name(data_entity_list, "other_schema") + + assert database is not None + public_schema is not None + other_schema is not None + + assert database.data_entity_group is not None + assert len(database.data_entity_group.entities_list) == 2 + assert public_schema.oddrn in database.data_entity_group.entities_list + assert other_schema.oddrn in database.data_entity_group.entities_list + + +def test_public_schema(data_entity_list: odd_models.DataEntityList): + public_schema = find_by_name(data_entity_list, "public") + assert public_schema is not None + assert public_schema.data_entity_group is not None + + table_one = find_by_name(data_entity_list, "table_one") + view_one = find_by_name(data_entity_list, "view_one") + view_two = find_by_name(data_entity_list, "view_two") + + assert len(public_schema.data_entity_group.entities_list) == 3 + + for data_entity in [table_one, view_one, view_two]: + assert data_entity is not None + assert data_entity.oddrn in public_schema.data_entity_group.entities_list + + +def test_other_schema(data_entity_list: odd_models.DataEntityList): + other_schema = find_by_name(data_entity_list, "other_schema") + assert other_schema is not None + assert other_schema.data_entity_group is not None + assert len(other_schema.data_entity_group.entities_list) == 3 + + view_three = find_by_name(data_entity_list, "view_three") + view_four = find_by_name(data_entity_list, "view_four") + materialized_view = find_by_name(data_entity_list, "materialized_view") + + for data_entity in [view_three, view_four, materialized_view]: + assert data_entity is not None + assert data_entity.oddrn in other_schema.data_entity_group.entities_list + + +def test_table_one(data_entity_list: odd_models.DataEntityList): + table_one = find_by_name(data_entity_list, "table_one") + assert table_one is not None + assert table_one.dataset is not None + assert table_one.dataset.field_list is not None + assert len(table_one.dataset.field_list) == 7 + + +def test_view_one(data_entity_list: odd_models.DataEntityList): + view_one = find_by_name(data_entity_list, "view_one") + assert view_one is not None + assert view_one.dataset is not None + assert view_one.dataset.field_list is not None + assert len(view_one.dataset.field_list) == 7 + + assert view_one.data_transformer is not None + assert view_one.data_transformer.inputs is not None + + table_one = find_by_name(data_entity_list, "table_one") + assert table_one.oddrn in view_one.data_transformer.inputs + + +def test_view_two(data_entity_list: odd_models.DataEntityList): + view_two = find_by_name(data_entity_list, "view_two") + assert view_two is not None + assert view_two.dataset is not None + assert view_two.dataset.field_list is not None + assert len(view_two.dataset.field_list) == 2 + + assert view_two.data_transformer is not None + assert view_two.data_transformer.inputs is not None + assert len(view_two.data_transformer.inputs) == 2 + + table_one = find_by_name(data_entity_list, "table_one") + assert table_one.oddrn in view_two.data_transformer.inputs + view_one = find_by_name(data_entity_list, "view_one") + assert view_one.oddrn in view_two.data_transformer.inputs + + +def test_view_three(data_entity_list: odd_models.DataEntityList): + view_three = find_by_name(data_entity_list, "view_three") + assert view_three is not None + assert view_three.dataset is not None + assert view_three.dataset.field_list is not None + assert len(view_three.dataset.field_list) == 7 + + assert view_three.data_transformer is not None + assert view_three.data_transformer.inputs is not None + assert len(view_three.data_transformer.inputs) == 1 + + table_one = find_by_name(data_entity_list, "table_one") + assert table_one.oddrn in view_three.data_transformer.inputs + + +def test_view_four(data_entity_list: odd_models.DataEntityList): + view_four = find_by_name(data_entity_list, "view_four") + assert view_four is not None + assert view_four.dataset is not None + assert view_four.dataset.field_list is not None + assert len(view_four.dataset.field_list) == 2 + + assert view_four.data_transformer is not None + assert view_four.data_transformer.inputs is not None + assert len(view_four.data_transformer.inputs) == 2 + + view_one = find_by_name(data_entity_list, "view_one") + assert view_one.oddrn in view_four.data_transformer.inputs + + view_three = find_by_name(data_entity_list, "view_three") + assert view_three.oddrn in view_four.data_transformer.inputs + + +def test_materialized_view(data_entity_list: odd_models.DataEntityList): + materialized_view = find_by_name(data_entity_list, "materialized_view") + assert materialized_view is not None + assert materialized_view.dataset is not None + assert materialized_view.dataset.field_list is not None + assert len(materialized_view.dataset.field_list) == 7 + + assert materialized_view.data_transformer is not None + assert materialized_view.data_transformer.inputs is not None + assert len(materialized_view.data_transformer.inputs) == 1 + + table_one = find_by_name(data_entity_list, "table_one") + assert table_one.oddrn in materialized_view.data_transformer.inputs