Skip to content

Commit

Permalink
Fix open-metadata#9281: fix duplication of get_datamodel_column_fqn
Browse files Browse the repository at this point in the history
  • Loading branch information
harshsoni2024 committed Apr 18, 2024
1 parent e1d3659 commit 5a402e3
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,20 @@ def _get_add_lineage_request(

return None

@staticmethod
def _get_data_model_column_fqn(
data_model_entity: DashboardDataModel, column: str
) -> Optional[str]:
"""
Get fqn of column if exist in table entity
"""
if not data_model_entity:
return None
for tbl_column in data_model_entity.columns:
if tbl_column.displayName.lower() == column.lower():
return tbl_column.fullyQualifiedName.__root__
return None

def get_dashboard(self) -> Any:
"""
Method to iterate through dashboard lists filter dashboards & yield dashboard details
Expand Down Expand Up @@ -634,9 +648,10 @@ def _get_column_lineage(
data_model_entity=data_model_entity,
column=field,
)
column_lineage.append(
ColumnLineage(fromColumns=[from_column], toColumn=to_column)
)
if from_column and to_column:
column_lineage.append(
ColumnLineage(fromColumns=[from_column], toColumn=to_column)
)
return column_lineage
except Exception as exc:
logger.debug(f"Error to get column lineage: {exc}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,16 +184,17 @@ def yield_dashboard_lineage_details(
db_service_entity = self.metadata.get_by_name(
entity=DatabaseService, fqn=db_service_name
)
for data_model in self.data_models or []:
for datamodel in self.data_models or []:
try:
data_model_entity = self._get_datamodel(datamodel_id=data_model.id)
data_model_entity = self._get_datamodel(datamodel_id=datamodel.id)
if data_model_entity:
om_table = self._get_database_table(
db_service_entity, data_model_entity
)
if om_table:
columns_list = [col.name for col in datamodel.fields]
column_lineage = self._get_column_lineage(
data_model, om_table, data_model_entity
om_table, data_model_entity, columns_list
)
yield self._get_add_lineage_request(
to_entity=data_model_entity,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityLineage import ColumnLineage
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.sql_lineage import get_column_fqn
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
from metadata.ingestion.source.dashboard.qliksense.client import QlikSenseClient
Expand Down Expand Up @@ -247,45 +245,6 @@ def yield_datamodel(self, _: QlikDashboard) -> Iterable[Either[DashboardDataMode
)
)

@staticmethod
def _get_data_model_column_fqn(
data_model_entity: DashboardDataModel, column: str
) -> Optional[str]:
"""
Get fqn of column if exist in table entity
"""
if not data_model_entity:
return None
for tbl_column in data_model_entity.columns:
if tbl_column.displayName.lower() == column.lower():
return tbl_column.fullyQualifiedName.__root__
return None

def _get_column_lineage(
self,
data_model: QlikTable,
om_table: Table,
data_model_entity: DashboardDataModel,
) -> List[ColumnLineage]:
"""
Get the column lineage from the fields
"""
try:
column_lineage = []
for field in data_model.fields or []:
from_column = get_column_fqn(table_entity=om_table, column=field.name)
to_column = self._get_data_model_column_fqn(
data_model_entity=data_model_entity,
column=field.name,
)
column_lineage.append(
ColumnLineage(fromColumns=[from_column], toColumn=to_column)
)
return column_lineage
except Exception as exc:
logger.debug(f"Error to get column lineage: {exc}")
logger.debug(traceback.format_exc())

def _get_datamodel(self, datamodel_id: str):
datamodel_fqn = fqn.build(
self.metadata,
Expand Down Expand Up @@ -355,8 +314,9 @@ def yield_dashboard_lineage_details(
db_service_entity, datamodel=datamodel
)
if om_table:
columns_list = [col.name for col in datamodel.fields]
column_lineage = self._get_column_lineage(
datamodel, om_table, data_model_entity
om_table, data_model_entity, columns_list
)
yield self._get_add_lineage_request(
to_entity=data_model_entity,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,17 +258,3 @@ def get_column_info(
logger.debug(traceback.format_exc())
logger.warning(f"Error to yield datamodel column: {exc}")
return datasource_columns

@staticmethod
def _get_data_model_column_fqn(
data_model_entity: DashboardDataModel, column: str
) -> Optional[str]:
"""
Get fqn of column if exist in table entity
"""
if not data_model_entity:
return None
for tbl_column in data_model_entity.columns:
if column.lower() == tbl_column.displayName.lower():
return tbl_column.fullyQualifiedName.__root__
return None

0 comments on commit 5a402e3

Please sign in to comment.