Skip to content

Commit

Permalink
changing table_migration to user_isolation (#3389)
Browse files Browse the repository at this point in the history
<!-- REMOVE IRRELEVANT COMMENTS BEFORE CREATING A PULL REQUEST -->
## Changes
Renamed the `table_migration` job cluster to `user_isolation`. It's
just a naming change, updated in all places where the cluster is referenced.
<!-- Summary of your changes that are easy to understand. Add
screenshots when necessary -->



Resolves #3172
  • Loading branch information
HariGS-DB authored Dec 4, 2024
1 parent 74245ae commit d0275ba
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 35 deletions.
30 changes: 15 additions & 15 deletions src/databricks/labs/ucx/hive_metastore/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def convert_managed_table(self, ctx: RuntimeContext):
managed_table_external_storage=ctx.config.managed_table_external_storage
)

@job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables, convert_managed_table])
@job_task(job_cluster="user_isolation", depends_on=[Assessment.crawl_tables, convert_managed_table])
def migrate_external_tables_sync(self, ctx: RuntimeContext):
"""This workflow task migrates the external tables that are supported by SYNC command from the Hive Metastore
to the Unity Catalog.
Expand All @@ -25,14 +25,14 @@ def migrate_external_tables_sync(self, ctx: RuntimeContext):
what=What.EXTERNAL_SYNC, managed_table_external_storage=ctx.config.managed_table_external_storage
)

@job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables, convert_managed_table])
@job_task(job_cluster="user_isolation", depends_on=[Assessment.crawl_tables, convert_managed_table])
def migrate_dbfs_root_delta_tables(self, ctx: RuntimeContext):
"""This workflow task migrates delta tables stored in DBFS root from the Hive Metastore to the Unity Catalog
using deep clone.
"""
ctx.tables_migrator.migrate_tables(what=What.DBFS_ROOT_DELTA)

@job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables, convert_managed_table])
@job_task(job_cluster="user_isolation", depends_on=[Assessment.crawl_tables, convert_managed_table])
def migrate_dbfs_root_non_delta_tables(
self,
ctx: RuntimeContext,
Expand All @@ -43,7 +43,7 @@ def migrate_dbfs_root_non_delta_tables(
ctx.tables_migrator.migrate_tables(what=What.DBFS_ROOT_NON_DELTA)

@job_task(
job_cluster="table_migration",
job_cluster="user_isolation",
depends_on=[
Assessment.crawl_tables,
migrate_external_tables_sync,
Expand All @@ -57,7 +57,7 @@ def migrate_views(self, ctx: RuntimeContext):
"""
ctx.tables_migrator.migrate_tables(what=What.VIEW)

@job_task(job_cluster="table_migration", depends_on=[migrate_views])
@job_task(job_cluster="user_isolation", depends_on=[migrate_views])
def update_migration_status(self, ctx: RuntimeContext):
"""Refresh the migration status to present it in the dashboard."""
ctx.tables_migrator.get_remaining_tables()
Expand All @@ -67,7 +67,7 @@ class MigrateHiveSerdeTablesInPlace(Workflow):
def __init__(self):
super().__init__('migrate-external-hiveserde-tables-in-place-experimental')

@job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables])
@job_task(job_cluster="user_isolation", depends_on=[Assessment.crawl_tables])
def migrate_hive_serde_in_place(self, ctx: RuntimeContext):
"""This workflow task migrates ParquetHiveSerDe, OrcSerde, AvroSerDe tables in place from
the Hive Metastore to the Unity Catalog."""
Expand All @@ -77,7 +77,7 @@ def migrate_hive_serde_in_place(self, ctx: RuntimeContext):
)

@job_task(
job_cluster="table_migration",
job_cluster="user_isolation",
depends_on=[Assessment.crawl_tables, migrate_hive_serde_in_place],
)
def migrate_views(self, ctx: RuntimeContext):
Expand All @@ -86,7 +86,7 @@ def migrate_views(self, ctx: RuntimeContext):
"""
ctx.tables_migrator.migrate_tables(what=What.VIEW)

@job_task(job_cluster="table_migration", depends_on=[migrate_views])
@job_task(job_cluster="user_isolation", depends_on=[migrate_views])
def update_migration_status(self, ctx: RuntimeContext):
"""Refresh the migration status to present it in the dashboard."""
ctx.tables_migrator.get_remaining_tables()
Expand All @@ -96,22 +96,22 @@ class MigrateExternalTablesCTAS(Workflow):
def __init__(self):
super().__init__('migrate-external-tables-ctas')

@job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables])
@job_task(job_cluster="user_isolation", depends_on=[Assessment.crawl_tables])
def migrate_other_external_ctas(self, ctx: RuntimeContext):
"""This workflow task migrates non-SYNC supported and non HiveSerde external tables using CTAS"""
ctx.tables_migrator.migrate_tables(
what=What.EXTERNAL_NO_SYNC,
)

@job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables])
@job_task(job_cluster="user_isolation", depends_on=[Assessment.crawl_tables])
def migrate_hive_serde_ctas(self, ctx: RuntimeContext):
"""This workflow task migrates HiveSerde tables using CTAS"""
ctx.tables_migrator.migrate_tables(
what=What.EXTERNAL_HIVESERDE,
)

@job_task(
job_cluster="table_migration",
job_cluster="user_isolation",
depends_on=[Assessment.crawl_tables, migrate_other_external_ctas, migrate_hive_serde_ctas],
)
def migrate_views(self, ctx: RuntimeContext):
Expand All @@ -120,7 +120,7 @@ def migrate_views(self, ctx: RuntimeContext):
"""
ctx.tables_migrator.migrate_tables(what=What.VIEW)

@job_task(job_cluster="table_migration", depends_on=[migrate_views])
@job_task(job_cluster="user_isolation", depends_on=[migrate_views])
def update_migration_status(self, ctx: RuntimeContext):
"""Refresh the migration status to present it in the dashboard."""
ctx.tables_migrator.get_remaining_tables()
Expand All @@ -137,7 +137,7 @@ def scan_tables_in_mounts_experimental(self, ctx: RuntimeContext):
replacing any existing content that might be present."""
ctx.tables_in_mounts.snapshot(force_refresh=True)

@job_task(job_cluster="table_migration", depends_on=[scan_tables_in_mounts_experimental])
@job_task(job_cluster="user_isolation", depends_on=[scan_tables_in_mounts_experimental])
def update_migration_status(self, ctx: RuntimeContext):
"""Refresh the migration status to present it in the dashboard."""
ctx.tables_migrator.get_remaining_tables()
Expand All @@ -147,12 +147,12 @@ class MigrateTablesInMounts(Workflow):
def __init__(self):
super().__init__('migrate-tables-in-mounts-experimental')

@job_task(job_cluster="table_migration", depends_on=[ScanTablesInMounts.scan_tables_in_mounts_experimental])
@job_task(job_cluster="user_isolation", depends_on=[ScanTablesInMounts.scan_tables_in_mounts_experimental])
def migrate_tables_in_mounts_experimental(self, ctx: RuntimeContext):
"""[EXPERIMENTAL] This workflow migrates `delta tables stored in mount points` to Unity Catalog using a Create Table statement."""
ctx.tables_migrator.migrate_tables(what=What.TABLE_IN_MOUNT)

@job_task(job_cluster="table_migration", depends_on=[migrate_tables_in_mounts_experimental])
@job_task(job_cluster="user_isolation", depends_on=[migrate_tables_in_mounts_experimental])
def update_migration_status(self, ctx: RuntimeContext):
"""Refresh the migration status to present it in the dashboard."""
ctx.tables_migrator.get_remaining_tables()
9 changes: 4 additions & 5 deletions src/databricks/labs/ucx/installer/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ def _job_cluster_spark_conf(self, cluster_key: str):
return spark_conf | conf_from_installation
if cluster_key == "tacl":
return {"spark.databricks.acl.sqlOnly": "true"} | conf_from_installation
if cluster_key == "table_migration":
if cluster_key == "user_isolation":
return {"spark.sql.sources.parallelPartitionDiscovery.parallelism": "200"} | conf_from_installation
return conf_from_installation

Expand Down Expand Up @@ -918,14 +918,13 @@ def _job_clusters(self, names: set[str]):
),
)
)
if "table_migration" in names:
# TODO: rename to "user-isolation", so that we can use it in group migration workflows
if "user_isolation" in names:
clusters.append(
jobs.JobCluster(
job_cluster_key="table_migration",
job_cluster_key="user_isolation",
new_cluster=compute.ClusterSpec(
data_security_mode=compute.DataSecurityMode.USER_ISOLATION,
spark_conf=self._job_cluster_spark_conf("table_migration"),
spark_conf=self._job_cluster_spark_conf("user_isolation"),
policy_id=self._config.policy_id,
autoscale=compute.AutoScale(
max_workers=self._config.max_workers,
Expand Down
20 changes: 10 additions & 10 deletions src/databricks/labs/ucx/progress/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class MigrationProgress(Workflow):
def __init__(self) -> None:
super().__init__('migration-progress-experimental')

@job_task(job_cluster="table_migration")
@job_task(job_cluster="user_isolation")
def verify_prerequisites(self, ctx: RuntimeContext) -> None:
"""Verify the prerequisites for running this job on the table migration cluster are fulfilled.
Expand All @@ -42,14 +42,14 @@ def crawl_tables(self, ctx: RuntimeContext) -> None:
# Step 1 of 3: Just refresh the inventory.
ctx.tables_crawler.snapshot(force_refresh=True)

@job_task(depends_on=[verify_prerequisites, crawl_tables], job_cluster="table_migration")
@job_task(depends_on=[verify_prerequisites, crawl_tables], job_cluster="user_isolation")
def refresh_table_migration_status(self, ctx: RuntimeContext) -> None:
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
# Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.)
ctx.migration_status_refresher.snapshot(force_refresh=True)

@job_task(
depends_on=[verify_prerequisites, crawl_tables, refresh_table_migration_status], job_cluster="table_migration"
depends_on=[verify_prerequisites, crawl_tables, refresh_table_migration_status], job_cluster="user_isolation"
)
def update_tables_history_log(self, ctx: RuntimeContext) -> None:
"""Update the history log with the latest tables inventory snapshot."""
Expand All @@ -60,7 +60,7 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None:
tables_snapshot = ctx.tables_crawler.snapshot()
history_log.append_inventory_snapshot(tables_snapshot)

@job_task(depends_on=[verify_prerequisites], job_cluster="table_migration")
@job_task(depends_on=[verify_prerequisites], job_cluster="user_isolation")
def crawl_udfs(self, ctx: RuntimeContext) -> None:
"""Iterates over all UDFs in the Hive Metastore of the current workspace and persists their metadata in the
table named `$inventory_database.udfs`. This inventory is currently used when scanning securable objects for
Expand All @@ -69,7 +69,7 @@ def crawl_udfs(self, ctx: RuntimeContext) -> None:
udfs_snapshot = ctx.udfs_crawler.snapshot(force_refresh=True)
history_log.append_inventory_snapshot(udfs_snapshot)

@job_task(depends_on=[verify_prerequisites, crawl_tables, crawl_udfs], job_cluster="table_migration")
@job_task(depends_on=[verify_prerequisites, crawl_tables, crawl_udfs], job_cluster="user_isolation")
def crawl_grants(self, ctx: RuntimeContext) -> None:
"""Scans all securable objects for permissions that have been assigned: this include database-level permissions,
as well permissions directly configured on objects in the (already gathered) table and UDF inventories. The
Expand All @@ -82,7 +82,7 @@ def crawl_grants(self, ctx: RuntimeContext) -> None:
grants_snapshot = ctx.grants_crawler.snapshot(force_refresh=True)
history_log.append_inventory_snapshot(grants_snapshot)

@job_task(depends_on=[verify_prerequisites], job_cluster="table_migration")
@job_task(depends_on=[verify_prerequisites], job_cluster="user_isolation")
def assess_jobs(self, ctx: RuntimeContext) -> None:
"""Scans through all the jobs and identifies those that are not compatible with UC. The list of all the jobs is
stored in the `$inventory.jobs` table.
Expand All @@ -97,7 +97,7 @@ def assess_jobs(self, ctx: RuntimeContext) -> None:
jobs_snapshot = ctx.jobs_crawler.snapshot(force_refresh=True)
history_log.append_inventory_snapshot(jobs_snapshot)

@job_task(depends_on=[verify_prerequisites], job_cluster="table_migration")
@job_task(depends_on=[verify_prerequisites], job_cluster="user_isolation")
def assess_clusters(self, ctx: RuntimeContext) -> None:
"""Scan through all the clusters and identifies those that are not compatible with UC. The list of all the clusters
is stored in the`$inventory.clusters` table.
Expand All @@ -112,7 +112,7 @@ def assess_clusters(self, ctx: RuntimeContext) -> None:
clusters_snapshot = ctx.clusters_crawler.snapshot(force_refresh=True)
history_log.append_inventory_snapshot(clusters_snapshot)

@job_task(depends_on=[verify_prerequisites], job_cluster="table_migration")
@job_task(depends_on=[verify_prerequisites], job_cluster="user_isolation")
def assess_pipelines(self, ctx: RuntimeContext) -> None:
"""This module scans through all the Pipelines and identifies those pipelines which has Azure Service Principals
embedded (who has been given access to the Azure storage accounts via spark configurations) in the pipeline
Expand All @@ -127,7 +127,7 @@ def assess_pipelines(self, ctx: RuntimeContext) -> None:
pipelines_snapshot = ctx.pipelines_crawler.snapshot(force_refresh=True)
history_log.append_inventory_snapshot(pipelines_snapshot)

@job_task(depends_on=[verify_prerequisites], job_cluster="table_migration")
@job_task(depends_on=[verify_prerequisites], job_cluster="user_isolation")
def crawl_cluster_policies(self, ctx: RuntimeContext) -> None:
"""This module scans through all the Cluster Policies and get the necessary information
Expand Down Expand Up @@ -166,7 +166,7 @@ def assess_workflows(self, ctx: RuntimeContext):
refresh_table_migration_status,
update_tables_history_log,
],
job_cluster="table_migration",
job_cluster="user_isolation",
)
def record_workflow_run(self, ctx: RuntimeContext) -> None:
"""Record the workflow run of this workflow."""
Expand Down
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/recon/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ class MigrationRecon(Workflow):
def __init__(self):
super().__init__('migrate-data-reconciliation')

@job_task(job_cluster="table_migration")
@job_task(job_cluster="user_isolation")
def recon_migration_result(self, ctx: RuntimeContext):
"""This workflow validate post-migration datasets against their pre-migration counterparts. This includes all
tables, by comparing their schema, row counts and row comparison
Expand Down
4 changes: 2 additions & 2 deletions src/databricks/labs/ucx/workspace_access/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class LegacyGroupMigration(Workflow):
def __init__(self):
super().__init__('migrate-groups-legacy')

@job_task(job_cluster="table_migration")
@job_task(job_cluster="user_isolation")
def verify_metastore_attached(self, ctx: RuntimeContext):
"""Verifies if a metastore is attached to this workspace. If not, the workflow will fail.
Expand Down Expand Up @@ -72,7 +72,7 @@ class PermissionsMigrationAPI(Workflow):
def __init__(self):
super().__init__('migrate-groups')

@job_task(job_cluster="table_migration")
@job_task(job_cluster="user_isolation")
def verify_metastore_attached(self, ctx: RuntimeContext):
"""Verifies if a metastore is attached to this workspace. If not, the workflow will fail.
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1022,7 +1022,7 @@ def config(self) -> WorkspaceConfig:
override_clusters={
"main": default_cluster_id,
"tacl": tacl_cluster_id,
"table_migration": table_migration_cluster_id,
"user_isolation": table_migration_cluster_id,
},
workspace_start_path=self.installation.install_folder(),
renamed_group_prefix=self.renamed_group_prefix,
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/hive_metastore/test_ext_hms.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def test_migration_job_ext_hms(ws, installation_ctx, prepare_tables_for_migratio
wc,
override_clusters={
"main": ext_hms_cluster_id,
"table_migration": ext_hms_cluster_id,
"user_isolation": ext_hms_cluster_id,
},
),
extend_prompts={
Expand Down

0 comments on commit d0275ba

Please sign in to comment.