diff --git a/src/databricks/labs/ucx/hive_metastore/workflows.py b/src/databricks/labs/ucx/hive_metastore/workflows.py index a07f8b6746..5a61c1ad37 100644 --- a/src/databricks/labs/ucx/hive_metastore/workflows.py +++ b/src/databricks/labs/ucx/hive_metastore/workflows.py @@ -16,7 +16,7 @@ def convert_managed_table(self, ctx: RuntimeContext): managed_table_external_storage=ctx.config.managed_table_external_storage ) - @job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables, convert_managed_table]) + @job_task(job_cluster="user_isolation", depends_on=[Assessment.crawl_tables, convert_managed_table]) def migrate_external_tables_sync(self, ctx: RuntimeContext): """This workflow task migrates the external tables that are supported by SYNC command from the Hive Metastore to the Unity Catalog. @@ -25,14 +25,14 @@ def migrate_external_tables_sync(self, ctx: RuntimeContext): what=What.EXTERNAL_SYNC, managed_table_external_storage=ctx.config.managed_table_external_storage ) - @job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables, convert_managed_table]) + @job_task(job_cluster="user_isolation", depends_on=[Assessment.crawl_tables, convert_managed_table]) def migrate_dbfs_root_delta_tables(self, ctx: RuntimeContext): """This workflow task migrates delta tables stored in DBFS root from the Hive Metastore to the Unity Catalog using deep clone. """ ctx.tables_migrator.migrate_tables(what=What.DBFS_ROOT_DELTA) - @job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables, convert_managed_table]) + @job_task(job_cluster="user_isolation", depends_on=[Assessment.crawl_tables, convert_managed_table]) def migrate_dbfs_root_non_delta_tables( self, ctx: RuntimeContext, @@ -43,7 +43,7 @@ def migrate_dbfs_root_non_delta_tables( ctx.tables_migrator.migrate_tables(what=What.DBFS_ROOT_NON_DELTA) @job_task( - job_cluster="table_migration", + job_cluster="user_isolation", depends_on=[ Assessment.crawl_tables, migrate_external_tables_sync, @@ -57,7 +57,7 @@ def migrate_views(self, ctx: RuntimeContext): """ ctx.tables_migrator.migrate_tables(what=What.VIEW) - @job_task(job_cluster="table_migration", depends_on=[migrate_views]) + @job_task(job_cluster="user_isolation", depends_on=[migrate_views]) def update_migration_status(self, ctx: RuntimeContext): """Refresh the migration status to present it in the dashboard.""" ctx.tables_migrator.get_remaining_tables() @@ -67,7 +67,7 @@ class MigrateHiveSerdeTablesInPlace(Workflow): def __init__(self): super().__init__('migrate-external-hiveserde-tables-in-place-experimental') - @job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables]) + @job_task(job_cluster="user_isolation", depends_on=[Assessment.crawl_tables]) def migrate_hive_serde_in_place(self, ctx: RuntimeContext): """This workflow task migrates ParquetHiveSerDe, OrcSerde, AvroSerDe tables in place from the Hive Metastore to the Unity Catalog.""" @@ -77,7 +77,7 @@ def migrate_hive_serde_in_place(self, ctx: RuntimeContext): ) @job_task( - job_cluster="table_migration", + job_cluster="user_isolation", depends_on=[Assessment.crawl_tables, migrate_hive_serde_in_place], ) def migrate_views(self, ctx: RuntimeContext): @@ -86,7 +86,7 @@ def migrate_views(self, ctx: RuntimeContext): """ ctx.tables_migrator.migrate_tables(what=What.VIEW) - @job_task(job_cluster="table_migration", depends_on=[migrate_views]) + @job_task(job_cluster="user_isolation", depends_on=[migrate_views]) def update_migration_status(self, ctx: RuntimeContext): """Refresh the migration status to present it in the dashboard.""" ctx.tables_migrator.get_remaining_tables() @@ -96,14 +96,14 @@ class MigrateExternalTablesCTAS(Workflow): def __init__(self): super().__init__('migrate-external-tables-ctas') - @job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables]) + @job_task(job_cluster="user_isolation", depends_on=[Assessment.crawl_tables]) def migrate_other_external_ctas(self, ctx: RuntimeContext): """This workflow task migrates non-SYNC supported and non HiveSerde external tables using CTAS""" ctx.tables_migrator.migrate_tables( what=What.EXTERNAL_NO_SYNC, ) - @job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables]) + @job_task(job_cluster="user_isolation", depends_on=[Assessment.crawl_tables]) def migrate_hive_serde_ctas(self, ctx: RuntimeContext): """This workflow task migrates HiveSerde tables using CTAS""" ctx.tables_migrator.migrate_tables( @@ -111,7 +111,7 @@ def migrate_hive_serde_ctas(self, ctx: RuntimeContext): ) @job_task( - job_cluster="table_migration", + job_cluster="user_isolation", depends_on=[Assessment.crawl_tables, migrate_other_external_ctas, migrate_hive_serde_ctas], ) def migrate_views(self, ctx: RuntimeContext): @@ -120,7 +120,7 @@ def migrate_views(self, ctx: RuntimeContext): """ ctx.tables_migrator.migrate_tables(what=What.VIEW) - @job_task(job_cluster="table_migration", depends_on=[migrate_views]) + @job_task(job_cluster="user_isolation", depends_on=[migrate_views]) def update_migration_status(self, ctx: RuntimeContext): """Refresh the migration status to present it in the dashboard.""" ctx.tables_migrator.get_remaining_tables() @@ -137,7 +137,7 @@ def scan_tables_in_mounts_experimental(self, ctx: RuntimeContext): replacing any existing content that might be present.""" ctx.tables_in_mounts.snapshot(force_refresh=True) - @job_task(job_cluster="table_migration", depends_on=[scan_tables_in_mounts_experimental]) + @job_task(job_cluster="user_isolation", depends_on=[scan_tables_in_mounts_experimental]) def update_migration_status(self, ctx: RuntimeContext): """Refresh the migration status to present it in the dashboard.""" ctx.tables_migrator.get_remaining_tables() @@ -147,12 +147,12 @@ class MigrateTablesInMounts(Workflow): def __init__(self): super().__init__('migrate-tables-in-mounts-experimental') - @job_task(job_cluster="table_migration", depends_on=[ScanTablesInMounts.scan_tables_in_mounts_experimental]) + @job_task(job_cluster="user_isolation", depends_on=[ScanTablesInMounts.scan_tables_in_mounts_experimental]) def migrate_tables_in_mounts_experimental(self, ctx: RuntimeContext): """[EXPERIMENTAL] This workflow migrates `delta tables stored in mount points` to Unity Catalog using a Create Table statement.""" ctx.tables_migrator.migrate_tables(what=What.TABLE_IN_MOUNT) - @job_task(job_cluster="table_migration", depends_on=[migrate_tables_in_mounts_experimental]) + @job_task(job_cluster="user_isolation", depends_on=[migrate_tables_in_mounts_experimental]) def update_migration_status(self, ctx: RuntimeContext): """Refresh the migration status to present it in the dashboard.""" ctx.tables_migrator.get_remaining_tables() diff --git a/src/databricks/labs/ucx/installer/workflows.py b/src/databricks/labs/ucx/installer/workflows.py index ee9466e0c8..60cce9904c 100644 --- a/src/databricks/labs/ucx/installer/workflows.py +++ b/src/databricks/labs/ucx/installer/workflows.py @@ -741,7 +741,7 @@ def _job_cluster_spark_conf(self, cluster_key: str): return spark_conf | conf_from_installation if cluster_key == "tacl": return {"spark.databricks.acl.sqlOnly": "true"} | conf_from_installation - if cluster_key == "table_migration": + if cluster_key == "user_isolation": return {"spark.sql.sources.parallelPartitionDiscovery.parallelism": "200"} | conf_from_installation return conf_from_installation @@ -918,14 +918,13 @@ def _job_clusters(self, names: set[str]): ), ) ) - if "table_migration" in names: - # TODO: rename to "user-isolation", so that we can use it in group migration workflows + if "user_isolation" in names: clusters.append( jobs.JobCluster( - job_cluster_key="table_migration", + job_cluster_key="user_isolation", new_cluster=compute.ClusterSpec( data_security_mode=compute.DataSecurityMode.USER_ISOLATION, - spark_conf=self._job_cluster_spark_conf("table_migration"), + spark_conf=self._job_cluster_spark_conf("user_isolation"), policy_id=self._config.policy_id, autoscale=compute.AutoScale( max_workers=self._config.max_workers, diff --git a/src/databricks/labs/ucx/progress/workflows.py b/src/databricks/labs/ucx/progress/workflows.py index 2646ba9a92..ff7ffbedfb 100644 --- a/src/databricks/labs/ucx/progress/workflows.py +++ b/src/databricks/labs/ucx/progress/workflows.py @@ -23,7 +23,7 @@ class MigrationProgress(Workflow): def __init__(self) -> None: super().__init__('migration-progress-experimental') - @job_task(job_cluster="table_migration") + @job_task(job_cluster="user_isolation") def verify_prerequisites(self, ctx: RuntimeContext) -> None: """Verify the prerequisites for running this job on the table migration cluster are fulfilled. @@ -42,14 +42,14 @@ def crawl_tables(self, ctx: RuntimeContext) -> None: # Step 1 of 3: Just refresh the inventory. ctx.tables_crawler.snapshot(force_refresh=True) - @job_task(depends_on=[verify_prerequisites, crawl_tables], job_cluster="table_migration") + @job_task(depends_on=[verify_prerequisites, crawl_tables], job_cluster="user_isolation") def refresh_table_migration_status(self, ctx: RuntimeContext) -> None: """Scan the tables (and views) in the inventory and record whether each has been migrated or not.""" # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.) ctx.migration_status_refresher.snapshot(force_refresh=True) @job_task( - depends_on=[verify_prerequisites, crawl_tables, refresh_table_migration_status], job_cluster="table_migration" + depends_on=[verify_prerequisites, crawl_tables, refresh_table_migration_status], job_cluster="user_isolation" ) def update_tables_history_log(self, ctx: RuntimeContext) -> None: """Update the history log with the latest tables inventory snapshot.""" @@ -60,7 +60,7 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None: tables_snapshot = ctx.tables_crawler.snapshot() history_log.append_inventory_snapshot(tables_snapshot) - @job_task(depends_on=[verify_prerequisites], job_cluster="table_migration") + @job_task(depends_on=[verify_prerequisites], job_cluster="user_isolation") def crawl_udfs(self, ctx: RuntimeContext) -> None: """Iterates over all UDFs in the Hive Metastore of the current workspace and persists their metadata in the table named `$inventory_database.udfs`. This inventory is currently used when scanning securable objects for @@ -69,7 +69,7 @@ def crawl_udfs(self, ctx: RuntimeContext) -> None: udfs_snapshot = ctx.udfs_crawler.snapshot(force_refresh=True) history_log.append_inventory_snapshot(udfs_snapshot) - @job_task(depends_on=[verify_prerequisites, crawl_tables, crawl_udfs], job_cluster="table_migration") + @job_task(depends_on=[verify_prerequisites, crawl_tables, crawl_udfs], job_cluster="user_isolation") def crawl_grants(self, ctx: RuntimeContext) -> None: """Scans all securable objects for permissions that have been assigned: this include database-level permissions, as well permissions directly configured on objects in the (already gathered) table and UDF inventories. The @@ -82,7 +82,7 @@ def crawl_grants(self, ctx: RuntimeContext) -> None: grants_snapshot = ctx.grants_crawler.snapshot(force_refresh=True) history_log.append_inventory_snapshot(grants_snapshot) - @job_task(depends_on=[verify_prerequisites], job_cluster="table_migration") + @job_task(depends_on=[verify_prerequisites], job_cluster="user_isolation") def assess_jobs(self, ctx: RuntimeContext) -> None: """Scans through all the jobs and identifies those that are not compatible with UC. The list of all the jobs is stored in the `$inventory.jobs` table. @@ -97,7 +97,7 @@ def assess_jobs(self, ctx: RuntimeContext) -> None: jobs_snapshot = ctx.jobs_crawler.snapshot(force_refresh=True) history_log.append_inventory_snapshot(jobs_snapshot) - @job_task(depends_on=[verify_prerequisites], job_cluster="table_migration") + @job_task(depends_on=[verify_prerequisites], job_cluster="user_isolation") def assess_clusters(self, ctx: RuntimeContext) -> None: """Scan through all the clusters and identifies those that are not compatible with UC. The list of all the clusters is stored in the`$inventory.clusters` table. @@ -112,7 +112,7 @@ def assess_clusters(self, ctx: RuntimeContext) -> None: clusters_snapshot = ctx.clusters_crawler.snapshot(force_refresh=True) history_log.append_inventory_snapshot(clusters_snapshot) - @job_task(depends_on=[verify_prerequisites], job_cluster="table_migration") + @job_task(depends_on=[verify_prerequisites], job_cluster="user_isolation") def assess_pipelines(self, ctx: RuntimeContext) -> None: """This module scans through all the Pipelines and identifies those pipelines which has Azure Service Principals embedded (who has been given access to the Azure storage accounts via spark configurations) in the pipeline @@ -127,7 +127,7 @@ def assess_pipelines(self, ctx: RuntimeContext) -> None: pipelines_snapshot = ctx.pipelines_crawler.snapshot(force_refresh=True) history_log.append_inventory_snapshot(pipelines_snapshot) - @job_task(depends_on=[verify_prerequisites], job_cluster="table_migration") + @job_task(depends_on=[verify_prerequisites], job_cluster="user_isolation") def crawl_cluster_policies(self, ctx: RuntimeContext) -> None: """This module scans through all the Cluster Policies and get the necessary information @@ -166,7 +166,7 @@ def assess_workflows(self, ctx: RuntimeContext): refresh_table_migration_status, update_tables_history_log, ], - job_cluster="table_migration", + job_cluster="user_isolation", ) def record_workflow_run(self, ctx: RuntimeContext) -> None: """Record the workflow run of this workflow.""" diff --git a/src/databricks/labs/ucx/recon/workflows.py b/src/databricks/labs/ucx/recon/workflows.py index c5d0a53560..130a15b193 100644 --- a/src/databricks/labs/ucx/recon/workflows.py +++ b/src/databricks/labs/ucx/recon/workflows.py @@ -6,7 +6,7 @@ class MigrationRecon(Workflow): def __init__(self): super().__init__('migrate-data-reconciliation') - @job_task(job_cluster="table_migration") + @job_task(job_cluster="user_isolation") def recon_migration_result(self, ctx: RuntimeContext): """This workflow validate post-migration datasets against their pre-migration counterparts. This includes all tables, by comparing their schema, row counts and row comparison diff --git a/src/databricks/labs/ucx/workspace_access/workflows.py b/src/databricks/labs/ucx/workspace_access/workflows.py index 33953c0a8d..3aa0cf1a57 100644 --- a/src/databricks/labs/ucx/workspace_access/workflows.py +++ b/src/databricks/labs/ucx/workspace_access/workflows.py @@ -11,7 +11,7 @@ class LegacyGroupMigration(Workflow): def __init__(self): super().__init__('migrate-groups-legacy') - @job_task(job_cluster="table_migration") + @job_task(job_cluster="user_isolation") def verify_metastore_attached(self, ctx: RuntimeContext): """Verifies if a metastore is attached to this workspace. If not, the workflow will fail. @@ -72,7 +72,7 @@ class PermissionsMigrationAPI(Workflow): def __init__(self): super().__init__('migrate-groups') - @job_task(job_cluster="table_migration") + @job_task(job_cluster="user_isolation") def verify_metastore_attached(self, ctx: RuntimeContext): """Verifies if a metastore is attached to this workspace. If not, the workflow will fail. diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index f50ce1bceb..dbe48d26a9 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -1022,7 +1022,7 @@ def config(self) -> WorkspaceConfig: override_clusters={ "main": default_cluster_id, "tacl": tacl_cluster_id, - "table_migration": table_migration_cluster_id, + "user_isolation": table_migration_cluster_id, }, workspace_start_path=self.installation.install_folder(), renamed_group_prefix=self.renamed_group_prefix, diff --git a/tests/integration/hive_metastore/test_ext_hms.py b/tests/integration/hive_metastore/test_ext_hms.py index f39bd44b91..ce11c690eb 100644 --- a/tests/integration/hive_metastore/test_ext_hms.py +++ b/tests/integration/hive_metastore/test_ext_hms.py @@ -27,7 +27,7 @@ def test_migration_job_ext_hms(ws, installation_ctx, prepare_tables_for_migratio wc, override_clusters={ "main": ext_hms_cluster_id, - "table_migration": ext_hms_cluster_id, + "user_isolation": ext_hms_cluster_id, }, ), extend_prompts={