Skip to content

Commit

Permalink
add installation to all task functions signature in runtime.py
Browse files Browse the repository at this point in the history
  • Loading branch information
qziyuan committed Mar 13, 2024
1 parent cd73d4c commit 23f93e8
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 45 deletions.
5 changes: 1 addition & 4 deletions src/databricks/labs/ucx/framework/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,4 @@ def trigger(*argv):
) as task_logger:
ucx_logger = logging.getLogger("databricks.labs.ucx")
ucx_logger.info(f"UCX v{__version__} After job finishes, see debug logs at {task_logger}")
if current_task.workflow == "migrate-tables":
current_task.fn(cfg, workspace_client, sql_backend, installation)
else:
current_task.fn(cfg, workspace_client, sql_backend)
current_task.fn(cfg, workspace_client, sql_backend, installation)
60 changes: 41 additions & 19 deletions src/databricks/labs/ucx/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def setup_tacl(*_):


@task("assessment", depends_on=[crawl_tables, setup_tacl], job_cluster="tacl")
def crawl_grants(cfg: WorkspaceConfig, _: WorkspaceClient, sql_backend: SqlBackend):
def crawl_grants(cfg: WorkspaceConfig, _: WorkspaceClient, sql_backend: SqlBackend, _installation: Installation):
"""Scans the previously created Delta table named `$inventory_database.tables` and issues a `SHOW GRANTS`
statement for every object to retrieve the permissions it has assigned to it. The permissions include information
such as the _principal_, _action type_, and the _table_ it applies to. This is persisted in the Delta table
Expand All @@ -63,7 +63,9 @@ def crawl_grants(cfg: WorkspaceConfig, _: WorkspaceClient, sql_backend: SqlBacke


@task("assessment", depends_on=[crawl_tables])
def estimate_table_size_for_migration(cfg: WorkspaceConfig, _: WorkspaceClient, sql_backend: SqlBackend):
def estimate_table_size_for_migration(
cfg: WorkspaceConfig, _: WorkspaceClient, sql_backend: SqlBackend, _installation: Installation
):
"""Scans the previously created Delta table named `$inventory_database.tables` and locate tables that cannot be
"synced". These tables will have to be cloned in the migration process.
Assesses the size of these tables and create `$inventory_database.table_size` table to list these sizes.
Expand All @@ -73,7 +75,7 @@ def estimate_table_size_for_migration(cfg: WorkspaceConfig, _: WorkspaceClient,


@task("assessment")
def crawl_mounts(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend):
def crawl_mounts(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend, _installation: Installation):
"""Defines the scope of the _mount points_ intended for migration into Unity Catalog. As these objects are not
compatible with the Unity Catalog paradigm, a key component of the migration process involves transferring them
to Unity Catalog External Locations.
Expand All @@ -85,7 +87,9 @@ def crawl_mounts(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBack


@task("assessment", depends_on=[crawl_mounts, crawl_tables])
def guess_external_locations(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend):
def guess_external_locations(
cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend, _installation: Installation
):
"""Determines the shared path prefixes of all the tables. Specifically, the focus is on identifying locations that
utilize mount points. The goal is to identify the _external locations_ necessary for a successful migration and
store this information in the `$inventory.external_locations` table.
Expand All @@ -99,7 +103,7 @@ def guess_external_locations(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_back


@task("assessment")
def assess_jobs(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend):
def assess_jobs(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend, _installation: Installation):
"""Scans through all the jobs and identifies those that are not compatible with UC. The list of all the jobs is
stored in the `$inventory.jobs` table.
Expand All @@ -114,7 +118,7 @@ def assess_jobs(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBacke


@task("assessment")
def assess_clusters(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend):
def assess_clusters(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend, _installation: Installation):
"""Scan through all the clusters and identifies those that are not compatible with UC. The list of all the clusters
is stored in the`$inventory.clusters` table.
Expand All @@ -129,7 +133,7 @@ def assess_clusters(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlB


@task("assessment")
def assess_pipelines(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend):
def assess_pipelines(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend, _installation: Installation):
"""This module scans through all the Pipelines and identifies those pipelines which has Azure Service Principals
embedded (who has been given access to the Azure storage accounts via spark configurations) in the pipeline
configurations.
Expand All @@ -144,7 +148,9 @@ def assess_pipelines(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: Sql


@task("assessment")
def assess_incompatible_submit_runs(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend):
def assess_incompatible_submit_runs(
cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend, _installation: Installation
):
"""This module scans through all the Submit Runs and identifies those runs which may become incompatible after
the workspace attachment.
Expand All @@ -159,7 +165,9 @@ def assess_incompatible_submit_runs(cfg: WorkspaceConfig, ws: WorkspaceClient, s


@task("assessment", cloud="azure")
def assess_azure_service_principals(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend):
def assess_azure_service_principals(
cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend, _installation: Installation
):
"""This module scans through all the clusters configurations, cluster policies, job cluster configurations,
Pipeline configurations, Warehouse configuration and identifies all the Azure Service Principals who has been
given access to the Azure storage accounts via spark configurations referred in those entities.
Expand All @@ -175,7 +183,9 @@ def assess_azure_service_principals(cfg: WorkspaceConfig, ws: WorkspaceClient, s


@task("assessment")
def assess_global_init_scripts(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend):
def assess_global_init_scripts(
cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend, _installation: Installation
):
"""This module scans through all the global init scripts and identifies if there is an Azure Service Principal
who has been given access to the Azure storage accounts via spark configurations referred in those scripts.
Expand All @@ -186,7 +196,7 @@ def assess_global_init_scripts(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_ba


@task("assessment")
def workspace_listing(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend):
def workspace_listing(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend, _installation: Installation):
"""Scans the workspace for workspace objects. It recursively list all sub directories
and compiles a list of directories, notebooks, files, repos and libraries in the workspace.
Expand All @@ -197,7 +207,7 @@ def workspace_listing(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: Sq


@task("assessment", depends_on=[crawl_grants, workspace_listing])
def crawl_permissions(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend):
def crawl_permissions(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend, _installation: Installation):
"""Scans the workspace-local groups and all their permissions. The list is stored in the `$inventory.permissions`
Delta table.
Expand All @@ -215,7 +225,7 @@ def crawl_permissions(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: Sq


@task("assessment")
def crawl_groups(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend):
def crawl_groups(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend, _installation: Installation):
"""Scans all groups for the local group migration scope"""
group_manager = GroupManager(
sql_backend,
Expand Down Expand Up @@ -270,7 +280,9 @@ def estimates_report(*_):


@task("migrate-groups", depends_on=[crawl_groups])
def rename_workspace_local_groups(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend):
def rename_workspace_local_groups(
cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend, _installation: Installation
):
"""Renames workspace local groups by adding `ucx-renamed-` prefix."""
verify_has_metastore = VerifyHasMetastore(ws)
if verify_has_metastore.verify_metastore():
Expand All @@ -291,7 +303,9 @@ def rename_workspace_local_groups(cfg: WorkspaceConfig, ws: WorkspaceClient, sql


@task("migrate-groups", depends_on=[rename_workspace_local_groups])
def reflect_account_groups_on_workspace(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend):
def reflect_account_groups_on_workspace(
cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend, _installation: Installation
):
"""Adds matching account groups to this workspace. The matching account level group(s) must preexist(s) for this
step to be successful. This process does not create the account level group(s)."""
group_manager = GroupManager(
Expand All @@ -309,7 +323,9 @@ def reflect_account_groups_on_workspace(cfg: WorkspaceConfig, ws: WorkspaceClien


@task("migrate-groups", depends_on=[reflect_account_groups_on_workspace], job_cluster="tacl")
def apply_permissions_to_account_groups(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend):
def apply_permissions_to_account_groups(
cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend, _installation: Installation
):
"""Fourth phase of the workspace-local group migration process. It does the following:
- Assigns the full set of permissions of the original group to the account-level one
Expand Down Expand Up @@ -347,7 +363,9 @@ def apply_permissions_to_account_groups(cfg: WorkspaceConfig, ws: WorkspaceClien


@task("validate-groups-permissions", job_cluster="tacl")
def validate_groups_permissions(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend):
def validate_groups_permissions(
cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend, _installation: Installation
):
"""Validate that all the crawled permissions are applied correctly to the destination groups."""
logger.info("Running validation of permissions applied to destination groups.")
permission_manager = PermissionManager.factory(
Expand All @@ -361,7 +379,9 @@ def validate_groups_permissions(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_b


@task("remove-workspace-local-backup-groups", depends_on=[apply_permissions_to_account_groups])
def delete_backup_groups(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend):
def delete_backup_groups(
cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend, _installation: Installation
):
"""Last step of the group migration process. Removes all workspace-level backup groups, along with their
permissions. Execute this workflow only after you've confirmed that workspace-local migration worked
successfully for all the groups involved."""
Expand All @@ -380,7 +400,9 @@ def delete_backup_groups(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend:


@task("099-destroy-schema")
def destroy_schema(cfg: WorkspaceConfig, _: WorkspaceClient, sql_backend: SqlBackend):
def destroy_schema(
cfg: WorkspaceConfig, _workspace_client: WorkspaceClient, sql_backend: SqlBackend, _installation: Installation
):
"""This _clean-up_ workflow allows to removes the `$inventory` database, with all the inventory tables created by
the previous workflow runs. Use this to reset the entire state and start with the assessment step again."""
sql_backend.execute(f"DROP DATABASE {cfg.inventory_database} CASCADE")
Expand Down
21 changes: 1 addition & 20 deletions tests/unit/framework/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def mock_cfg_response():
return mock_cfg, mock_api_response


def test_trigger_task_of_migrate_tables(mocker, capsys):
def test_trigger_task(mocker, capsys):
# define a mock task that is under "migrate-tables" workflow, which needs 4 parameters including installation
@task("migrate-tables", job_cluster="migration_sync")
def mock_migrate_external_tables_sync(cfg, workspace_client, sql_backend, installation):
Expand All @@ -130,22 +130,3 @@ def mock_migrate_external_tables_sync(cfg, workspace_client, sql_backend, instal
sys.modules["pyspark.sql.session"] = mocker.Mock()
trigger("--config=config.yml", "--task=mock_migrate_external_tables_sync")
assert "This mock task of migrate-tables" in capsys.readouterr().out


def test_trigger_task_of_assessment(mocker, capsys):
# define a mock task that is under "assessment" workflow, which needs 3 parameters
@task("assessment", job_cluster="main")
def mock_crawl_tables(cfg, workspace_client, sql_backend):
"""This mock task of assessment"""
return f"Hello, World! {cfg} {workspace_client} {sql_backend}"

mock_cfg, mock_api_response = mock_cfg_response()

with (
patch('pathlib.Path.open', return_value=mock_cfg),
patch.dict(os.environ, {"DATABRICKS_RUNTIME_VERSION": "14.0"}),
patch("requests.Session.send", return_value=mock_api_response),
):
sys.modules["pyspark.sql.session"] = mocker.Mock()
trigger("--config=config.yml", "--task=mock_crawl_tables")
assert "This mock task of assessment" in capsys.readouterr().out
4 changes: 2 additions & 2 deletions tests/unit/test_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def test_azure_crawler(mocker):
)
ws = create_autospec(WorkspaceClient)
sql_backend = MockBackend()
assess_azure_service_principals(cfg, ws, sql_backend)
assess_azure_service_principals(cfg, ws, sql_backend, MockInstallation())


def test_tasks():
Expand Down Expand Up @@ -92,7 +92,7 @@ def test_runtime_grants(mocker):
cfg = azure_mock_config()
ws = create_autospec(WorkspaceClient)
sql_backend = MockBackend()
crawl_grants(cfg, ws, sql_backend)
crawl_grants(cfg, ws, sql_backend, MockInstallation())

assert "SHOW DATABASES FROM hive_metastore" in sql_backend.queries
assert "SHOW DATABASES" in sql_backend.queries
Expand Down

0 comments on commit 23f93e8

Please sign in to comment.