diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py
index 7e325c2ef92e..239d32218219 100644
--- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py
+++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py
@@ -174,6 +174,7 @@ def yield_database(
                 service=self.context.database_service,
                 description=self.get_database_description(database_name),
                 sourceUrl=self.get_source_url(database_name=database_name),
+                tags=self.get_database_tag_labels(database_name=database_name),
             )
         )
 
diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py
index 79442a4647e7..f94a02e54caf 100644
--- a/ingestion/src/metadata/ingestion/source/database/database_service.py
+++ b/ingestion/src/metadata/ingestion/source/database/database_service.py
@@ -113,6 +113,13 @@ class DatabaseServiceTopology(ServiceTopology):
     database = TopologyNode(
         producer="get_database_names",
         stages=[
+            NodeStage(
+                type_=OMetaTagAndClassification,
+                context="tags",
+                processor="yield_database_tag_details",
+                nullable=True,
+                store_all_in_context=True,
+            ),
             NodeStage(
                 type_=Database,
                 context="database",
@@ -120,7 +127,7 @@ class DatabaseServiceTopology(ServiceTopology):
                 consumer=["database_service"],
                 cache_entities=True,
                 use_cache=True,
-            )
+            ),
         ],
         children=["databaseSchema"],
     )
@@ -273,6 +280,13 @@ def yield_tag(
         From topology. To be run for each schema
         """
 
+    def yield_database_tag(
+        self, database_name: str
+    ) -> Iterable[Either[OMetaTagAndClassification]]:
+        """
+        From topology. To be run for each database
+        """
+
     def yield_table_tags(
         self, table_name_and_type: Tuple[str, TableType]
     ) -> Iterable[Either[OMetaTagAndClassification]]:
@@ -298,6 +312,15 @@ def yield_database_schema_tag_details(
         if self.source_config.includeTags:
             yield from self.yield_tag(schema_name) or []
 
+    def yield_database_tag_details(
+        self, database_name: str
+    ) -> Iterable[Either[OMetaTagAndClassification]]:
+        """
+        From topology. To be run for each database
+        """
+        if self.source_config.includeTags:
+            yield from self.yield_database_tag(database_name) or []
+
     @abstractmethod
     def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
         """
@@ -364,6 +387,20 @@ def get_tag_by_fqn(self, entity_fqn: str) -> Optional[List[TagLabel]]:
                 tag_labels.append(tag_label)
         return tag_labels or None
 
+    def get_database_tag_labels(self, database_name: str) -> Optional[List[TagLabel]]:
+        """
+        Method to get database tags
+        This will only get executed if the tags context
+        is properly informed
+        """
+        database_fqn = fqn.build(
+            self.metadata,
+            entity_type=Database,
+            service_name=self.context.database_service,
+            database_name=database_name,
+        )
+        return self.get_tag_by_fqn(entity_fqn=database_fqn)
+
     def get_schema_tag_labels(self, schema_name: str) -> Optional[List[TagLabel]]:
         """
         Method to get schema tags
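Reviewer note: the ordering of the stages in the `database` topology node above is load-bearing — stages run sequentially, so the new tag stage must populate the shared context before `yield_database` in `common_db_source.py` reads it via `get_database_tag_labels`. A minimal, self-contained sketch of that flow (simplified stand-ins, not the real OpenMetadata topology runner):

```python
from typing import Iterable, List


class FakeContext:
    """Simplified stand-in for the topology context shared across stages."""

    def __init__(self) -> None:
        self.tags: List[str] = []


def yield_database_tag_details(database_name: str) -> Iterable[str]:
    # Hypothetical source-side fetch; real connectors query their backend.
    yield f"{database_name}.Tier.Gold"


def yield_database(database_name: str, context: FakeContext) -> str:
    # By the time this stage runs, the tag stage has already filled
    # context.tags, mirroring tags=self.get_database_tag_labels(...) above.
    return f"CreateDatabaseRequest(name={database_name!r}, tags={context.tags})"


context = FakeContext()
for database_name in ["demo_catalog"]:
    # Stage 1: tags (store_all_in_context=True appends rather than replaces)
    context.tags.extend(yield_database_tag_details(database_name))
    # Stage 2: the Database entity request, built with the tags now in context
    print(yield_database(database_name, context))
```

Swapping the two stages in the sketch would build the request with an empty tag list, which is exactly the failure mode the stage ordering in `DatabaseServiceTopology` avoids.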
diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/metadata.py b/ingestion/src/metadata/ingestion/source/database/databricks/metadata.py
index 1e9102ad9719..97efc271e792 100644
--- a/ingestion/src/metadata/ingestion/source/database/databricks/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/database/databricks/metadata.py
@@ -13,7 +13,7 @@
 import re
 import traceback
 from copy import deepcopy
-from typing import Iterable, Optional
+from typing import Iterable, Optional, Tuple
 
 from pyhive.sqlalchemy_hive import _type_map
 from sqlalchemy import types, util
@@ -24,20 +24,31 @@
 from sqlalchemy_databricks._dialect import DatabricksDialect
 
 from metadata.generated.schema.entity.data.database import Database
+from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
+from metadata.generated.schema.entity.data.table import Column, Table, TableType
 from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
     DatabricksConnection,
 )
+from metadata.generated.schema.entity.services.ingestionPipelines.status import (
+    StackTraceError,
+)
 from metadata.generated.schema.metadataIngestion.workflow import (
     Source as WorkflowSource,
 )
+from metadata.ingestion.api.models import Either
 from metadata.ingestion.api.steps import InvalidSourceException
+from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.ingestion.source.connections import get_connection
 from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type
 from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
 from metadata.ingestion.source.database.databricks.queries import (
     DATABRICKS_GET_CATALOGS,
+    DATABRICKS_GET_CATALOGS_TAGS,
+    DATABRICKS_GET_COLUMN_TAGS,
+    DATABRICKS_GET_SCHEMA_TAGS,
     DATABRICKS_GET_TABLE_COMMENTS,
+    DATABRICKS_GET_TABLE_TAGS,
     DATABRICKS_VIEW_DEFINITIONS,
 )
 from metadata.ingestion.source.database.multi_db_source import MultiDBSource
@@ -49,9 +60,13 @@
     get_all_view_definitions,
     get_view_definition_wrapper,
 )
+from metadata.utils.tag_utils import get_ometa_tag_and_classification
 
 logger = ingestion_logger()
 
+DATABRICKS_TAG = "DATABRICK TAG"
+DATABRICKS_TAG_CLASSIFICATION = "DATABRICK TAG CLASSIFICATION"
+
 
 class STRUCT(String):
     # This class is added to support STRUCT datatype
@@ -284,6 +299,7 @@ def set_inspector(self, database_name: str) -> None:
         new_service_connection.catalog = database_name
         self.engine = get_connection(new_service_connection)
         self.inspector = inspect(self.engine)
+        self._connection = None  # Lazy init as well
 
     def get_configured_database(self) -> Optional[str]:
         return self.service_connection.catalog
@@ -337,3 +353,133 @@ def get_raw_database_schema_names(self) -> Iterable[str]:
             is_old_version=self.is_older_version,
         ):
             yield schema_name
+
+    def yield_database_tag(
+        self, database_name: str
+    ) -> Iterable[Either[OMetaTagAndClassification]]:
+        """
+        Method to yield database tags
+        """
+        try:
+            tags = self.connection.execute(
+                DATABRICKS_GET_CATALOGS_TAGS.format(database_name=database_name)
+            )
+            for tag in tags:
+                yield from get_ometa_tag_and_classification(
+                    tag_fqn=fqn.build(
+                        self.metadata,
+                        Database,
+                        service_name=self.context.database_service,
+                        database_name=database_name,
+                    ),
+                    tags=[tag.tag_value],
+                    classification_name=tag.tag_name,
+                    tag_description=DATABRICKS_TAG,
+                    classification_description=DATABRICKS_TAG_CLASSIFICATION,
+                )
+
+        except Exception as exc:
+            yield Either(
+                left=StackTraceError(
+                    name="Tags and Classifications",
+                    error=f"Failed to fetch database tags due to [{exc}]",
+                    stackTrace=traceback.format_exc(),
+                )
+            )
+
+    def yield_tag(
+        self, schema_name: str
+    ) -> Iterable[Either[OMetaTagAndClassification]]:
+        """
+        Method to yield schema tags
+        """
+        try:
+            tags = self.connection.execute(
+                DATABRICKS_GET_SCHEMA_TAGS.format(
+                    database_name=self.context.database, schema_name=schema_name
+                )
+            )
+            for tag in tags:
+                yield from get_ometa_tag_and_classification(
+                    tag_fqn=fqn.build(
+                        self.metadata,
+                        DatabaseSchema,
+                        service_name=self.context.database_service,
+                        database_name=self.context.database,
+                        schema_name=schema_name,
+                    ),
+                    tags=[tag.tag_value],
+                    classification_name=tag.tag_name,
+                    tag_description=DATABRICKS_TAG,
+                    classification_description=DATABRICKS_TAG_CLASSIFICATION,
+                )
+
+        except Exception as exc:
+            yield Either(
+                left=StackTraceError(
+                    name="Tags and Classifications",
+                    error=f"Failed to fetch schema tags due to [{exc}]",
+                    stackTrace=traceback.format_exc(),
+                )
+            )
+
+    def yield_table_tags(
+        self, table_name_and_type: Tuple[str, TableType]
+    ) -> Iterable[Either[OMetaTagAndClassification]]:
+        table_name, _ = table_name_and_type
+        try:
+            table_tags = self.connection.execute(
+                DATABRICKS_GET_TABLE_TAGS.format(
+                    database_name=self.context.database,
+                    schema_name=self.context.database_schema,
+                    table_name=table_name,
+                )
+            )
+            for tag in table_tags:
+                yield from get_ometa_tag_and_classification(
+                    tag_fqn=fqn.build(
+                        self.metadata,
+                        Table,
+                        service_name=self.context.database_service,
+                        database_name=self.context.database,
+                        schema_name=self.context.database_schema,
+                        table_name=table_name,
+                    ),
+                    tags=[tag.tag_value],
+                    classification_name=tag.tag_name,
+                    tag_description=DATABRICKS_TAG,
+                    classification_description=DATABRICKS_TAG_CLASSIFICATION,
+                )
+
+            column_tags = self.connection.execute(
+                DATABRICKS_GET_COLUMN_TAGS.format(
+                    database_name=self.context.database,
+                    schema_name=self.context.database_schema,
+                    table_name=table_name,
+                )
+            )
+            for tag in column_tags:
+                yield from get_ometa_tag_and_classification(
+                    tag_fqn=fqn.build(
+                        self.metadata,
+                        Column,
+                        service_name=self.context.database_service,
+                        database_name=self.context.database,
+                        schema_name=self.context.database_schema,
+                        table_name=table_name,
+                        column_name=tag.column_name,
+                    ),
+                    tags=[tag.tag_value],
+                    classification_name=tag.tag_name,
+                    tag_description=DATABRICKS_TAG,
+                    classification_description=DATABRICKS_TAG_CLASSIFICATION,
+                )
+
+        except Exception as exc:
+            yield Either(
+                left=StackTraceError(
+                    name="Tags and Classifications",
+                    error=f"Failed to fetch table/column tags due to [{exc}]",
+                    stackTrace=traceback.format_exc(),
+                )
+            )
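Reviewer note: every new method above funnels failures into `Either(left=StackTraceError(...))` instead of raising, so one failing `information_schema` query never aborts the rest of the ingestion run. A hedged, self-contained sketch of that pattern with simplified stand-in types (the real `Either` and `StackTraceError` are OpenMetadata models, not these dataclasses):

```python
import traceback
from dataclasses import dataclass
from typing import Any, Callable, Iterable, Iterator, Optional


@dataclass
class Either:
    """Simplified stand-in: right carries the payload, left the error."""

    left: Optional[str] = None
    right: Optional[Any] = None


def yield_tags_safely(fetch: Callable[[], Iterator[Any]]) -> Iterable[Either]:
    try:
        for tag in fetch():
            yield Either(right=tag)
    except Exception as exc:  # broad by design: isolate per-source failures
        yield Either(
            left=f"Failed to fetch tags due to [{exc}]\n{traceback.format_exc()}"
        )


def flaky_fetch() -> Iterator[Any]:
    # Hypothetical query result: one good row, then a permissions failure.
    yield ("Tier", "Gold")
    raise RuntimeError("PERMISSION_DENIED: information_schema.table_tags")


for either in yield_tags_safely(flaky_fetch):
    print("ok :" if either.right else "err:",
          either.right or either.left.splitlines()[0])
```

Note how the row yielded before the exception still reaches the consumer — the same reason the connector yields tag results eagerly inside the `try` block.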
diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/queries.py b/ingestion/src/metadata/ingestion/source/database/databricks/queries.py
index d409bdeb54ee..17e37f67a0b0 100644
--- a/ingestion/src/metadata/ingestion/source/database/databricks/queries.py
+++ b/ingestion/src/metadata/ingestion/source/database/databricks/queries.py
@@ -28,3 +28,31 @@
 DATABRICKS_GET_TABLE_COMMENTS = "DESCRIBE TABLE EXTENDED {schema_name}.{table_name}"
 
 DATABRICKS_GET_CATALOGS = "SHOW CATALOGS"
+
+DATABRICKS_GET_CATALOGS_TAGS = textwrap.dedent(
+    """SELECT * FROM {database_name}.information_schema.catalog_tags;"""
+)
+
+DATABRICKS_GET_SCHEMA_TAGS = textwrap.dedent(
+    """
+    SELECT
+        *
+    FROM {database_name}.information_schema.schema_tags
+    WHERE schema_name = '{schema_name}';"""
+)
+
+DATABRICKS_GET_TABLE_TAGS = textwrap.dedent(
+    """
+    SELECT
+        *
+    FROM {database_name}.information_schema.table_tags
+    WHERE schema_name = '{schema_name}' AND table_name = '{table_name}';"""
+)
+
+DATABRICKS_GET_COLUMN_TAGS = textwrap.dedent(
+    """
+    SELECT
+        *
+    FROM {database_name}.information_schema.column_tags
+    WHERE schema_name = '{schema_name}' AND table_name = '{table_name}';"""
+)
diff --git a/ingestion/tests/unit/topology/database/test_hive.py b/ingestion/tests/unit/topology/database/test_hive.py
index 8ed0ebc3521b..c0adb11afed3 100644
--- a/ingestion/tests/unit/topology/database/test_hive.py
+++ b/ingestion/tests/unit/topology/database/test_hive.py
@@ -347,7 +347,8 @@ def __init__(
 
     def test_yield_database(self):
         assert EXPECTED_DATABASE == [
-            either.right for either in self.hive.yield_database(MOCK_DATABASE.name)
+            either.right
+            for either in self.hive.yield_database(MOCK_DATABASE.name.__root__)
         ]
 
         self.hive.context.__dict__[
diff --git a/ingestion/tests/unit/topology/database/test_mssql.py b/ingestion/tests/unit/topology/database/test_mssql.py
index 0dd8416d95b3..80191098191c 100644
--- a/ingestion/tests/unit/topology/database/test_mssql.py
+++ b/ingestion/tests/unit/topology/database/test_mssql.py
@@ -321,7 +321,8 @@ def __init__(
 
     def test_yield_database(self):
         assert EXPECTED_DATABASE == [
-            either.right for either in self.mssql.yield_database(MOCK_DATABASE.name)
+            either.right
+            for either in self.mssql.yield_database(MOCK_DATABASE.name.__root__)
         ]
 
         self.mssql.context.__dict__[
diff --git a/ingestion/tests/unit/topology/database/test_oracle.py b/ingestion/tests/unit/topology/database/test_oracle.py
index e780610aa509..ced9e87f28de 100644
--- a/ingestion/tests/unit/topology/database/test_oracle.py
+++ b/ingestion/tests/unit/topology/database/test_oracle.py
@@ -201,7 +201,8 @@ def __init__(
 
     def test_yield_database(self):
         assert EXPECTED_DATABASE == [
-            either.right for either in self.oracle.yield_database(MOCK_DATABASE.name)
+            either.right
+            for either in self.oracle.yield_database(MOCK_DATABASE.name.__root__)
        ]
 
         self.oracle.context.__dict__["database"] = MOCK_DATABASE.name.__root__
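Reviewer note: all three test updates swap `MOCK_DATABASE.name` for `MOCK_DATABASE.name.__root__`. Since `yield_database` now calls `get_database_tag_labels`, which feeds the name into the FQN builder, the mocks must pass the raw string rather than the pydantic wrapper. A small sketch of the distinction, assuming pydantic v1 custom root types like OpenMetadata's generated `EntityName`:

```python
from pydantic import BaseModel  # pydantic v1 API; __root__ is rejected in v2


class EntityName(BaseModel):
    """Sketch of a generated custom-root-type such as basic.EntityName."""

    __root__: str


name = EntityName(__root__="sample_database")
print(repr(name))           # EntityName(__root__='sample_database')
print(repr(name.__root__))  # 'sample_database' -- the raw str the FQN builder needs
```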
diff --git a/openmetadata-docs/content/v1.3.x-SNAPSHOT/connectors/database/databricks/index.md b/openmetadata-docs/content/v1.3.x-SNAPSHOT/connectors/database/databricks/index.md
index e1df1830e875..6d6253aef560 100644
--- a/openmetadata-docs/content/v1.3.x-SNAPSHOT/connectors/database/databricks/index.md
+++ b/openmetadata-docs/content/v1.3.x-SNAPSHOT/connectors/database/databricks/index.md
@@ -15,7 +15,7 @@ slug: /connectors/database/databricks
 | Data Profiler      | {% icon iconName="check" /%} |
 | Data Quality       | {% icon iconName="check" /%} |
 | Owners             | {% icon iconName="cross" /%} |
-| Tags               | {% icon iconName="cross" /%} |
+| Tags               | {% icon iconName="check" /%} |
 | DBT                | {% icon iconName="check" /%} |
 | Supported Versions | Databricks Runtime Version 9+ |
 
@@ -27,6 +27,11 @@ slug: /connectors/database/databricks
 
 {% /multiTablesWrapper %}
 
+{% note %}
+As per the Databricks [documentation](https://docs.databricks.com/en/data-governance/unity-catalog/tags.html#manage-tags-with-sql-commands), metadata `tag` extraction is only supported for Databricks version 13.3 and higher.
+{% /note %}
+
+
 In this section, we provide guides and references to use the Databricks connector.
 
 Configure and schedule Databricks metadata and profiler workflows from the OpenMetadata UI:
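Reviewer note: as a quick sanity check of the new query templates in `queries.py`, the snippet below fills the `str.format` placeholders the same way the connector does and prints the resulting SQL. The catalog/schema/table names are illustrative only, and actually executing these statements requires a Unity Catalog workspace on Databricks 13.3+, per the note added above:

```python
import textwrap

# Same template shape as DATABRICKS_GET_TABLE_TAGS; the placeholders are
# identifiers, so they are filled via str.format rather than bind parameters.
DATABRICKS_GET_TABLE_TAGS = textwrap.dedent(
    """
    SELECT
        *
    FROM {database_name}.information_schema.table_tags
    WHERE schema_name = '{schema_name}' AND table_name = '{table_name}';"""
)

print(
    DATABRICKS_GET_TABLE_TAGS.format(
        database_name="main",    # illustrative catalog
        schema_name="default",   # illustrative schema
        table_name="orders",     # illustrative table
    )
)
# Result rows expose tag_name/tag_value columns, which yield_table_tags maps
# onto OpenMetadata classifications and tags.
```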