Fixing assessment throwing an error when the owner of an object is empty #485

Merged · 17 commits · Oct 27, 2023
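In short: the assessment crawlers previously required a name and creator for every object, so the assessment failed in workspaces containing objects whose creator account had been deleted. This PR makes those fields optional with `None` defaults, logs a warning for each ownerless object, and skips permission migration for jobs and pipelines that have no `IS_OWNER` ACL. A minimal sketch of the dataclass pattern, using a hypothetical `ClusterRecord` in place of the real `ClusterInfo` (the sketch spells the annotation as `str | None`; the diff below uses bare `= None` defaults):

```python
from dataclasses import dataclass


@dataclass
class ClusterRecord:
    # Required assessment fields stay first and positional.
    cluster_id: str
    success: int
    failures: str
    # Metadata that may be missing when the creator account was deleted gets
    # a None default, so record construction succeeds on incomplete API data.
    cluster_name: str | None = None
    creator: str | None = None


# Keyword construction mirrors the updated crawlers:
record = ClusterRecord(cluster_id="0807-225846-avon493", success=1, failures="[]")
print(record.creator)  # None, instead of failing the assessment
```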
73 changes: 60 additions & 13 deletions src/databricks/labs/ucx/assessment/crawlers.py
@@ -37,28 +37,28 @@
@dataclass
class JobInfo:
job_id: str
job_name: str
creator: str
success: int
failures: str
job_name: str = None
creator: str = None


@dataclass
class ClusterInfo:
cluster_id: str
cluster_name: str
creator: str
success: int
failures: str
cluster_name: str = None
creator: str = None


@dataclass
class PipelineInfo:
pipeline_id: str
pipeline_name: str
creator_name: str
success: int
failures: str
pipeline_name: str = None
creator_name: str = None


@dataclass
@@ -78,11 +78,11 @@ class AzureServicePrincipalInfo:
@dataclass
class GlobalInitScriptInfo:
script_id: str
script_name: str
created_by: str
enabled: bool
success: int
failures: str
script_name: str = None
created_by: str = None
enabled: bool = None


def _get_init_script_data(w, init_script_info):
@@ -150,7 +150,19 @@ def _crawl(self) -> list[GlobalInitScriptInfo]:

def _assess_global_init_scripts(self, all_global_init_scripts):
for gis in all_global_init_scripts:
global_init_script_info = GlobalInitScriptInfo(gis.script_id, gis.name, gis.created_by, gis.enabled, 1, "")
if not gis.created_by:
                logger.warning(
                    f"Script {gis.name} has an unknown creator; the original creator has been deleted"
                    f" and the script should be re-created"
                )
global_init_script_info = GlobalInitScriptInfo(
script_id=gis.script_id,
script_name=gis.name,
created_by=gis.created_by,
enabled=gis.enabled,
success=1,
failures="[]",
)
failures = []
global_init_script = base64.b64decode(self._ws.global_init_scripts.get(gis.script_id).script).decode(
"utf-8"
@@ -394,7 +406,19 @@ def _crawl(self) -> list[PipelineInfo]:

def _assess_pipelines(self, all_pipelines):
for pipeline in all_pipelines:
pipeline_info = PipelineInfo(pipeline.pipeline_id, pipeline.name, pipeline.creator_user_name, 1, "")
if not pipeline.creator_user_name:
                logger.warning(
                    f"Pipeline {pipeline.name} has an unknown creator; the original creator "
                    f"has been deleted and the pipeline should be re-created"
                )
pipeline_info = PipelineInfo(
pipeline_id=pipeline.pipeline_id,
pipeline_name=pipeline.name,
creator_name=pipeline.creator_user_name,
success=1,
failures="[]",
)

failures = []
pipeline_config = self._ws.pipelines.get(pipeline.pipeline_id).spec.configuration
if pipeline_config:
@@ -427,7 +451,18 @@ def _assess_clusters(self, all_clusters):
for cluster in all_clusters:
if cluster.cluster_source == ClusterSource.JOB:
continue
cluster_info = ClusterInfo(cluster.cluster_id, cluster.cluster_name, cluster.creator_user_name, 1, "")
if not cluster.creator_user_name:
                logger.warning(
                    f"Cluster {cluster.cluster_id} has an unknown creator; the original creator "
                    f"has been deleted and the cluster should be re-created"
                )
cluster_info = ClusterInfo(
cluster_id=cluster.cluster_id,
cluster_name=cluster.cluster_name,
creator=cluster.creator_user_name,
success=1,
failures="[]",
)
support_status = spark_version_compatibility(cluster.spark_version)
failures = []
if support_status != "supported":
@@ -527,7 +562,19 @@ def _assess_jobs(self, all_jobs: list[BaseJob], all_clusters_by_id) -> list[JobInfo]:
job_details = {}
for job in all_jobs:
job_assessment[job.job_id] = set()
job_details[job.job_id] = JobInfo(str(job.job_id), job.settings.name, job.creator_user_name, 1, "")
if not job.creator_user_name:
                logger.warning(
                    f"Job {job.job_id} has an unknown creator; the original creator has been deleted "
                    f"and the job should be re-created"
                )

job_details[job.job_id] = JobInfo(
job_id=str(job.job_id),
job_name=job.settings.name,
creator=job.creator_user_name,
success=1,
failures="[]",
)

for job, cluster_config in self._get_cluster_configs_from_all_jobs(all_jobs, all_clusters_by_id):
support_status = spark_version_compatibility(cluster_config.spark_version)
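A detail worth noting in the dataclass changes above: Python requires fields without defaults to precede fields with defaults, which is why `job_name`, `creator`, and their counterparts moved below `success` and `failures` rather than simply gaining `= None` in place. A quick illustration of the constraint (hypothetical `JobRecord` standing in for `JobInfo`):

```python
from dataclasses import dataclass

# Keeping the defaulted fields first would raise at class-definition time:
#   TypeError: non-default argument 'success' follows default argument
#
# @dataclass
# class Broken:
#     job_id: str
#     job_name: str | None = None
#     success: int


# Defaulted fields must come last, matching the reordering in the diff:
@dataclass
class JobRecord:
    job_id: str
    success: int
    failures: str
    job_name: str | None = None
    creator: str | None = None


print(JobRecord("536591785949415", 1, "[]"))  # name and creator default to None
```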
17 changes: 17 additions & 0 deletions src/databricks/labs/ucx/workspace_access/generic.py
@@ -11,6 +11,7 @@
from databricks.sdk.core import DatabricksError
from databricks.sdk.retries import retried
from databricks.sdk.service import iam, ml
from databricks.sdk.service.iam import PermissionLevel

from databricks.labs.ucx.framework.crawlers import CrawlerBase, SqlBackend
from databricks.labs.ucx.mixins.hardening import rate_limited
@@ -129,15 +130,31 @@ def _applier_task(self, object_type: str, object_id: str, acl: list[iam.AccessControlRequest]):

@rate_limited(max_requests=100)
def _crawler_task(self, object_type: str, object_id: str) -> Permissions | None:
objects_with_owner_permission = ["jobs", "pipelines"]

permissions = self._safe_get_permissions(object_type, object_id)
if not permissions:
logger.warning(f"Object {object_type} {object_id} doesn't have any permissions")
return None
if not self._object_have_owner(permissions) and object_type in objects_with_owner_permission:
            logger.warning(
                f"Object {object_type} {object_id} doesn't have an owner and cannot be migrated "
                f"to account-level groups; consider setting a new owner or deleting this object"
            )
return None
return Permissions(
object_id=object_id,
object_type=object_type,
raw=json.dumps(permissions.as_dict()),
)

def _object_have_owner(self, permissions: iam.ObjectPermissions | None):
for acl in permissions.access_control_list:
for perm in acl.all_permissions:
if perm.permission_level == PermissionLevel.IS_OWNER:
return True
return False

def _load_as_request(self, object_type: str, object_id: str) -> list[iam.AccessControlRequest]:
loaded = self._safe_get_permissions(object_type, object_id)
if loaded is None:
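The new `_object_have_owner` helper walks every ACL entry looking for an `IS_OWNER` permission level. A standalone sketch of the same traversal against the SDK's `iam` types (the constructed permissions object is illustrative, not taken from the PR):

```python
from databricks.sdk.service import iam

permissions = iam.ObjectPermissions(
    access_control_list=[
        iam.AccessControlResponse(
            user_name="[email protected]",
            all_permissions=[
                iam.Permission(permission_level=iam.PermissionLevel.IS_OWNER)
            ],
        )
    ]
)

# Same logic as _object_have_owner: any ACL entry carrying IS_OWNER means the
# object has an owner and can be migrated to account-level groups.
has_owner = any(
    perm.permission_level == iam.PermissionLevel.IS_OWNER
    for acl in permissions.access_control_list
    for perm in acl.all_permissions
)
assert has_owner
```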
135 changes: 135 additions & 0 deletions tests/unit/assessment/test_assessment.py
@@ -1,3 +1,4 @@
import base64
import json
from unittest.mock import Mock

@@ -26,8 +27,11 @@

from databricks.labs.ucx.assessment.crawlers import (
AzureServicePrincipalCrawler,
ClusterInfo,
ClustersCrawler,
GlobalInitScriptCrawler,
GlobalInitScriptInfo,
JobInfo,
JobsCrawler,
PipelineInfo,
PipelinesCrawler,
@@ -2664,3 +2668,134 @@ def test_list_all_pipeline_with_conf_spn_secret_avlb(mocker):
assert result_set[0].get("application_id") == "Hello, World!"
assert result_set[0].get("tenant_id") == "directory_12345"
assert result_set[0].get("storage_account") == "newstorageacct"


def test_job_crawler_with_no_owner_should_have_empty_creator_name():
sample_jobs = [
BaseJob(
created_time=1694536604319,
creator_user_name=None,
job_id=536591785949415,
settings=JobSettings(
compute=None,
continuous=None,
tasks=[
Task(
task_key="Ingest",
existing_cluster_id="0807-225846-avon493",
notebook_task=NotebookTask(
notebook_path="/Users/[email protected]/Customers/Example/Test/Load"
),
timeout_seconds=0,
)
],
timeout_seconds=0,
),
)
]

sample_clusters = [
ClusterDetails(
autoscale=AutoScale(min_workers=1, max_workers=6),
spark_context_id=5134472582179566666,
spark_env_vars=None,
spark_version="13.3.x-cpu-ml-scala2.12",
cluster_id="0807-225846-avon493",
cluster_source=ClusterSource.JOB,
)
]
ws = Mock()
mockbackend = MockBackend()
ws.jobs.list.return_value = sample_jobs
ws.clusters.list.return_value = sample_clusters
JobsCrawler(ws, mockbackend, "ucx").snapshot()
result = mockbackend.rows_written_for("hive_metastore.ucx.jobs", "append")
assert result == [JobInfo(job_id="536591785949415", job_name=None, creator=None, success=1, failures="[]")]


def test_cluster_without_owner_should_have_empty_creator_name(mocker):
sample_clusters = [
ClusterDetails(
autoscale=AutoScale(min_workers=1, max_workers=6),
cluster_source=ClusterSource.UI,
spark_context_id=5134472582179565315,
spark_env_vars=None,
spark_version="12.3.x-cpu-ml-scala2.12",
cluster_id="0810-225833-atlanta69",
cluster_name="Tech Summit FY24 Cluster-1",
)
]
ws = mocker.Mock()
mockbackend = MockBackend()

ws.clusters.list.return_value = sample_clusters
ClustersCrawler(ws, mockbackend, "ucx").snapshot()
result = mockbackend.rows_written_for("hive_metastore.ucx.clusters", "append")
assert result == [
ClusterInfo(
cluster_id="0810-225833-atlanta69",
cluster_name="Tech Summit FY24 Cluster-1",
creator=None,
success=1,
failures="[]",
)
]


def test_pipeline_without_owners_should_have_empty_creator_name():
sample_pipelines = [
PipelineStateInfo(
cluster_id=None,
creator_user_name=None,
latest_updates=None,
name="New DLT Pipeline",
pipeline_id="0112eae7-9d11-4b40-a2b8-6c83cb3c7407",
run_as_user_name="[email protected]",
state=PipelineState.IDLE,
)
]

ws = Mock()
ws.pipelines.list_pipelines.return_value = sample_pipelines
ws.pipelines.get().spec.configuration = {}
mockbackend = MockBackend()
PipelinesCrawler(ws, mockbackend, "ucx").snapshot()
result = mockbackend.rows_written_for("hive_metastore.ucx.pipelines", "append")

assert result == [
PipelineInfo(
pipeline_id="0112eae7-9d11-4b40-a2b8-6c83cb3c7407",
pipeline_name="New DLT Pipeline",
creator_name=None,
success=1,
failures="[]",
)
]


def test_init_script_without_config_should_have_empty_creator_name(mocker):
mock_ws = mocker.Mock()
mock_ws.global_init_scripts.list.return_value = [
GlobalInitScriptDetails(
created_at=111,
created_by=None,
enabled=False,
name="newscript",
position=4,
script_id="222",
updated_at=111,
updated_by="[email protected]",
)
]
mock_ws.global_init_scripts.get().script = base64.b64encode(b"hello world")
mockbackend = MockBackend()
crawler = GlobalInitScriptCrawler(mock_ws, mockbackend, schema="ucx")
    crawler.snapshot()
    result = mockbackend.rows_written_for("hive_metastore.ucx.global_init_scripts", "append")

assert result == [
GlobalInitScriptInfo(
script_id="222", script_name="newscript", enabled=False, created_by=None, success=1, failures="[]"
),
]