
Commit

MINOR: Added Support for Bigquery and Redshift lifeCycle data (open-metadata#14622)

* Added BQ and redshift lifeCycle

* added usage fixes
OnkarVO7 authored and Abhishek332 committed Jan 25, 2024
1 parent 3e8f5bd commit e81b452
Showing 14 changed files with 203 additions and 54 deletions.
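
At a glance: the diff mixes LifeCycleQueryMixin into the BigQuery and Redshift sources, gives each a yield_life_cycle_data generator backed by a new service-specific life cycle query (BIGQUERY_LIFE_CYCLE_QUERY / REDSHIFT_LIFE_CYCLE_QUERY), adds a shared get_life_cycle_data helper that turns a created_at timestamp into a LifeCycle record, and relaxes the usage filters, presumably so that non-SELECT statements reach the new check_life_cycle_query hooks.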
----------------------------------------
@@ -34,6 +34,7 @@
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureCode
from metadata.generated.schema.entity.data.table import (
    IntervalType,
    Table,
    TablePartition,
    TableType,
)
@@ -53,6 +54,7 @@
from metadata.generated.schema.type.tagLabel import TagLabel
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.life_cycle import OMetaLifeCycleData
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_test_connection_fn
@@ -64,6 +66,7 @@
from metadata.ingestion.source.database.bigquery.queries import (
    BIGQUERY_GET_STORED_PROCEDURE_QUERIES,
    BIGQUERY_GET_STORED_PROCEDURES,
    BIGQUERY_LIFE_CYCLE_QUERY,
    BIGQUERY_SCHEMA_DESCRIPTION,
    BIGQUERY_TABLE_AND_TYPE,
)
@@ -72,6 +75,9 @@
    CommonDbSourceService,
    TableNameAndType,
)
from metadata.ingestion.source.database.life_cycle_query_mixin import (
    LifeCycleQueryMixin,
)
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.ingestion.source.database.stored_procedures_mixin import (
    QueryByProcedure,
@@ -195,7 +201,9 @@ def _build_formatted_table_id(table):
)


class BigquerySource(StoredProcedureMixin, CommonDbSourceService, MultiDBSource):
class BigquerySource(
    LifeCycleQueryMixin, StoredProcedureMixin, CommonDbSourceService, MultiDBSource
):
    """
    Implements the necessary methods to extract
    Database metadata from Bigquery Source
@@ -534,6 +542,37 @@ def close(self):
        if os.path.exists(temp_file_path):
            os.remove(temp_file_path)

    def yield_life_cycle_data(self, _) -> Iterable[Either[OMetaLifeCycleData]]:
        """
        Get the life cycle data of the table
        """
        try:
            table_fqn = fqn.build(
                self.metadata,
                entity_type=Table,
                service_name=self.context.database_service,
                database_name=self.context.database,
                schema_name=self.context.database_schema,
                table_name=self.context.table,
                skip_es_search=True,
            )
            table = self.metadata.get_by_name(entity=Table, fqn=table_fqn)
            yield from self.get_life_cycle_data(
                entity=table,
                query=BIGQUERY_LIFE_CYCLE_QUERY.format(
                    database_name=table.database.name,
                    schema_name=table.databaseSchema.name,
                ),
            )
        except Exception as exc:
            yield Either(
                left=StackTraceError(
                    name="lifeCycle",
                    error=f"Error Processing life cycle data: {exc}",
                    stackTrace=traceback.format_exc(),
                )
            )

    def _get_source_url(
        self,
        database_name: Optional[str] = None,
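The generator above reports results through the Either(left=...)/Either(right=...) pattern shown in its body. A minimal consumer sketch (hypothetical driver code, not part of this commit; it assumes only that Either exposes the left/right fields it is constructed with):

```python
# Hypothetical driver loop over a configured BigquerySource instance.
def process_life_cycle(source):
    for either in source.yield_life_cycle_data(None):
        if either.right is not None:
            record = either.right  # OMetaLifeCycleData(entity=..., life_cycle=...)
            print(record.entity.name.__root__, record.life_cycle)
        else:
            print("life cycle failure:", either.left.error)  # StackTraceError
```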
----------------------------------------
@@ -126,3 +126,14 @@
ORDER BY procedure_start_time DESC
"""
)

BIGQUERY_LIFE_CYCLE_QUERY = textwrap.dedent(
"""
select
table_name as table_name,
creation_time as created_at
from `{schema_name}`.INFORMATION_SCHEMA.TABLES
where table_schema = '{schema_name}'
and table_catalog = '{database_name}'
"""
)
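
For reference, this is roughly how yield_life_cycle_data renders the template above; the project and dataset names are invented for illustration:

```python
from metadata.ingestion.source.database.bigquery.queries import BIGQUERY_LIFE_CYCLE_QUERY

# "my-gcp-project" and "analytics" are placeholder values.
print(BIGQUERY_LIFE_CYCLE_QUERY.format(database_name="my-gcp-project", schema_name="analytics"))
# Produces (modulo surrounding whitespace):
# select
# table_name as table_name,
# creation_time as created_at
# from `analytics`.INFORMATION_SCHEMA.TABLES
# where table_schema = 'analytics'
# and table_catalog = 'my-gcp-project'
```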
----------------------------------------
@@ -14,6 +14,7 @@
from abc import ABC
from copy import deepcopy
from datetime import datetime
from typing import Optional

from google import auth

@@ -86,3 +87,15 @@ def get_engine(self):
            yield inspector_details.engine
        else:
            yield self.engine

    def check_life_cycle_query(
        self, query_type: Optional[str], query_text: Optional[str]
    ) -> bool:
        """
        returns true if query is to be used for life cycle processing.
        Override if we have specific parameters
        """
        if query_type != "SELECT":
            return True
        return False
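
A quick illustration of the predicate above. Here parser stands for an instance of the class this method is defined on (the class name sits outside the shown hunk), and the statement types are examples of what BigQuery job metadata may report:

```python
# Anything that is not a plain SELECT is kept for life cycle processing;
# query_text is ignored by this override.
parser.check_life_cycle_query(query_type="CREATE_TABLE_AS_SELECT", query_text=None)  # True
parser.check_life_cycle_query(query_type="INSERT", query_text=None)                  # True
parser.check_life_cycle_query(query_type="SELECT", query_text=None)                  # False
```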
----------------------------------------
@@ -26,6 +26,4 @@ class BigqueryUsageSource(BigqueryQueryParserSource, UsageSource):

    sql_stmt = BIGQUERY_STATEMENT

    filters = """
        AND statement_type = "SELECT"
    """
    filters = ""
----------------------------------------
@@ -26,10 +26,14 @@
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
    DatabaseServiceMetadataPipeline,
)
from metadata.generated.schema.type.lifeCycle import AccessDetails, LifeCycle
from metadata.ingestion.api.models import Either, Entity
from metadata.ingestion.api.status import Status
from metadata.ingestion.models.life_cycle import OMetaLifeCycleData
from metadata.ingestion.models.topology import TopologyContext
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.logger import ingestion_logger
from metadata.utils.time_utils import convert_timestamp_to_milliseconds

logger = ingestion_logger()

@@ -85,3 +89,32 @@ def life_cycle_query_dict(
        )

        return queries_dict

    def get_life_cycle_data(self, entity: Entity, query: str):
        """
        Get the life cycle data
        """
        if entity:
            try:
                life_cycle_data = self.life_cycle_query_dict(query=query).get(
                    entity.name.__root__
                )
                if life_cycle_data:
                    life_cycle = LifeCycle(
                        created=AccessDetails(
                            timestamp=convert_timestamp_to_milliseconds(
                                life_cycle_data.created_at.timestamp()
                            )
                        )
                    )
                    yield Either(
                        right=OMetaLifeCycleData(entity=entity, life_cycle=life_cycle)
                    )
            except Exception as exc:
                yield Either(
                    left=StackTraceError(
                        name=entity.name.__root__,
                        error=f"Unable to get the table life cycle data for table {entity.name.__root__}: {exc}",
                        stackTrace=traceback.format_exc(),
                    )
                )
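
To make the conversion concrete, a minimal sketch of what get_life_cycle_data does with one query row, assuming the life cycle query returns a created_at datetime per table and that convert_timestamp_to_milliseconds simply scales epoch seconds to milliseconds:

```python
from datetime import datetime, timezone

from metadata.generated.schema.type.lifeCycle import AccessDetails, LifeCycle

# Hypothetical created_at value coming back from a *_LIFE_CYCLE_QUERY row.
created_at = datetime(2024, 1, 25, 9, 30, tzinfo=timezone.utc)

# Assumed behaviour of convert_timestamp_to_milliseconds: epoch seconds -> milliseconds.
timestamp_ms = int(created_at.timestamp() * 1000)

life_cycle = LifeCycle(created=AccessDetails(timestamp=timestamp_ms))
# The mixin then yields Either(right=OMetaLifeCycleData(entity=table, life_cycle=life_cycle)).
```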
----------------------------------------
@@ -13,7 +13,7 @@
"""
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Iterator
from typing import Iterator, Optional

from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
@@ -90,7 +90,9 @@ def get_sql_statement(self, start_time: datetime, end_time: datetime) -> str:
        )

    def check_life_cycle_query(
        self, query_type: str  # pylint: disable=unused-argument
        self,
        query_type: Optional[str],  # pylint: disable=unused-argument
        query_text: Optional[str],  # pylint: disable=unused-argument
    ) -> bool:
        """
        returns true if query is to be used for life cycle processing.
----------------------------------------
@@ -33,6 +33,7 @@
from metadata.generated.schema.entity.data.table import (
    ConstraintType,
    IntervalType,
    Table,
    TableConstraint,
    TablePartition,
    TableType,
@@ -49,18 +50,23 @@
from metadata.generated.schema.type.basic import EntityName
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.life_cycle import OMetaLifeCycleData
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.common_db_source import (
    CommonDbSourceService,
    TableNameAndType,
)
from metadata.ingestion.source.database.life_cycle_query_mixin import (
    LifeCycleQueryMixin,
)
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.ingestion.source.database.redshift.models import RedshiftStoredProcedure
from metadata.ingestion.source.database.redshift.queries import (
    REDSHIFT_GET_ALL_RELATION_INFO,
    REDSHIFT_GET_DATABASE_NAMES,
    REDSHIFT_GET_STORED_PROCEDURE_QUERIES,
    REDSHIFT_GET_STORED_PROCEDURES,
    REDSHIFT_LIFE_CYCLE_QUERY,
    REDSHIFT_PARTITION_DETAILS,
)
from metadata.ingestion.source.database.redshift.utils import (
@@ -106,7 +112,9 @@
)


class RedshiftSource(StoredProcedureMixin, CommonDbSourceService, MultiDBSource):
class RedshiftSource(
    LifeCycleQueryMixin, StoredProcedureMixin, CommonDbSourceService, MultiDBSource
):
    """
    Implements the necessary methods to extract
    Database metadata from Redshift Source
@@ -305,3 +313,34 @@ def get_stored_procedure_queries_dict(self) -> Dict[str, List[QueryByProcedure]]
        )

        return queries_dict

    def yield_life_cycle_data(self, _) -> Iterable[Either[OMetaLifeCycleData]]:
        """
        Get the life cycle data of the table
        """
        try:
            table_fqn = fqn.build(
                self.metadata,
                entity_type=Table,
                service_name=self.context.database_service,
                database_name=self.context.database,
                schema_name=self.context.database_schema,
                table_name=self.context.table,
                skip_es_search=True,
            )
            table = self.metadata.get_by_name(entity=Table, fqn=table_fqn)
            yield from self.get_life_cycle_data(
                entity=table,
                query=REDSHIFT_LIFE_CYCLE_QUERY.format(
                    database_name=table.database.name,
                    schema_name=table.databaseSchema.name,
                ),
            )
        except Exception as exc:
            yield Either(
                left=StackTraceError(
                    name="lifeCycle",
                    error=f"Error Processing life cycle data: {exc}",
                    stackTrace=traceback.format_exc(),
                )
            )
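
This method mirrors the BigQuery implementation line for line, with REDSHIFT_LIFE_CYCLE_QUERY swapped in: both sources build the table FQN from the topology context, resolve the Table entity, and delegate to the shared get_life_cycle_data from LifeCycleQueryMixin.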
----------------------------------------
@@ -350,3 +350,13 @@
order by procedure_start_time DESC
"""
)

REDSHIFT_LIFE_CYCLE_QUERY = textwrap.dedent(
"""
select "table" as table_name,
create_time as created_at
from pg_catalog.svv_table_info o
where o.schema = '{schema_name}'
and o.database = '{database_name}'
"""
)
----------------------------------------
@@ -11,8 +11,10 @@
"""
Redshift usage module
"""
import re
from abc import ABC
from datetime import datetime
from typing import Optional

from metadata.generated.schema.entity.services.connections.database.redshiftConnection import (
    RedshiftConnection,
@@ -55,3 +57,17 @@ def get_sql_statement(self, start_time: datetime, end_time: datetime) -> str:
            filters=self.get_filters(),
            result_limit=self.source_config.resultLimit,
        )

    def check_life_cycle_query(
        self, query_type: Optional[str], query_text: Optional[str]
    ) -> bool:
        """
        returns true if query is to be used for life cycle processing.
        Override if we have specific parameters
        """
        create_pattern = re.compile(r".*\s*CREATE", re.IGNORECASE)
        insert_pattern = re.compile(r".*\s*INSERT", re.IGNORECASE)
        if re.match(create_pattern, query_text) or re.match(insert_pattern, query_text):
            return True
        return False
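
A brief sanity check of the classification above, with made-up query texts (parser again stands for an instance of the enclosing class, whose name is outside the shown hunk):

```python
parser.check_life_cycle_query(
    query_type=None,
    query_text="CREATE TABLE mart.orders AS SELECT * FROM stage.orders",
)  # True (matches the CREATE pattern)
parser.check_life_cycle_query(
    query_type=None,
    query_text="INSERT INTO mart.orders SELECT * FROM stage.orders",
)  # True (matches the INSERT pattern)
parser.check_life_cycle_query(
    query_type=None,
    query_text="SELECT * FROM mart.orders WHERE order_id = 1",
)  # False
```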
----------------------------------------
@@ -23,8 +23,6 @@ class RedshiftUsageSource(RedshiftQueryParserSource, UsageSource):
        AND querytxt NOT ILIKE 'fetch%%'
        AND querytxt NOT ILIKE 'padb_fetch_sample:%%'
        AND querytxt NOT ILIKE 'Undoing%%transactions%%on%%table%%with%%current%%xid%%'
        AND querytxt NOT ILIKE '%%create%%table%%as%%select%%'
        AND querytxt NOT ILIKE '%%insert%%'
    """

    sql_stmt = REDSHIFT_SQL_STATEMENT
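
Dropping the create/insert exclusions is the Redshift counterpart of the BigQuery filter change above: CTAS and INSERT statements are now allowed into usage ingestion, presumably so the check_life_cycle_query override can route them into life cycle processing.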