Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR - metadata tag extraction for Databricks #14874

Merged
merged 7 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ def yield_database(
service=self.context.database_service,
description=self.get_database_description(database_name),
sourceUrl=self.get_source_url(database_name=database_name),
tags=self.get_database_tag_labels(database_name=database_name),
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,21 @@ class DatabaseServiceTopology(ServiceTopology):
# Topology node for the database level. The tag stage runs before the
# Database stage so that database-level tags are already in context when
# the Database entity is built (yield_database reads them by FQN).
database = TopologyNode(
    producer="get_database_names",
    stages=[
        # Database-level tags: nullable because not every source supports
        # them; store_all_in_context keeps every yielded tag available for
        # later FQN lookups.
        NodeStage(
            type_=OMetaTagAndClassification,
            context="tags",
            processor="yield_database_tag_details",
            nullable=True,
            store_all_in_context=True,
        ),
        # The Database entity itself, consuming the database_service context.
        NodeStage(
            type_=Database,
            context="database",
            processor="yield_database",
            consumer=["database_service"],
            cache_entities=True,
            use_cache=True,
        ),
    ],
    children=["databaseSchema"],
)
Expand Down Expand Up @@ -273,6 +280,13 @@ def yield_tag(
From topology. To be run for each schema
"""

def yield_database_tag(
    self, database_name: str
) -> Iterable[Either[OMetaTagAndClassification]]:
    """
    From topology. To be run for each database.

    Hook for sources that support database-level tags. The base
    implementation has no body and therefore returns None, which is why
    callers guard the call with `or []`.
    """

def yield_table_tags(
self, table_name_and_type: Tuple[str, TableType]
) -> Iterable[Either[OMetaTagAndClassification]]:
Expand All @@ -298,6 +312,15 @@ def yield_database_schema_tag_details(
if self.source_config.includeTags:
yield from self.yield_tag(schema_name) or []

def yield_database_tag_details(
    self, database_name: str
) -> Iterable[Either[OMetaTagAndClassification]]:
    """
    From topology. To be run for each database.

    Thin wrapper around yield_database_tag that honors the
    includeTags flag from the source configuration.
    """
    if not self.source_config.includeTags:
        return
    # yield_database_tag may return None (base hook); fall back to an
    # empty iterable in that case.
    yield from self.yield_database_tag(database_name) or []

@abstractmethod
def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
"""
Expand Down Expand Up @@ -364,6 +387,20 @@ def get_tag_by_fqn(self, entity_fqn: str) -> Optional[List[TagLabel]]:
tag_labels.append(tag_label)
return tag_labels or None

def get_database_tag_labels(self, database_name: str) -> Optional[List[TagLabel]]:
    """
    Method to get database tags.
    This will only get executed if the tags context
    is properly informed.
    """
    # Build the database FQN and resolve any tag labels previously stored
    # in the topology context under that FQN.
    database_fqn = fqn.build(
        self.metadata,
        entity_type=Database,
        service_name=self.context.database_service,
        database_name=database_name,
    )
    return self.get_tag_by_fqn(entity_fqn=database_fqn)

def get_schema_tag_labels(self, schema_name: str) -> Optional[List[TagLabel]]:
"""
Method to get schema tags
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import re
import traceback
from copy import deepcopy
from typing import Iterable, Optional
from typing import Iterable, Optional, Tuple

from pyhive.sqlalchemy_hive import _type_map
from sqlalchemy import types, util
Expand All @@ -24,20 +24,31 @@
from sqlalchemy_databricks._dialect import DatabricksDialect

from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import Column, Table, TableType
from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
DatabricksConnection,
)
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.ingestion.source.database.databricks.queries import (
DATABRICKS_GET_CATALOGS,
DATABRICKS_GET_CATALOGS_TAGS,
DATABRICKS_GET_COLUMN_TAGS,
DATABRICKS_GET_SCHEMA_TAGS,
DATABRICKS_GET_TABLE_COMMENTS,
DATABRICKS_GET_TABLE_TAGS,
DATABRICKS_VIEW_DEFINITIONS,
)
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
Expand All @@ -49,9 +60,13 @@
get_all_view_definitions,
get_view_definition_wrapper,
)
from metadata.utils.tag_utils import get_ometa_tag_and_classification

logger = ingestion_logger()

# Default descriptions attached to tags/classifications ingested from
# Databricks.
# NOTE(review): "DATABRICK" (singular) looks like a typo for "DATABRICKS" —
# confirm before changing, since these strings are persisted as entity
# descriptions in OpenMetadata.
DATABRICKS_TAG = "DATABRICK TAG"
DATABRICKS_TAG_CLASSIFICATION = "DATABRICK TAG CLASSIFICATION"


class STRUCT(String):
# This class is added to support STRUCT datatype
Expand Down Expand Up @@ -284,6 +299,7 @@ def set_inspector(self, database_name: str) -> None:
new_service_connection.catalog = database_name
self.engine = get_connection(new_service_connection)
self.inspector = inspect(self.engine)
self._connection = None # Lazy init as well

def get_configured_database(self) -> Optional[str]:
    """Return the catalog configured on the service connection, if any."""
    configured_catalog = self.service_connection.catalog
    return configured_catalog
Expand Down Expand Up @@ -337,3 +353,133 @@ def get_raw_database_schema_names(self) -> Iterable[str]:
is_old_version=self.is_older_version,
):
yield schema_name

def yield_database_tag(
    self, database_name: str
) -> Iterable[Either[OMetaTagAndClassification]]:
    """
    Fetch catalog-level tags from information_schema.catalog_tags and emit
    them as OpenMetadata tag/classification pairs bound to the database FQN.
    """
    try:
        tag_rows = self.connection.execute(
            DATABRICKS_GET_CATALOGS_TAGS.format(database_name=database_name)
        )
        for tag_row in tag_rows:
            database_fqn = fqn.build(
                self.metadata,
                Database,
                service_name=self.context.database_service,
                database_name=database_name,
            )
            yield from get_ometa_tag_and_classification(
                tag_fqn=database_fqn,
                tags=[tag_row.tag_value],
                classification_name=tag_row.tag_name,
                tag_description=DATABRICKS_TAG,
                classification_description=DATABRICKS_TAG_CLASSIFICATION,
            )
    except Exception as err:
        # Tag extraction is best-effort: surface the failure as a Left so
        # the ingestion run keeps going.
        yield Either(
            left=StackTraceError(
                name="Tags and Classifications",
                error=f"Failed to fetch database tags due to [{err}]",
                stackTrace=traceback.format_exc(),
            )
        )

def yield_tag(
    self, schema_name: str
) -> Iterable[Either[OMetaTagAndClassification]]:
    """
    Fetch schema-level tags from information_schema.schema_tags and emit
    them as OpenMetadata tag/classification pairs bound to the schema FQN.
    """
    try:
        tag_rows = self.connection.execute(
            DATABRICKS_GET_SCHEMA_TAGS.format(
                database_name=self.context.database, schema_name=schema_name
            )
        )
        for tag_row in tag_rows:
            schema_fqn = fqn.build(
                self.metadata,
                DatabaseSchema,
                service_name=self.context.database_service,
                database_name=self.context.database,
                schema_name=schema_name,
            )
            yield from get_ometa_tag_and_classification(
                tag_fqn=schema_fqn,
                tags=[tag_row.tag_value],
                classification_name=tag_row.tag_name,
                tag_description=DATABRICKS_TAG,
                classification_description=DATABRICKS_TAG_CLASSIFICATION,
            )
    except Exception as err:
        # Tag extraction is best-effort: surface the failure as a Left so
        # the ingestion run keeps going.
        yield Either(
            left=StackTraceError(
                name="Tags and Classifications",
                error=f"Failed to fetch schema tags due to [{err}]",
                stackTrace=traceback.format_exc(),
            )
        )

def yield_table_tags(
    self, table_name_and_type: Tuple[str, TableType]
) -> Iterable[Either[OMetaTagAndClassification]]:
    """
    Fetch table-level and column-level tags for the given table from
    information_schema and emit them as tag/classification pairs.

    Both queries share one exception handler: if either fails, a single
    Left(StackTraceError) is yielded and the run continues.
    """
    table_name, _ = table_name_and_type
    try:
        # Table-level tags, bound to the table FQN.
        table_tags = self.connection.execute(
            DATABRICKS_GET_TABLE_TAGS.format(
                database_name=self.context.database,
                schema_name=self.context.database_schema,
                table_name=table_name,
            )
        )
        for tag in table_tags:
            yield from get_ometa_tag_and_classification(
                tag_fqn=fqn.build(
                    self.metadata,
                    Table,
                    service_name=self.context.database_service,
                    database_name=self.context.database,
                    schema_name=self.context.database_schema,
                    table_name=table_name,
                ),
                tags=[tag.tag_value],
                classification_name=tag.tag_name,
                tag_description=DATABRICKS_TAG,
                classification_description=DATABRICKS_TAG_CLASSIFICATION,
            )

        # Column-level tags, bound to each column's FQN via the row's
        # column_name.
        column_tags = self.connection.execute(
            DATABRICKS_GET_COLUMN_TAGS.format(
                database_name=self.context.database,
                schema_name=self.context.database_schema,
                table_name=table_name,
            )
        )
        for tag in column_tags:
            yield from get_ometa_tag_and_classification(
                tag_fqn=fqn.build(
                    self.metadata,
                    Column,
                    service_name=self.context.database_service,
                    database_name=self.context.database,
                    schema_name=self.context.database_schema,
                    table_name=table_name,
                    column_name=tag.column_name,
                ),
                tags=[tag.tag_value],
                classification_name=tag.tag_name,
                tag_description=DATABRICKS_TAG,
                classification_description=DATABRICKS_TAG_CLASSIFICATION,
            )

    except Exception as exc:
        # Best-effort: report the failure without aborting ingestion.
        yield Either(
            left=StackTraceError(
                name="Tags and Classifications",
                error=f"Failed to fetch table/column tags due to [{exc}]",
                stackTrace=traceback.format_exc(),
            )
        )
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,31 @@
# Returns the table's extended metadata rows (including its comment).
DATABRICKS_GET_TABLE_COMMENTS = "DESCRIBE TABLE EXTENDED {schema_name}.{table_name}"

# Lists every catalog visible to the current connection.
DATABRICKS_GET_CATALOGS = "SHOW CATALOGS"

DATABRICKS_GET_CATALOGS_TAGS = textwrap.dedent(
"""SELECT * FROM {database_name}.information_schema.catalog_tags;"""
)

DATABRICKS_GET_SCHEMA_TAGS = textwrap.dedent(
"""
SELECT
*
FROM {database_name}.information_schema.schema_tags
WHERE schema_name='{schema_name}'"""
)

DATABRICKS_GET_TABLE_TAGS = textwrap.dedent(
"""
SELECT
*
FROM {database_name}.information_schema.table_tags
WHERE schema_name = '{schema_name}' AND table_name = '{table_name}';"""
)

DATABRICKS_GET_COLUMN_TAGS = textwrap.dedent(
"""
SELECT
*
FROM {database_name}.information_schema.column_tags
WHERE schema_name='{schema_name}' AND table_name='{table_name}';"""
)
3 changes: 2 additions & 1 deletion ingestion/tests/unit/topology/database/test_hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,8 @@ def __init__(

def test_yield_database(self):
assert EXPECTED_DATABASE == [
either.right for either in self.hive.yield_database(MOCK_DATABASE.name)
either.right
for either in self.hive.yield_database(MOCK_DATABASE.name.__root__)
]

self.hive.context.__dict__[
Expand Down
3 changes: 2 additions & 1 deletion ingestion/tests/unit/topology/database/test_mssql.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,8 @@ def __init__(

def test_yield_database(self):
assert EXPECTED_DATABASE == [
either.right for either in self.mssql.yield_database(MOCK_DATABASE.name)
either.right
for either in self.mssql.yield_database(MOCK_DATABASE.name.__root__)
]

self.mssql.context.__dict__[
Expand Down
3 changes: 2 additions & 1 deletion ingestion/tests/unit/topology/database/test_oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ def __init__(

def test_yield_database(self):
assert EXPECTED_DATABASE == [
either.right for either in self.oracle.yield_database(MOCK_DATABASE.name)
either.right
for either in self.oracle.yield_database(MOCK_DATABASE.name.__root__)
]

self.oracle.context.__dict__["database"] = MOCK_DATABASE.name.__root__
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ slug: /connectors/database/databricks
| Data Profiler | {% icon iconName="check" /%} |
| Data Quality | {% icon iconName="check" /%} |
| Owners | {% icon iconName="cross" /%} |
| Tags | {% icon iconName="cross" /%} |
| Tags | {% icon iconName="check" /%} |
| DBT | {% icon iconName="check" /%} |
| Supported Versions | Databricks Runtime Version 9+ |

Expand All @@ -27,6 +27,11 @@ slug: /connectors/database/databricks

{% /multiTablesWrapper %}

{% note %}
As per the [documentation](https://docs.databricks.com/en/data-governance/unity-catalog/tags.html#manage-tags-with-sql-commands), note that we only support metadata `tag` extraction for Databricks version 13.3 and higher.
{% /note %}


In this section, we provide guides and references to use the Databricks connector.

Configure and schedule Databricks metadata and profiler workflows from the OpenMetadata UI:
Expand Down
Loading