Skip to content

Commit

Permalink
#11799 - Fix Airflow ownership & add pipeline tasks (#14510)
Browse files Browse the repository at this point in the history
* Fix airflow owner and add tasks

* Add pipeline tasks ownership

* MINOR - Fix py CI

* Add pipeline tasks ownership

* Add pipeline tasks ownership

* MINOR - Fix py CI

* MINOR - Fix py CI

* Add pipeline tasks ownership

* patch team

* patch team

* Format
  • Loading branch information
pmbrull authored Dec 28, 2023
1 parent 95b90bc commit b84ce33
Show file tree
Hide file tree
Showing 20 changed files with 574 additions and 217 deletions.
17 changes: 17 additions & 0 deletions ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,23 @@ def _search_es_entity(

return None

def _get_entity_from_es(
    self, entity: Type[T], query_string: str, fields: Optional[list] = None
) -> Optional[T]:
    """
    Return the first hit of an ES search, or None.

    Args:
        entity: entity class handed to `_search_es_entity` as `entity_type`
        query_string: ES query to run
        fields: optional field list forwarded with the ES request
    Returns:
        The first element produced by `_search_es_entity`, or None when the
        search yields nothing or raises. Errors are logged, not propagated.
    """
    try:
        hits = self._search_es_entity(
            entity_type=entity, query_string=query_string, fields=fields
        )
        # Only the first match matters; an empty or None result becomes None.
        return next(iter(hits or []), None)
    except Exception as err:
        logger.debug(traceback.format_exc())
        logger.warning(f"Could not get {entity.__name__} info from ES due to {err}")
        return None

def es_search_from_fqn(
self,
entity_type: Type[T],
Expand Down
139 changes: 117 additions & 22 deletions ingestion/src/metadata/ingestion/ometa/mixins/user_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@
To be used by OpenMetadata class
"""
import traceback
from functools import lru_cache
from typing import Optional
from typing import Optional, Type

from metadata.generated.schema.entity.teams.team import Team
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import T
from metadata.ingestion.ometa.client import REST
from metadata.utils.constants import ENTITY_REFERENCE_TYPE_MAP
from metadata.utils.elasticsearch import ES_INDEX_MAP
from metadata.utils.logger import ometa_logger

Expand All @@ -34,42 +37,134 @@ class OMetaUserMixin:

client: REST

email_search = (
"/search/query?q=email.keyword:{email}&from={from_}&size={size}&index="
+ ES_INDEX_MAP[User.__name__]
)
@staticmethod
def email_search_query_es(entity: Type[T]) -> str:
    """
    Build the ES query template that matches on `email.keyword`.

    The `{email}`, `{from_}` and `{size}` placeholders are left in place for
    the caller to fill with `str.format`. The index is looked up in
    `ES_INDEX_MAP` by the entity class name.
    """
    template = "/search/query?q=email.keyword:{email}&from={from_}&size={size}&index="
    index_name = ES_INDEX_MAP[entity.__name__]
    return template + index_name

@lru_cache(maxsize=None)
def get_user_by_email(
@staticmethod
def name_search_query_es(entity: Type[T]) -> str:
    """
    Build the ES query template used for name lookups.

    The query is a free-text `q={name}` instead of `q=name:{name}`, mirroring
    what the UI does when searching users: a user stored as `random.user`
    should still be found when looked up as `Random User`.

    The `{name}`, `{from_}` and `{size}` placeholders are left in place for
    the caller to fill with `str.format`. The index is looked up in
    `ES_INDEX_MAP` by the entity class name.
    """
    template = "/search/query?q={name}&from={from_}&size={size}&index="
    index_name = ES_INDEX_MAP[entity.__name__]
    return template + index_name

def _search_by_email(
    self,
    entity: Type[T],
    email: Optional[str],
    from_count: int = 0,
    size: int = 1,
    fields: Optional[list] = None,
) -> Optional[T]:
    """
    GET user or team entity by mail

    Args:
        entity: entity class to search for; selects the ES index
        email: email to search; a falsy value skips the search
        from_count: value for the query's `from` parameter
        size: value for the query's `size` parameter
        fields: Optional field list to pass to ES request
    Returns:
        The result of `_get_entity_from_es` for the built query, or None
        when no email is given.
    """
    if email:
        # The query template is built per entity so the same lookup works for
        # any entity class registered in the ES index map.
        query_string = self.email_search_query_es(entity=entity).format(
            email=email, from_=from_count, size=size
        )
        return self._get_entity_from_es(
            entity=entity, query_string=query_string, fields=fields
        )

    return None

def _search_by_name(
    self,
    entity: Type[T],
    name: Optional[str],
    from_count: int = 0,
    size: int = 1,
    fields: Optional[list] = None,
) -> Optional[T]:
    """
    Look up a single entity in ES by name.

    Args:
        entity: entity class to search for; selects the ES index
        name: text placed in the query's `q` parameter; a falsy value skips
            the search
        from_count: value for the query's `from` parameter
        size: value for the query's `size` parameter
        fields: optional field list forwarded with the ES request
    Returns:
        The result of `_get_entity_from_es` for the built query, or None
        when no name is given.
    """
    if not name:
        return None

    search_template = self.name_search_query_es(entity=entity)
    query = search_template.format(name=name, from_=from_count, size=size)
    return self._get_entity_from_es(
        entity=entity, query_string=query, fields=fields
    )

try:
entity_list = self._search_es_entity(
entity_type=User, query_string=query_string, fields=fields
)
for user in entity_list or []:
return user
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Could not get user info from ES for user email {email} due to {err}"
)
return None

@lru_cache(maxsize=None)
def _cached_reference_by_email(
    self,
    email: Optional[str],
    from_count: int,
    size: int,
    fields: Optional[tuple],
) -> Optional[EntityReference]:
    """Cached worker for `get_reference_by_email`; every argument must be hashable."""
    field_list = list(fields) if fields is not None else None
    # Users take precedence over teams when both match the same email.
    for entity in (User, Team):
        match = self._search_by_email(
            entity=entity,
            email=email,
            from_count=from_count,
            size=size,
            fields=field_list,
        )
        if match:
            return EntityReference(
                id=match.id.__root__, type=ENTITY_REFERENCE_TYPE_MAP[entity.__name__]
            )
    return None

def get_reference_by_email(
    self,
    email: Optional[str],
    from_count: int = 0,
    size: int = 1,
    fields: Optional[list] = None,
) -> Optional[EntityReference]:
    """
    Get a User or Team Entity Reference by searching by its mail

    Args:
        email: email to search; a falsy value yields None
        from_count: value for the query's `from` parameter
        size: value for the query's `size` parameter
        fields: optional field list forwarded with the ES request
    Returns:
        A reference to the matching User if one is found, otherwise to the
        matching Team, otherwise None. Results (including a None miss) are
        memoised by `lru_cache` for each argument combination.
    """
    # lru_cache hashes its arguments, so a list passed as `fields` would raise
    # TypeError. Normalise it to a tuple before reaching the cached worker.
    hashable_fields = tuple(fields) if fields is not None else None
    return self._cached_reference_by_email(
        email, from_count, size, hashable_fields
    )

# Keep the cache-management helpers reachable from the public method name.
get_reference_by_email.cache_clear = _cached_reference_by_email.cache_clear
get_reference_by_email.cache_info = _cached_reference_by_email.cache_info

@lru_cache(maxsize=None)
def _cached_reference_by_name(
    self,
    name: Optional[str],
    from_count: int,
    size: int,
    fields: Optional[tuple],
) -> Optional[EntityReference]:
    """Cached worker for `get_reference_by_name`; every argument must be hashable."""
    field_list = list(fields) if fields is not None else None
    # Users take precedence over teams when both match the same name.
    for entity in (User, Team):
        match = self._search_by_name(
            entity=entity,
            name=name,
            from_count=from_count,
            size=size,
            fields=field_list,
        )
        if match:
            return EntityReference(
                id=match.id.__root__, type=ENTITY_REFERENCE_TYPE_MAP[entity.__name__]
            )
    return None

def get_reference_by_name(
    self,
    name: Optional[str],
    from_count: int = 0,
    size: int = 1,
    fields: Optional[list] = None,
) -> Optional[EntityReference]:
    """
    Get a User or Team Entity Reference by searching by its name

    Args:
        name: name to search; a falsy value yields None
        from_count: value for the query's `from` parameter
        size: value for the query's `size` parameter
        fields: optional field list forwarded with the ES request
    Returns:
        A reference to the matching User if one is found, otherwise to the
        matching Team, otherwise None. Results (including a None miss) are
        memoised by `lru_cache` for each argument combination.
    """
    # lru_cache hashes its arguments, so a list passed as `fields` would raise
    # TypeError. Normalise it to a tuple before reaching the cached worker.
    hashable_fields = tuple(fields) if fields is not None else None
    return self._cached_reference_by_name(
        name, from_count, size, hashable_fields
    )

# Keep the cache-management helpers reachable from the public method name.
get_reference_by_name.cache_clear = _cached_reference_by_name.cache_clear
get_reference_by_name.cache_info = _cached_reference_by_name.cache_info
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,7 @@ def get_owner_details(
try:
owner_details = self.client.domo.users_get(owner.id)
if owner_details.get("email"):
user = self.metadata.get_user_by_email(owner_details["email"])
if user:
return EntityReference(id=user.id.__root__, type="user")
logger.warning(
f"No user found with email [{owner_details['email']}] in OMD"
)
return self.metadata.get_reference_by_email(owner_details["email"])
except Exception as exc:
logger.warning(
f"Error while getting details of user {owner.displayName} - {exc}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,9 +634,7 @@ def get_owner_details(
try:
if dashboard_details.user_id is not None:
dashboard_owner = self.client.user(dashboard_details.user_id)
user = self.metadata.get_user_by_email(dashboard_owner.email)
if user:
return EntityReference(id=user.id.__root__, type="user")
return self.metadata.get_reference_by_email(dashboard_owner.email)

except Exception as err:
logger.debug(traceback.format_exc())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,9 @@ def get_dashboard_details(self, dashboard: dict) -> dict:
def get_owner_details(self, dashboard_details) -> Optional[EntityReference]:
"""Get owner from mail"""
if dashboard_details.get("user") and dashboard_details["user"].get("email"):
user = self.metadata.get_user_by_email(
return self.metadata.get_reference_by_email(
dashboard_details["user"].get("email")
)
if user:
return EntityReference(id=user.id.__root__, type="user")
return None

def get_dashboard_url(self, dashboard_details: dict) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,7 @@ def get_dashboard_details(

def _get_user_by_email(self, email: Optional[str]) -> Optional[EntityReference]:
if email:
user = self.metadata.get_user_by_email(email)
if user:
return EntityReference(id=user.id.__root__, type="user")

return self.metadata.get_reference_by_email(email)
return None

def get_owner_details(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,7 @@ def get_owner_details(
) -> Optional[EntityReference]:
"""Get dashboard owner from email"""
if dashboard_details.owner and dashboard_details.owner.email:
user = self.metadata.get_user_by_email(dashboard_details.owner.email)
if user:
return EntityReference(id=user.id.__root__, type="user")
return self.metadata.get_reference_by_email(dashboard_details.owner.email)
return None

def yield_tag(self, *_, **__) -> Iterable[Either[OMetaTagAndClassification]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,7 @@ def get_owners(self, owner: Owner) -> Optional[EntityReference]:
try:
owner_details = User(**self.domo_client.users_get(owner.id))
if owner_details.email:
user = self.metadata.get_user_by_email(owner_details.email)
if user:
return EntityReference(id=user.id.__root__, type="user")
return self.metadata.get_reference_by_email(owner_details.email)
except Exception as exc:
logger.warning(f"Error while getting details of user {owner.name} - {exc}")
return None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1054,9 +1054,9 @@ def ingest_pipelines(self) -> Iterable[Either[Pipeline]]:
for pipeline in self.pipelines["pipelines"]:
owner = None
if pipeline.get("owner"):
user = self.metadata.get_user_by_email(email=pipeline.get("owner"))
if user:
owner = EntityReference(id=user.id.__root__, type="user")
owner = self.metadata.get_reference_by_email(
email=pipeline.get("owner")
)
pipeline_ev = CreatePipelineRequest(
name=pipeline["name"],
displayName=pipeline["displayName"],
Expand Down
Loading

0 comments on commit b84ce33

Please sign in to comment.