Skip to content

Commit

Permalink
Add non delta dbfs table migration (What.DBFS_ROOT_NON_DELTA) in migr…
Browse files Browse the repository at this point in the history
…ate_table workflow (#1621)

## Changes

There are three workflows for migrating tables:

- migrate_tables: covers EXTERNAL_SYNC, DBFS_ROOT_DELTA, and VIEW
- migrate-external-hiveserde-tables-in-place-experimental: covers
  EXTERNAL_HIVESERDE and VIEW
- migrate-external-tables-ctas: covers EXTERNAL_NO_SYNC,
  EXTERNAL_HIVESERDE, and VIEW

- This change adds the remaining scenario, DBFS_ROOT_NON_DELTA, to the
  migrate_tables workflow.
- It also adds AclMigrationWhat.PRINCIPAL to the ACL migration strategy
  of the table migration tasks.



Resolves #332 

### Functionality 

- [ ] added relevant user documentation
- [ ] added new CLI command
- [ ] modified existing command: `databricks labs ucx ...`
- [ ] added a new workflow
- [X] modified existing workflow: `...`
- [ ] added a new table
- [ ] modified existing table: `...`

### Tests
<!-- How is this tested? Please see the checklist below and also
describe any other relevant tests -->

- [ ] manually tested
- [X] added unit tests
- [X] added integration tests
- [ ] verified on staging environment (screenshot attached)

---------

Co-authored-by: vuong-nguyen <[email protected]>
Co-authored-by: Vuong <[email protected]>
  • Loading branch information
3 people authored May 7, 2024
1 parent 10522a6 commit e3bdd3d
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 23 deletions.
100 changes: 77 additions & 23 deletions src/databricks/labs/ucx/hive_metastore/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,63 @@ def __init__(self):

@job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables])
def migrate_external_tables_sync(self, ctx: RuntimeContext):
"""This workflow task migrates the *external tables that are supported by SYNC command* from the Hive Metastore to the Unity Catalog.
Following cli commands are required to be run before running this task:
- For Azure: `principal-prefix-access`, `create-table-mapping`, `create-uber-principal`, `migrate-credentials`, `migrate-locations`, `create-catalogs-schemas`
- For AWS: TBD
"""This workflow task migrates the external tables that are supported by SYNC command from the Hive Metastore
to the Unity Catalog.
"""
ctx.tables_migrator.migrate_tables(what=What.EXTERNAL_SYNC, acl_strategy=[AclMigrationWhat.LEGACY_TACL])
ctx.tables_migrator.migrate_tables(
what=What.EXTERNAL_SYNC,
acl_strategy=[
AclMigrationWhat.LEGACY_TACL,
AclMigrationWhat.PRINCIPAL,
],
)

@job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables])
def migrate_dbfs_root_delta_tables(self, ctx: RuntimeContext):
"""This workflow task migrates `delta tables stored in DBFS root` from the Hive Metastore to the Unity Catalog using deep clone.
Following cli commands are required to be run before running this task:
- For Azure: `principal-prefix-access`, `create-table-mapping`, `create-uber-principal`, `migrate-credentials`, `migrate-locations`, `create-catalogs-schemas`
- For AWS: TBD
"""This workflow task migrates delta tables stored in DBFS root from the Hive Metastore to the Unity Catalog
using deep clone.
"""
ctx.tables_migrator.migrate_tables(
what=What.DBFS_ROOT_DELTA,
acl_strategy=[
AclMigrationWhat.LEGACY_TACL,
AclMigrationWhat.PRINCIPAL,
],
)

@job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables])
def migrate_dbfs_root_non_delta_tables(self, ctx: RuntimeContext):
"""This workflow task migrates non delta tables stored in DBFS root from the Hive Metastore to the Unity Catalog
using CTAS.
"""
ctx.tables_migrator.migrate_tables(what=What.DBFS_ROOT_DELTA, acl_strategy=[AclMigrationWhat.LEGACY_TACL])
ctx.tables_migrator.migrate_tables(
what=What.DBFS_ROOT_NON_DELTA,
acl_strategy=[
AclMigrationWhat.LEGACY_TACL,
AclMigrationWhat.PRINCIPAL,
],
)

@job_task(
job_cluster="table_migration",
depends_on=[Assessment.crawl_tables, migrate_external_tables_sync, migrate_dbfs_root_delta_tables],
depends_on=[
Assessment.crawl_tables,
migrate_external_tables_sync,
migrate_dbfs_root_delta_tables,
migrate_dbfs_root_non_delta_tables,
],
)
def migrate_views(self, ctx: RuntimeContext):
"""This workflow task migrates views from the Hive Metastore to the Unity Catalog using create view sql statement.
It is dependent on the migration of the tables.
"""This workflow task migrates views from the Hive Metastore to the Unity Catalog using create view sql
statement. It is dependent on the migration of the tables.
"""
ctx.tables_migrator.migrate_tables(what=What.VIEW, acl_strategy=[AclMigrationWhat.LEGACY_TACL])
ctx.tables_migrator.migrate_tables(
what=What.VIEW,
acl_strategy=[
AclMigrationWhat.LEGACY_TACL,
AclMigrationWhat.PRINCIPAL,
],
)

@job_task(job_cluster="table_migration", depends_on=[migrate_views])
def refresh_migration_status(self, ctx: RuntimeContext):
Expand All @@ -53,10 +85,14 @@ def __init__(self):

@job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables])
def migrate_hive_serde_in_place(self, ctx: RuntimeContext):
"""This workflow task migrates ParquetHiveSerDe, OrcSerde, AvroSerDe tables in place from the Hive Metastore to the Unity Catalog."""
"""This workflow task migrates ParquetHiveSerDe, OrcSerde, AvroSerDe tables in place from
the Hive Metastore to the Unity Catalog."""
ctx.tables_migrator.migrate_tables(
what=What.EXTERNAL_HIVESERDE,
acl_strategy=[AclMigrationWhat.LEGACY_TACL],
acl_strategy=[
AclMigrationWhat.LEGACY_TACL,
AclMigrationWhat.PRINCIPAL,
],
mounts_crawler=ctx.mounts_crawler,
hiveserde_in_place_migrate=True,
)
Expand All @@ -69,7 +105,13 @@ def migrate_views(self, ctx: RuntimeContext):
"""This workflow task migrates views from the Hive Metastore to the Unity Catalog using create view sql statement.
It is dependent on the migration of the tables.
"""
ctx.tables_migrator.migrate_tables(what=What.VIEW, acl_strategy=[AclMigrationWhat.LEGACY_TACL])
ctx.tables_migrator.migrate_tables(
what=What.VIEW,
acl_strategy=[
AclMigrationWhat.LEGACY_TACL,
AclMigrationWhat.PRINCIPAL,
],
)

@job_task(job_cluster="table_migration", depends_on=[migrate_views])
def refresh_migration_status(self, ctx: RuntimeContext):
Expand All @@ -88,10 +130,13 @@ def __init__(self):

@job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables])
def migrate_other_external_ctas(self, ctx: RuntimeContext):
"""This workflow task migrates non SYNC supported and non HiveSerde external tables using CTAS"""
"""This workflow task migrates non-SYNC supported and non HiveSerde external tables using CTAS"""
ctx.tables_migrator.migrate_tables(
what=What.EXTERNAL_NO_SYNC,
acl_strategy=[AclMigrationWhat.LEGACY_TACL],
acl_strategy=[
AclMigrationWhat.LEGACY_TACL,
AclMigrationWhat.PRINCIPAL,
],
mounts_crawler=ctx.mounts_crawler,
)

Expand All @@ -100,7 +145,10 @@ def migrate_hive_serde_ctas(self, ctx: RuntimeContext):
"""This workflow task migrates HiveSerde tables using CTAS"""
ctx.tables_migrator.migrate_tables(
what=What.EXTERNAL_HIVESERDE,
acl_strategy=[AclMigrationWhat.LEGACY_TACL],
acl_strategy=[
AclMigrationWhat.LEGACY_TACL,
AclMigrationWhat.PRINCIPAL,
],
mounts_crawler=ctx.mounts_crawler,
)

Expand All @@ -109,10 +157,16 @@ def migrate_hive_serde_ctas(self, ctx: RuntimeContext):
depends_on=[Assessment.crawl_tables, migrate_other_external_ctas, migrate_hive_serde_ctas],
)
def migrate_views(self, ctx: RuntimeContext):
"""This workflow task migrates views from the Hive Metastore to the Unity Catalog using create view sql statement.
It is dependent on the migration of the tables.
"""This workflow task migrates views from the Hive Metastore to the Unity Catalog using create view sql
statement. It is dependent on the migration of the tables.
"""
ctx.tables_migrator.migrate_tables(what=What.VIEW, acl_strategy=[AclMigrationWhat.LEGACY_TACL])
ctx.tables_migrator.migrate_tables(
what=What.VIEW,
acl_strategy=[
AclMigrationWhat.LEGACY_TACL,
AclMigrationWhat.PRINCIPAL,
],
)

@job_task(job_cluster="table_migration", depends_on=[migrate_views])
def refresh_migration_status(self, ctx: RuntimeContext):
Expand Down
3 changes: 3 additions & 0 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,9 @@ def prepare_hiveserde_tables(context, random, schema, table_base_dir) -> dict[st
def prepare_regular_tables(context, external_csv, schema) -> dict[str, TableInfo]:
tables: dict[str, TableInfo] = {
"src_managed_table": context.make_table(schema_name=schema.name),
"src_managed_non_delta_table": context.make_table(
catalog_name=schema.catalog_name, non_delta=True, schema_name=schema.name
),
"src_external_table": context.make_table(schema_name=schema.name, external_csv=external_csv),
}
src_view1_text = f"SELECT * FROM {tables['src_managed_table'].full_name}"
Expand Down
20 changes: 20 additions & 0 deletions tests/unit/hive_metastore/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,26 @@ def test_migrate_dbfs_root_delta_tables(run_workflow):
ctx.workspace_client.catalogs.list.assert_called_once()


def test_migrate_dbfs_root_non_delta_tables(run_workflow):
    """Run the DBFS root non-delta migration task and check `catalogs.list` was called once."""
    task = TableMigration.migrate_dbfs_root_non_delta_tables
    runtime_ctx = run_workflow(task)
    list_catalogs = runtime_ctx.workspace_client.catalogs.list
    list_catalogs.assert_called_once()


def test_migrate_tables_views(run_workflow):
    """Run the view migration task of `TableMigration` and check `catalogs.list` was called."""
    task = TableMigration.migrate_views
    runtime_ctx = run_workflow(task)
    list_catalogs = runtime_ctx.workspace_client.catalogs.list
    list_catalogs.assert_called()


def test_migrate_hive_serde_in_place(run_workflow):
    """Run the in-place HiveSerde migration task and check `catalogs.list` was called once."""
    task = MigrateHiveSerdeTablesInPlace.migrate_hive_serde_in_place
    runtime_ctx = run_workflow(task)
    list_catalogs = runtime_ctx.workspace_client.catalogs.list
    list_catalogs.assert_called_once()


def test_migrate_hive_serde_views(run_workflow):
    """Run the view migration task of `MigrateHiveSerdeTablesInPlace` and check `catalogs.list` was called."""
    task = MigrateHiveSerdeTablesInPlace.migrate_views
    runtime_ctx = run_workflow(task)
    list_catalogs = runtime_ctx.workspace_client.catalogs.list
    list_catalogs.assert_called()


def test_migrate_other_external_ctas(run_workflow):
    """Run the CTAS task for other external tables and check `catalogs.list` was called once."""
    task = MigrateExternalTablesCTAS.migrate_other_external_ctas
    runtime_ctx = run_workflow(task)
    list_catalogs = runtime_ctx.workspace_client.catalogs.list
    list_catalogs.assert_called_once()
Expand All @@ -33,6 +48,11 @@ def test_migrate_hive_serde_ctas(run_workflow):
ctx.workspace_client.catalogs.list.assert_called_once()


def test_migrate_ctas_views(run_workflow):
    """Run the view migration task of `MigrateExternalTablesCTAS` and check `catalogs.list` was called."""
    task = MigrateExternalTablesCTAS.migrate_views
    runtime_ctx = run_workflow(task)
    list_catalogs = runtime_ctx.workspace_client.catalogs.list
    list_catalogs.assert_called()


@pytest.mark.parametrize(
"workflow", [TableMigration, MigrateHiveSerdeTablesInPlace, MigrateExternalTablesCTAS, MigrateTablesInMounts]
)
Expand Down

0 comments on commit e3bdd3d

Please sign in to comment.