diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py index 896b92b134f6..c88150e0e722 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py @@ -52,6 +52,7 @@ from metadata.ingestion.api.models import Either, Entity from metadata.ingestion.api.steps import Source from metadata.ingestion.api.topology_runner import C, TopologyRunnerMixin +from metadata.ingestion.lineage.sql_lineage import get_column_fqn from metadata.ingestion.models.delete_entity import DeleteEntity from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.models.patch_request import PatchRequest @@ -480,6 +481,20 @@ def _get_add_lineage_request( return None + @staticmethod + def _get_data_model_column_fqn( + data_model_entity: DashboardDataModel, column: str + ) -> Optional[str]: + """ + Get fqn of column if exist in table entity + """ + if not data_model_entity: + return None + for tbl_column in data_model_entity.columns: + if tbl_column.displayName.lower() == column.lower(): + return tbl_column.fullyQualifiedName.__root__ + return None + def get_dashboard(self) -> Any: """ Method to iterate through dashboard lists filter dashboards & yield dashboard details @@ -615,3 +630,29 @@ def create_patch_request( ) patch_request.new_entity.dataModels = datamodel_entity_ref_list return patch_request + + def _get_column_lineage( + self, + om_table: Table, + data_model_entity: DashboardDataModel, + columns_list: List[str], + ) -> List[ColumnLineage]: + """ + Get the column lineage from the fields + """ + try: + column_lineage = [] + for field in columns_list or []: + from_column = get_column_fqn(table_entity=om_table, column=field) + to_column = self._get_data_model_column_fqn( + data_model_entity=data_model_entity, + column=field, + ) + if from_column and to_column: + column_lineage.append( + ColumnLineage(fromColumns=[from_column], toColumn=to_column) + ) + return column_lineage + except Exception as exc: + logger.debug(f"Error to get column lineage: {exc}") + logger.debug(traceback.format_exc()) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/qlikcloud/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/qlikcloud/metadata.py index 115346abb878..c894def33ab3 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/qlikcloud/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/qlikcloud/metadata.py @@ -35,6 +35,7 @@ from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.source.dashboard.qlikcloud.client import QlikCloudClient from metadata.ingestion.source.dashboard.qlikcloud.models import QlikApp, QlikAppList from metadata.ingestion.source.dashboard.qliksense.metadata import QliksenseSource from metadata.ingestion.source.dashboard.qliksense.models import QlikTable @@ -52,6 +53,7 @@ class QlikcloudSource(QliksenseSource): """ config: WorkflowSource + client: QlikCloudClient metadata_config: OpenMetadataConnection @classmethod @@ -182,16 +184,22 @@ def yield_dashboard_lineage_details( db_service_entity = self.metadata.get_by_name( entity=DatabaseService, fqn=db_service_name ) - for datamodel_id in self.context.get().dataModels or []: + for datamodel in self.data_models or []: try: - data_model_entity = self._get_datamodel(datamodel_id=datamodel_id) + data_model_entity = self._get_datamodel(datamodel_id=datamodel.id) if data_model_entity: om_table = self._get_database_table( db_service_entity, data_model_entity ) if om_table: + columns_list = [col.name for col in datamodel.fields] + column_lineage = self._get_column_lineage( + om_table, data_model_entity, columns_list + ) yield self._get_add_lineage_request( - to_entity=data_model_entity, from_entity=om_table + to_entity=data_model_entity, + from_entity=om_table, + column_lineage=column_lineage, ) except Exception as err: yield Either( diff --git a/ingestion/src/metadata/ingestion/source/dashboard/qliksense/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/qliksense/metadata.py index cb62402feba7..652390353e6f 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/qliksense/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/qliksense/metadata.py @@ -314,8 +314,14 @@ def yield_dashboard_lineage_details( db_service_entity, datamodel=datamodel ) if om_table: + columns_list = [col.name for col in datamodel.fields] + column_lineage = self._get_column_lineage( + om_table, data_model_entity, columns_list + ) yield self._get_add_lineage_request( - to_entity=data_model_entity, from_entity=om_table + to_entity=data_model_entity, + from_entity=om_table, + column_lineage=column_lineage, ) except Exception as err: yield Either( diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset/mixin.py b/ingestion/src/metadata/ingestion/source/dashboard/superset/mixin.py index 857f9a9742e1..48a053a64a76 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset/mixin.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset/mixin.py @@ -182,9 +182,21 @@ def yield_dashboard_lineage_details( fqn=datamodel_fqn, ) + datasource_json = self.client.fetch_datasource( + chart_json.datasource_id + ) + datasource_columns = self.get_column_info( + datasource_json.result.columns + ) + columns_list = [col.displayName for col in datasource_columns] + column_lineage = self._get_column_lineage( + from_entity, to_entity, columns_list + ) if from_entity and to_entity: yield self._get_add_lineage_request( - to_entity=to_entity, from_entity=from_entity + to_entity=to_entity, + from_entity=from_entity, + column_lineage=column_lineage, ) except Exception as exc: yield Either(