diff --git a/src/databricks/labs/ucx/assessment/crawlers.py b/src/databricks/labs/ucx/assessment/crawlers.py
index 75f44e9cb4..89b01fcd7b 100644
--- a/src/databricks/labs/ucx/assessment/crawlers.py
+++ b/src/databricks/labs/ucx/assessment/crawlers.py
@@ -37,28 +37,28 @@
 @dataclass
 class JobInfo:
     job_id: str
-    job_name: str
-    creator: str
     success: int
     failures: str
+    job_name: str | None = None
+    creator: str | None = None
 
 
 @dataclass
 class ClusterInfo:
     cluster_id: str
-    cluster_name: str
-    creator: str
     success: int
     failures: str
+    cluster_name: str | None = None
+    creator: str | None = None
 
 
 @dataclass
 class PipelineInfo:
     pipeline_id: str
-    pipeline_name: str
-    creator_name: str
     success: int
     failures: str
+    pipeline_name: str | None = None
+    creator_name: str | None = None
 
 
 @dataclass
@@ -78,11 +78,11 @@ class AzureServicePrincipalInfo:
 @dataclass
 class GlobalInitScriptInfo:
     script_id: str
-    script_name: str
-    created_by: str
-    enabled: bool
     success: int
     failures: str
+    script_name: str | None = None
+    created_by: str | None = None
+    enabled: bool | None = None
 
 
 def _get_init_script_data(w, init_script_info):
@@ -150,7 +150,19 @@ def _crawl(self) -> list[GlobalInitScriptInfo]:
     def _assess_global_init_scripts(self, all_global_init_scripts):
         for gis in all_global_init_scripts:
-            global_init_script_info = GlobalInitScriptInfo(gis.script_id, gis.name, gis.created_by, gis.enabled, 1, "")
+            if not gis.created_by:
+                logger.warning(
+                    f"Script {gis.name} has an unknown creator; the original creator has likely been deleted"
+                    f" and the script should be re-created"
+                )
+            global_init_script_info = GlobalInitScriptInfo(
+                script_id=gis.script_id,
+                script_name=gis.name,
+                created_by=gis.created_by,
+                enabled=gis.enabled,
+                success=1,
+                failures="[]",
+            )
             failures = []
             global_init_script = base64.b64decode(self._ws.global_init_scripts.get(gis.script_id).script).decode(
                 "utf-8"
             )
@@ -394,7 +406,19 @@ def _crawl(self) -> list[PipelineInfo]:
 
     def _assess_pipelines(self, all_pipelines):
         for pipeline in all_pipelines:
-            pipeline_info = PipelineInfo(pipeline.pipeline_id, pipeline.name, pipeline.creator_user_name, 1, "")
+            if not pipeline.creator_user_name:
+                logger.warning(
+                    f"Pipeline {pipeline.name} has an unknown creator; the original creator has likely "
+                    f"been deleted and the pipeline should be re-created"
+                )
+            pipeline_info = PipelineInfo(
+                pipeline_id=pipeline.pipeline_id,
+                pipeline_name=pipeline.name,
+                creator_name=pipeline.creator_user_name,
+                success=1,
+                failures="[]",
+            )
+
             failures = []
             pipeline_config = self._ws.pipelines.get(pipeline.pipeline_id).spec.configuration
             if pipeline_config:
@@ -427,7 +451,18 @@ def _assess_clusters(self, all_clusters):
         for cluster in all_clusters:
             if cluster.cluster_source == ClusterSource.JOB:
                 continue
-            cluster_info = ClusterInfo(cluster.cluster_id, cluster.cluster_name, cluster.creator_user_name, 1, "")
+            if not cluster.creator_user_name:
+                logger.warning(
+                    f"Cluster {cluster.cluster_id} has an unknown creator; the original creator has likely "
+                    f"been deleted and the cluster should be re-created"
+                )
+            cluster_info = ClusterInfo(
+                cluster_id=cluster.cluster_id,
+                cluster_name=cluster.cluster_name,
+                creator=cluster.creator_user_name,
+                success=1,
+                failures="[]",
+            )
             support_status = spark_version_compatibility(cluster.spark_version)
             failures = []
             if support_status != "supported":
@@ -527,7 +562,19 @@ def _assess_jobs(self, all_jobs: list[BaseJob], all_clusters_by_id) -> list[JobI
         job_details = {}
         for job in all_jobs:
             job_assessment[job.job_id] = set()
-            job_details[job.job_id] = JobInfo(str(job.job_id), job.settings.name, job.creator_user_name, 1, "")
+            if not job.creator_user_name:
+                logger.warning(
+                    f"Job {job.job_id} has an unknown creator; the original creator has likely been deleted "
+                    f"and the job should be re-created"
+                )
+
+            job_details[job.job_id] = JobInfo(
+                job_id=str(job.job_id),
+                job_name=job.settings.name,
+                creator=job.creator_user_name,
+                success=1,
+                failures="[]",
+            )
 
         for job, cluster_config in self._get_cluster_configs_from_all_jobs(all_jobs, all_clusters_by_id):
             support_status = spark_version_compatibility(cluster_config.spark_version)
diff --git a/src/databricks/labs/ucx/workspace_access/generic.py b/src/databricks/labs/ucx/workspace_access/generic.py
index 4a1ecd8e2e..92833a3070 100644
--- a/src/databricks/labs/ucx/workspace_access/generic.py
+++ b/src/databricks/labs/ucx/workspace_access/generic.py
@@ -11,6 +11,7 @@
 from databricks.sdk.core import DatabricksError
 from databricks.sdk.retries import retried
 from databricks.sdk.service import iam, ml
+from databricks.sdk.service.iam import PermissionLevel
 
 from databricks.labs.ucx.framework.crawlers import CrawlerBase, SqlBackend
 from databricks.labs.ucx.mixins.hardening import rate_limited
@@ -129,8 +130,17 @@ def _applier_task(self, object_type: str, object_id: str, acl: list[iam.AccessCo
 
     @rate_limited(max_requests=100)
     def _crawler_task(self, object_type: str, object_id: str) -> Permissions | None:
+        objects_with_owner_permission = ["jobs", "pipelines"]
+
         permissions = self._safe_get_permissions(object_type, object_id)
         if not permissions:
+            logger.warning(f"Object {object_type} {object_id} doesn't have any permissions")
+            return None
+        if not self._object_have_owner(permissions) and object_type in objects_with_owner_permission:
+            logger.warning(
+                f"Object {object_type} {object_id} has no owner and cannot be migrated "
+                f"to account-level groups; consider setting a new owner or deleting this object"
+            )
             return None
         return Permissions(
             object_id=object_id,
@@ -138,6 +148,13 @@ def _crawler_task(self, object_type: str, object_id: str) -> Permissions | None:
             raw=json.dumps(permissions.as_dict()),
         )
 
+    def _object_have_owner(self, permissions: iam.ObjectPermissions | None):
+        for acl in permissions.access_control_list:
+            for perm in acl.all_permissions:
+                if perm.permission_level == PermissionLevel.IS_OWNER:
+                    return True
+        return False
+
     def _load_as_request(self, object_type: str, object_id: str) -> list[iam.AccessControlRequest]:
         loaded = self._safe_get_permissions(object_type, object_id)
         if loaded is None:
diff --git a/tests/unit/assessment/test_assessment.py b/tests/unit/assessment/test_assessment.py
index 6d7920099d..0519f65ce0 100644
--- a/tests/unit/assessment/test_assessment.py
+++ b/tests/unit/assessment/test_assessment.py
@@ -1,3 +1,4 @@
+import base64
 import json
 from unittest.mock import Mock
 
@@ -26,8 +27,11 @@
 from databricks.labs.ucx.assessment.crawlers import (
     AzureServicePrincipalCrawler,
+    ClusterInfo,
     ClustersCrawler,
     GlobalInitScriptCrawler,
+    GlobalInitScriptInfo,
+    JobInfo,
     JobsCrawler,
     PipelineInfo,
     PipelinesCrawler,
@@ -2664,3 +2668,134 @@ def test_list_all_pipeline_with_conf_spn_secret_avlb(mocker):
     assert result_set[0].get("application_id") == "Hello, World!"
     assert result_set[0].get("tenant_id") == "directory_12345"
     assert result_set[0].get("storage_account") == "newstorageacct"
+
+
+def test_job_crawler_with_no_owner_should_have_empty_creator_name():
+    sample_jobs = [
+        BaseJob(
+            created_time=1694536604319,
+            creator_user_name=None,
+            job_id=536591785949415,
+            settings=JobSettings(
+                compute=None,
+                continuous=None,
+                tasks=[
+                    Task(
+                        task_key="Ingest",
+                        existing_cluster_id="0807-225846-avon493",
+                        notebook_task=NotebookTask(
+                            notebook_path="/Users/foo.bar@databricks.com/Customers/Example/Test/Load"
+                        ),
+                        timeout_seconds=0,
+                    )
+                ],
+                timeout_seconds=0,
+            ),
+        )
+    ]
+
+    sample_clusters = [
+        ClusterDetails(
+            autoscale=AutoScale(min_workers=1, max_workers=6),
+            spark_context_id=5134472582179566666,
+            spark_env_vars=None,
+            spark_version="13.3.x-cpu-ml-scala2.12",
+            cluster_id="0807-225846-avon493",
+            cluster_source=ClusterSource.JOB,
+        )
+    ]
+    ws = Mock()
+    mockbackend = MockBackend()
+    ws.jobs.list.return_value = sample_jobs
+    ws.clusters.list.return_value = sample_clusters
+    JobsCrawler(ws, mockbackend, "ucx").snapshot()
+    result = mockbackend.rows_written_for("hive_metastore.ucx.jobs", "append")
+    assert result == [JobInfo(job_id="536591785949415", job_name=None, creator=None, success=1, failures="[]")]
+
+
+def test_cluster_without_owner_should_have_empty_creator_name(mocker):
+    sample_clusters = [
+        ClusterDetails(
+            autoscale=AutoScale(min_workers=1, max_workers=6),
+            cluster_source=ClusterSource.UI,
+            spark_context_id=5134472582179565315,
+            spark_env_vars=None,
+            spark_version="12.3.x-cpu-ml-scala2.12",
+            cluster_id="0810-225833-atlanta69",
+            cluster_name="Tech Summit FY24 Cluster-1",
+        )
+    ]
+    ws = mocker.Mock()
+    mockbackend = MockBackend()
+
+    ws.clusters.list.return_value = sample_clusters
+    ClustersCrawler(ws, mockbackend, "ucx").snapshot()
+    result = mockbackend.rows_written_for("hive_metastore.ucx.clusters", "append")
+    assert result == [
+        ClusterInfo(
+            cluster_id="0810-225833-atlanta69",
+            cluster_name="Tech Summit FY24 Cluster-1",
+            creator=None,
+            success=1,
+            failures="[]",
+        )
+    ]
+
+
+def test_pipeline_without_owners_should_have_empty_creator_name():
+    sample_pipelines = [
+        PipelineStateInfo(
+            cluster_id=None,
+            creator_user_name=None,
+            latest_updates=None,
+            name="New DLT Pipeline",
+            pipeline_id="0112eae7-9d11-4b40-a2b8-6c83cb3c7407",
+            run_as_user_name="abcde.defgh@databricks.com",
+            state=PipelineState.IDLE,
+        )
+    ]
+
+    ws = Mock()
+    ws.pipelines.list_pipelines.return_value = sample_pipelines
+    ws.pipelines.get().spec.configuration = {}
+    mockbackend = MockBackend()
+    PipelinesCrawler(ws, mockbackend, "ucx").snapshot()
+    result = mockbackend.rows_written_for("hive_metastore.ucx.pipelines", "append")
+
+    assert result == [
+        PipelineInfo(
+            pipeline_id="0112eae7-9d11-4b40-a2b8-6c83cb3c7407",
+            pipeline_name="New DLT Pipeline",
+            creator_name=None,
+            success=1,
+            failures="[]",
+        )
+    ]
+
+
+def test_init_script_without_config_should_have_empty_creator_name(mocker):
+    mock_ws = mocker.Mock()
+    mocker.Mock()
+    mock_ws.global_init_scripts.list.return_value = [
+        GlobalInitScriptDetails(
+            created_at=111,
+            created_by=None,
+            enabled=False,
+            name="newscript",
+            position=4,
+            script_id="222",
+            updated_at=111,
+            updated_by="2123l@eee.com",
+        )
+    ]
+    mock_ws.global_init_scripts.get().script = base64.b64encode(b"hello world")
+    mockbackend = MockBackend()
+    crawler = GlobalInitScriptCrawler(mock_ws, mockbackend, schema="ucx")
+    result = crawler.snapshot()
+    result = mockbackend.rows_written_for("hive_metastore.ucx.global_init_scripts", "append")
+
+    assert result == [
+        GlobalInitScriptInfo(
+            script_id="222", script_name="newscript", enabled=False, created_by=None, success=1, failures="[]"
+        ),
+    ]
diff --git a/tests/unit/workspace_access/test_generic.py b/tests/unit/workspace_access/test_generic.py
index 8142f882a7..8224c14eca 100644
--- a/tests/unit/workspace_access/test_generic.py
+++ b/tests/unit/workspace_access/test_generic.py
@@ -4,6 +4,15 @@
 from databricks.sdk.core import DatabricksError
 from databricks.sdk.service import compute, iam, ml
+from databricks.sdk.service.compute import ClusterDetails
+from databricks.sdk.service.iam import (
+    AccessControlResponse,
+    ObjectPermissions,
+    Permission,
+    PermissionLevel,
+)
+from databricks.sdk.service.jobs import BaseJob
+from databricks.sdk.service.pipelines import PipelineStateInfo
 from databricks.sdk.service.workspace import Language, ObjectInfo, ObjectType
 
 from databricks.labs.ucx.mixins.sql import Row
@@ -541,3 +550,110 @@
     assert len(result_set) == 2
 
     assert result_set[0] == WorkspaceObjectInfo("NOTEBOOK", "123", "/rootobj/notebook1", "PYTHON")
+
+
+def test_eligibles_assets_with_owner_should_be_accepted():
+    ws = MagicMock()
+    ws.jobs.list.return_value = [BaseJob(job_id=13)]
+    ws.pipelines.list_pipelines.return_value = [PipelineStateInfo(pipeline_id="12")]
+
+    def perms(object_type: str, object_id: str):
+        if object_type == "jobs":
+            return ObjectPermissions(
+                object_id=object_id,
+                object_type=object_type,
+                access_control_list=[
+                    AccessControlResponse(
+                        group_name="de", all_permissions=[Permission(permission_level=PermissionLevel.IS_OWNER)]
+                    ),
+                    AccessControlResponse(
+                        group_name="ds", all_permissions=[Permission(permission_level=PermissionLevel.CAN_USE)]
+                    ),
+                ],
+            )
+        elif object_type == "pipelines":
+            return ObjectPermissions(
+                object_id=object_id,
+                object_type=object_type,
+                access_control_list=[
+                    AccessControlResponse(
+                        group_name="de", all_permissions=[Permission(permission_level=PermissionLevel.IS_OWNER)]
+                    ),
+                    AccessControlResponse(
+                        group_name="de", all_permissions=[Permission(permission_level=PermissionLevel.CAN_RUN)]
+                    ),
+                ],
+            )
+
+    ws.permissions.get.side_effect = perms
+
+    sup = GenericPermissionsSupport(
+        ws=ws,
+        listings=[
+            Listing(ws.jobs.list, "job_id", "jobs"),
+            Listing(ws.pipelines.list_pipelines, "pipeline_id", "pipelines"),
+        ],
+    )
+    tasks = []
+    for executable in list(sup.get_crawler_tasks()):
+        task = executable()
+        if task is not None:
+            tasks.append(task)
+    assert len(tasks) == 2
+
+
+def test_eligibles_assets_without_owner_should_be_ignored():
+    ws = MagicMock()
+    ws.clusters.list.return_value = [ClusterDetails(cluster_id="1234")]
+    ws.jobs.list.return_value = [BaseJob(job_id=13)]
+    ws.pipelines.list_pipelines.return_value = [PipelineStateInfo(pipeline_id="12")]
+
+    def perms(object_type: str, object_id: str):
+        if object_type == "clusters":
+            return ObjectPermissions(
+                object_id=object_id,
+                object_type=object_type,
+                access_control_list=[
+                    AccessControlResponse(
+                        group_name="de", all_permissions=[Permission(permission_level=PermissionLevel.CAN_USE)]
+                    )
+                ],
+            )
+        elif object_type == "pipelines":
+            return ObjectPermissions(
+                object_id=object_id,
+                object_type=object_type,
+                access_control_list=[
+                    AccessControlResponse(
+                        group_name="de", all_permissions=[Permission(permission_level=PermissionLevel.CAN_USE)]
+                    )
+                ],
+            )
+        elif object_type == "jobs":
+            return ObjectPermissions(
+                object_id=object_id,
+                object_type=object_type,
+                access_control_list=[
+                    AccessControlResponse(
+                        group_name="ds", all_permissions=[Permission(permission_level=PermissionLevel.CAN_USE)]
+                    ),
+                ],
+            )
+
+    ws.permissions.get.side_effect = perms
+
+    sup = GenericPermissionsSupport(
+        ws=ws,
+        listings=[
+            Listing(ws.clusters.list, "cluster_id", "clusters"),
+            Listing(ws.jobs.list, "job_id", "jobs"),
+            Listing(ws.pipelines.list_pipelines, "pipeline_id", "pipelines"),
+        ],
+    )
+    tasks = []
+    for executable in list(sup.get_crawler_tasks()):
+        task = executable()
+        if task is not None:
+            tasks.append(task)
+    assert len(tasks) == 1
+    assert tasks[0].object_type == "clusters"