Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support joining an existing collection when installing UCX #1675

Merged
merged 29 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ process can handle exceptions and infer errors from job runs and task runs. The
and wheel runners to the workspace. It can also handle the creation of job tasks for a given task, such as job dashboard tasks, job notebook tasks,
and job wheel tasks. The class handles the installation of UCX, including configuring the workspace, installing necessary libraries, and verifying
the installation, making it easier for users to migrate their workspaces to UCX.
At the end of the installation, the user is asked whether the current installation should join an existing collection (a new collection is created if none exists).
For large organizations with many workspaces, grouping workspaces into collections helps manage the UCX migration at the collection level (instead of the workspace level).
The user must be an account admin to join a collection.

After this, UCX will be installed locally and a number of assets will be deployed in the selected workspace.
These assets are available under the installation folder, i.e. `/Users/<your user>/.ucx/`.
Expand Down
5 changes: 5 additions & 0 deletions src/databricks/labs/ucx/contexts/account_cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from functools import cached_property
from os import environ

from databricks.sdk import AccountClient

Expand Down Expand Up @@ -30,6 +31,10 @@ def account_workspaces(self):
def account_aggregate(self):
return AccountAggregate(self.account_workspaces)

@cached_property
def is_account_install(self):
    """Tell whether UCX is being installed at the account level.

    The answer comes from the ``UCX_FORCE_INSTALL`` environment variable:
    only the exact value ``"account"`` yields ``True``; any other value, or
    an unset variable, yields ``False``.
    """
    forced_target = environ.get("UCX_FORCE_INSTALL")
    return forced_target == "account"

@cached_property
def account_metastores(self):
    """Build the ``AccountMetastores`` helper from this context's account client."""
    client = self.account_client
    return AccountMetastores(client)
77 changes: 76 additions & 1 deletion src/databricks/labs/ucx/install.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,16 +652,91 @@
# upload the json dump of workspace info in the .ucx folder
ctx.account_workspaces.sync_workspace_info(installed_workspaces)

def join_collection(
self,
current_workspace_id: int,
):
if not self.is_account_install and self.prompts.confirm(
"Do you want to join the current installation to an existing collection?"
):

installed_workspaces: list[Workspace] | None = []
accessible_workspaces: list[Workspace] = []
account_client = self._get_safe_account_client()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what would this return if user does not have account admin permission?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't fail. If the user is not an account admin, the first point of failure is the workspaces.list() call.

ctx = AccountContext(account_client)
try:
accessible_workspaces = ctx.account_workspaces.get_accessible_workspaces()
except PermissionDenied:
logger.warning("User doesnt have account admin permission, cant join a collection, skipping...")

Check warning on line 670 in src/databricks/labs/ucx/install.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/ucx/install.py#L669-L670

Added lines #L669 - L670 were not covered by tests
collection_workspace = self._get_collection_workspace(accessible_workspaces, account_client)
if collection_workspace is not None:
installed_workspaces = self._sync_collection(collection_workspace, current_workspace_id, account_client)
if installed_workspaces is not None:
ctx.account_workspaces.sync_workspace_info(installed_workspaces)

def _sync_collection(
    self,
    collection_workspace: Workspace,
    current_workspace_id: int,
    account_client: AccountClient,
) -> list[Workspace] | None:
    """Add the current workspace to the collection of ``collection_workspace``.

    The collection is the list of workspace ids stored as
    ``installed_workspace_ids`` in the UCX config of ``collection_workspace``.
    When that config holds no list yet, a new collection is started. The
    resulting id list is then written to the config of every member workspace
    that appears in ``account_client.workspaces.list()``.

    Args:
        collection_workspace: workspace whose config supplies the existing
            collection.
        current_workspace_id: id of the workspace that is joining.
        account_client: account-level client used to list workspaces.

    Returns:
        The account workspaces that are members of the updated collection.
    """
    installer = self._get_installer(collection_workspace)
    installed_workspace_ids = installer.config.installed_workspace_ids
    if installed_workspace_ids is None:
        installed_workspace_ids = []
        logger.warning(
            f"Workspace {collection_workspace.deployment_name} does not belong to any existing "
            f"collection, creating a new collection"
        )
    # Joining twice (or joining the collection workspace itself) must not
    # record the same workspace id more than once.
    if current_workspace_id not in installed_workspace_ids:
        installed_workspace_ids.append(current_workspace_id)

    # Set lookup keeps the scan over all account workspaces linear.
    member_ids = set(installed_workspace_ids)
    installed_workspaces = [
        account_workspace
        for account_workspace in account_client.workspaces.list()
        if account_workspace.workspace_id in member_ids
    ]

    # Every member gets the same, complete id list in its own config.
    for installed_workspace in installed_workspaces:
        member_installer = self._get_installer(installed_workspace)
        member_installer.replace_config(installed_workspace_ids=installed_workspace_ids)
    return installed_workspaces

def _get_collection_workspace(
    self,
    accessible_workspaces: list[Workspace],
    account_client: AccountClient,
) -> Workspace | None:
    """Ask the user which already-installed workspace to form a collection with.

    Each accessible workspace is probed with ``Installation.existing`` for
    this product; those with at least one installation become the candidates
    offered in the prompt, keyed by deployment name.

    Args:
        accessible_workspaces: workspaces to probe.
        account_client: account-level client used to obtain a workspace
            client for each probed workspace.

    Returns:
        The workspace picked by the user, or ``None`` when there is nothing
        to choose from.
    """
    product_name = self.product_info.product_name()
    installed_workspaces = []
    for workspace in accessible_workspaces:
        workspace_client = account_client.get_workspace_client(workspace)
        if Installation.existing(workspace_client, product_name):
            installed_workspaces.append(workspace)

    if not installed_workspaces:
        logger.warning("No existing installation found, skipping joining a collection")
        return None

    # Workspaces without a deployment name cannot be offered as a choice.
    workspaces = {
        workspace.deployment_name: workspace
        for workspace in installed_workspaces
        if workspace.deployment_name is not None
    }
    if not workspaces:
        # Prompting with an empty set of choices cannot succeed; skip instead.
        logger.warning(
            "No workspace with an existing installation has a deployment name, skipping joining a collection"
        )
        return None

    return self.prompts.choice_from_dict(
        "Please select a workspace, the current installation of ucx will be grouped as a "
        "collection with the selected workspace",
        workspaces,
    )


if __name__ == "__main__":
    # Script entry point: UCX_FORCE_INSTALL=account installs on the account;
    # anything else installs on the workspace and then offers to join a
    # collection.
    logger = get_logger(__file__)
    if is_in_debug():
        logging.getLogger('databricks').setLevel(logging.DEBUG)
    force_install = os.environ.get("UCX_FORCE_INSTALL")
    # A single account installer serves both branches; it was previously
    # constructed a second time inside the account branch.
    account_installer = AccountInstaller(AccountClient(product="ucx", product_version=__version__))
    if force_install == "account":
        account_installer.install_on_account()
    else:
        workspace_installer = WorkspaceInstaller(WorkspaceClient(product="ucx", product_version=__version__))
        workspace_installer.run()
        account_installer.join_collection(workspace_installer.workspace_client.get_workspace_id())
10 changes: 9 additions & 1 deletion tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from databricks.labs.ucx.hive_metastore.locations import Mount, Mounts, ExternalLocation
from databricks.labs.ucx.hive_metastore.mapping import Rule, TableMapping
from databricks.labs.ucx.hive_metastore.tables import Table
from databricks.labs.ucx.install import WorkspaceInstallation, WorkspaceInstaller
from databricks.labs.ucx.install import WorkspaceInstallation, WorkspaceInstaller, AccountInstaller
from databricks.labs.ucx.installer.workflows import WorkflowsDeployment

# pylint: disable-next=unused-wildcard-import,wildcard-import
Expand Down Expand Up @@ -609,6 +609,14 @@ def running_clusters(self):
def installation(self):
return Installation(self.workspace_client, self.product_info.product_name())

@cached_property
def account_client(self):
    """Create the account-level client, tagged with the ucx product name and version."""
    client_options = {"product": "ucx", "product_version": __version__}
    return AccountClient(**client_options)

@cached_property
def account_installer(self):
    """Create an ``AccountInstaller`` around this context's account client."""
    client = self.account_client
    return AccountInstaller(client)

@cached_property
def environ(self) -> dict[str, str]:
    """Return a plain-dict copy of the process environment."""
    return dict(os.environ)
Expand Down
21 changes: 21 additions & 0 deletions tests/integration/test_installation.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,3 +442,24 @@ def test_compare_remote_local_install_versions(ws, installation_ctx):
installation_ctx.__dict__.pop("workspace_installer")
installation_ctx.__dict__.pop("prompts")
installation_ctx.workspace_installer.configure()


def test_new_collection(ws, sql_backend, installation_ctx, env_or_skip):
    """Joining a collection from a fresh installation records only this workspace.

    After the workspace installation runs, the account installer is asked to
    join a collection; the stored config must then list exactly the current
    workspace id.
    """
    installation_ctx.workspace_installation.run()
    # Looked up once and reused for both the join call and the assertion.
    workspace_id = installation_ctx.workspace_installer.workspace_client.get_workspace_id()
    acc_installer = installation_ctx.account_installer
    prompts = MockPrompts(
        {
            r"Do you want to join the current.*": "yes",
            r"Please provide the Databricks account id.*": env_or_skip("DATABRICKS_ACCOUNT_ID"),
            r"Please select a workspace, the current installation.*": 0,
        }
    )
    acc_installer.replace(
        prompts=prompts,
        product_info=installation_ctx.product_info,
    )
    acc_installer.join_collection(workspace_id)
    config = installation_ctx.installation.load(WorkspaceConfig)
    assert config.installed_workspace_ids == [workspace_id]
20 changes: 18 additions & 2 deletions tests/unit/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import base64
import dataclasses
import io
import json
import logging
import os
import pathlib
from pathlib import Path
from unittest.mock import create_autospec

import yaml
from databricks.labs.blueprint.installation import MockInstallation
from databricks.labs.lsql.backends import MockBackend
from databricks.sdk import WorkspaceClient
Expand All @@ -15,8 +17,8 @@
from databricks.sdk.service.jobs import BaseJob, BaseRun
from databricks.sdk.service.pipelines import GetPipelineResponse, PipelineStateInfo
from databricks.sdk.service.sql import EndpointConfPair
from databricks.sdk.service.workspace import ExportResponse, GetSecretResponse

from databricks.sdk.service.workspace import ExportResponse, GetSecretResponse, ObjectInfo
from databricks.sdk.service import iam
from databricks.labs.ucx.hive_metastore.mapping import TableMapping, TableToMigrate
from databricks.labs.ucx.source_code.graph import SourceContainer
from databricks.labs.ucx.source_code.path_lookup import PathLookup
Expand Down Expand Up @@ -184,11 +186,13 @@ def workspace_client_mock(
secret_exists=True,
):
ws = create_autospec(WorkspaceClient)
ws.current_user.me = lambda: iam.User(user_name="[email protected]", groups=[iam.ComplexValue(display="admins")])
ws.clusters.list.return_value = _id_list(ClusterDetails, cluster_ids)
ws.cluster_policies.list.return_value = _id_list(Policy, policy_ids)
ws.cluster_policies.get = _cluster_policy
ws.pipelines.list_pipelines.return_value = _id_list(PipelineStateInfo, pipeline_ids)
ws.pipelines.get = _pipeline
ws.workspace.get_status = lambda _: ObjectInfo(object_id=123)
ws.jobs.list.return_value = _id_list(BaseJob, job_ids)
ws.jobs.list_runs.return_value = _id_list(BaseRun, jobruns_ids)
ws.warehouses.get_workspace_warehouse_config().data_access_config = _load_list(EndpointConfPair, warehouse_config)
Expand All @@ -197,6 +201,18 @@ def workspace_client_mock(
ws.secrets.get_secret.return_value = GetSecretResponse(key="username", value="SGVsbG8sIFdvcmxkIQ==")
else:
ws.secrets.get_secret = _secret_not_found
download_yaml = yaml.dump(
{
'version': 1,
'inventory_database': 'ucx_exists',
'connect': {
'host': '...',
'token': '...',
},
'installed_workspace_ids': [123, 456],
}
)
ws.workspace.download.return_value = io.StringIO(download_yaml)
return ws


Expand Down
120 changes: 120 additions & 0 deletions tests/unit/test_collection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import io
from unittest.mock import create_autospec

import yaml

from databricks.sdk.service.provisioning import Workspace

from databricks.labs.blueprint.tui import MockPrompts
from databricks.labs.blueprint.wheels import ProductInfo
from databricks.sdk import AccountClient
from databricks.labs.ucx.config import WorkspaceConfig
from databricks.labs.ucx.install import AccountInstaller
from . import workspace_client_mock

PRODUCT_INFO = ProductInfo.from_class(WorkspaceConfig)


def test_join_collection_prompt_no_join():
    """Answering "no" to the join prompt must leave the workspace listing untouched."""
    answers = {
        r".*PRO or SERVERLESS SQL warehouse.*": "1",
        r"Open job overview.*": "no",
        r"Do you want to join the current.*": "no",
        r".*": "",
    }
    acc_client = create_autospec(AccountClient)
    installer = AccountInstaller(acc_client)
    installer.replace(
        prompts=MockPrompts(answers),
        product_info=ProductInfo.for_testing(WorkspaceConfig),
    )

    installer.join_collection(123)

    acc_client.workspaces.list.assert_not_called()


def test_join_collection_no_sync_called():
    """Accepting the join prompt lists workspaces but requests no workspace client."""
    answers = {
        r".*PRO or SERVERLESS SQL warehouse.*": "1",
        r"Open job overview.*": "no",
        r"Do you want to join the current.*": "yes",
        r".*": "",
    }
    acc_client = create_autospec(AccountClient)
    installer = AccountInstaller(acc_client)
    installer.replace(
        prompts=MockPrompts(answers),
        product_info=ProductInfo.for_testing(WorkspaceConfig),
    )

    installer.join_collection(123)

    acc_client.workspaces.list.assert_called()
    acc_client.get_workspace_client.assert_not_called()


def test_join_collection_join_collection_no_installation_id():
    """A stored config without ``installed_workspace_ids`` leads to exactly one upload."""
    # Config as downloaded from the workspace: no collection recorded yet.
    stored_config = {
        'version': 1,
        'inventory_database': 'ucx_exists',
        'connect': {
            'host': '...',
            'token': '...',
        },
    }
    ws = workspace_client_mock()
    ws.workspace.download.return_value = io.StringIO(yaml.dump(stored_config))

    acc_client = create_autospec(AccountClient)
    acc_client.workspaces.list.return_value = [
        Workspace(workspace_id=123, deployment_name="test"),
        Workspace(workspace_id=456, deployment_name="test2"),
    ]
    acc_client.get_workspace_client.return_value = ws

    answers = {
        r".*PRO or SERVERLESS SQL warehouse.*": "1",
        r"Open job overview.*": "no",
        r"Do you want to join the current.*": "yes",
        r"Please select a workspace, the current installation.*": 1,
        r".*": "",
    }
    installer = AccountInstaller(acc_client)
    installer.replace(
        prompts=MockPrompts(answers),
        product_info=ProductInfo.for_testing(WorkspaceConfig),
    )

    installer.join_collection(456)

    upload = ws.workspace.upload
    upload.assert_called()
    assert upload.call_count == 1


def test_join_collection_join_collection():
    """Joining with the default mocked workspace config results in two uploads."""
    ws = workspace_client_mock()

    acc_client = create_autospec(AccountClient)
    acc_client.workspaces.list.return_value = [
        Workspace(workspace_id=123, deployment_name="test"),
        Workspace(workspace_id=456, deployment_name="test2"),
    ]
    acc_client.get_workspace_client.return_value = ws

    answers = {
        r".*PRO or SERVERLESS SQL warehouse.*": "1",
        r"Open job overview.*": "no",
        r"Do you want to join the current.*": "yes",
        r"Please select a workspace, the current installation.*": 1,
        r".*": "",
    }
    installer = AccountInstaller(acc_client)
    installer.replace(
        prompts=MockPrompts(answers),
        product_info=ProductInfo.for_testing(WorkspaceConfig),
    )

    installer.join_collection(789)

    upload = ws.workspace.upload
    upload.assert_called()
    assert upload.call_count == 2