Skip to content

Commit

Permalink
Fix open-metadata#9281: reduce code. common method shifted in parent …
Browse files Browse the repository at this point in the history
…class
  • Loading branch information
harshsoni2024 committed Apr 18, 2024
1 parent ca2a033 commit e1d3659
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from metadata.ingestion.api.models import Either, Entity
from metadata.ingestion.api.steps import Source
from metadata.ingestion.api.topology_runner import C, TopologyRunnerMixin
from metadata.ingestion.lineage.sql_lineage import get_column_fqn
from metadata.ingestion.models.delete_entity import DeleteEntity
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.models.patch_request import PatchRequest
Expand Down Expand Up @@ -615,3 +616,28 @@ def create_patch_request(
)
patch_request.new_entity.dataModels = datamodel_entity_ref_list
return patch_request

def _get_column_lineage(
self,
om_table: Table,
data_model_entity: DashboardDataModel,
columns_list: List[str],
) -> List[ColumnLineage]:
"""
Get the column lineage from the fields
"""
try:
column_lineage = []
for field in columns_list or []:
from_column = get_column_fqn(table_entity=om_table, column=field)
to_column = self._get_data_model_column_fqn(
data_model_entity=data_model_entity,
column=field,
)
column_lineage.append(
ColumnLineage(fromColumns=[from_column], toColumn=to_column)
)
return column_lineage
except Exception as exc:
logger.debug(f"Error to get column lineage: {exc}")
logger.debug(traceback.format_exc())
49 changes: 23 additions & 26 deletions ingestion/src/metadata/ingestion/source/dashboard/superset/mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,9 @@
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityLineage import ColumnLineage
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.sql_lineage import get_column_fqn
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
from metadata.ingestion.source.dashboard.superset.models import (
Expand Down Expand Up @@ -148,29 +146,6 @@ def _get_charts_of_dashboard(
)
return []

def _get_column_lineage(
self, om_table: Table, data_model_entity: DashboardDataModel
) -> List[ColumnLineage]:
"""
Get the column lineage from database table columns
to data_model columns
"""
try:
column_lineage = []
for column in data_model_entity.columns or []:
from_column = get_column_fqn(
table_entity=om_table, column=column.displayName
)
to_column = column.fullyQualifiedName.__root__
if from_column and to_column:
column_lineage.append(
ColumnLineage(fromColumns=[from_column], toColumn=to_column)
)
return column_lineage
except Exception as exc:
logger.debug(f"Error to get column lineage: {exc}")
logger.debug(traceback.format_exc())

def yield_dashboard_lineage_details(
self,
dashboard_details: Union[FetchDashboard, DashboardResult],
Expand Down Expand Up @@ -206,8 +181,16 @@ def yield_dashboard_lineage_details(
entity=DashboardDataModel,
fqn=datamodel_fqn,
)

datasource_json = self.client.fetch_datasource(
chart_json.datasource_id
)
datasource_columns = self.get_column_info(
datasource_json.result.columns
)
columns_list = [col.displayName for col in datasource_columns]
column_lineage = self._get_column_lineage(
from_entity, to_entity
from_entity, to_entity, columns_list
)
if from_entity and to_entity:
yield self._get_add_lineage_request(
Expand Down Expand Up @@ -275,3 +258,17 @@ def get_column_info(
logger.debug(traceback.format_exc())
logger.warning(f"Error to yield datamodel column: {exc}")
return datasource_columns

@staticmethod
def _get_data_model_column_fqn(
data_model_entity: DashboardDataModel, column: str
) -> Optional[str]:
"""
Get fqn of column if exist in table entity
"""
if not data_model_entity:
return None
for tbl_column in data_model_entity.columns:
if column.lower() == tbl_column.displayName.lower():
return tbl_column.fullyQualifiedName.__root__
return None

0 comments on commit e1d3659

Please sign in to comment.