Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregate UCX output across workspaces with CLI command #1596

Merged
merged 19 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ See [contributing instructions](CONTRIBUTING.md) to help improve this project.
* [`workflows` command](#workflows-command)
* [`open-remote-config` command](#open-remote-config-command)
* [`installations` command](#installations-command)
* [`report-account-compatibility` command](#report-account-compatibility-command)
* [Metastore related commands](#metastore-related-commands)
* [`show-all-metastores` command](#show-all-metastores-command)
* [`assign-metastore` command](#assign-metastore-command)
Expand Down Expand Up @@ -575,6 +576,30 @@ related to multiple installations of `ucx` on the same workspace.

[[back to top](#databricks-labs-ucx)]


## `report-account-compatibility` command

```text
databricks labs ucx report-account-compatibility --profile labs-azure-account
12:56:09 INFO [databricks.sdk] Using Azure CLI authentication with AAD tokens
12:56:09 INFO [d.l.u.account.aggregate] Generating readiness report
12:56:10 INFO [databricks.sdk] Using Azure CLI authentication with AAD tokens
12:56:10 INFO [databricks.sdk] Using Azure CLI authentication with AAD tokens
12:56:15 INFO [databricks.sdk] Using Azure CLI authentication with AAD tokens
12:56:15 INFO [d.l.u.account.aggregate] Querying Schema ucx
12:56:21 WARN [d.l.u.account.aggregate] Workspace 4045495039142306 does not have UCX installed
12:56:21 INFO [d.l.u.account.aggregate] UC compatibility: 30.303030303030297% (69/99)
12:56:21 INFO [d.l.u.account.aggregate] cluster type not supported : LEGACY_TABLE_ACL: 22 objects
12:56:21 INFO [d.l.u.account.aggregate] cluster type not supported : LEGACY_SINGLE_USER: 24 objects
12:56:21 INFO [d.l.u.account.aggregate] unsupported config: spark.hadoop.javax.jdo.option.ConnectionURL: 10 objects
12:56:21 INFO [d.l.u.account.aggregate] Uses azure service principal credentials config in cluster.: 1 objects
12:56:21 INFO [d.l.u.account.aggregate] No isolation shared clusters not supported in UC: 1 objects
12:56:21 INFO [d.l.u.account.aggregate] Data is in DBFS Root: 23 objects
12:56:21 INFO [d.l.u.account.aggregate] Non-DELTA format: UNKNOWN: 5 objects
```

[[back to top](#databricks-labs-ucx)]

# Metastore related commands

These commands are used to assign a Unity Catalog metastore to a workspace. The metastore assignment is a pre-requisite
Expand Down
8 changes: 8 additions & 0 deletions labs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ commands:
is_account_level: true
description: upload workspace config to all workspaces in the account where ucx is installed

- name: report-account-compatibility
is_account_level: true
description: aggregation of UCX output of multiple workspaces in the account.
If --workspace-ids is not provided, it will use all workspaces present in the account.
flags:
- name: workspace-ids
description: List of workspace IDs to create account groups from.

- name: manual-workspace-info
description: only supposed to be run if cannot get admins to run `databricks labs ucx sync-workspace-info`

Expand Down
108 changes: 108 additions & 0 deletions src/databricks/labs/ucx/account/aggregate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import collections
import json
import logging
from collections.abc import Iterable, Callable
from dataclasses import dataclass
from functools import cached_property

from databricks.labs.lsql import Row
from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.account.workspaces import AccountWorkspaces
from databricks.labs.blueprint.installation import NotInstalled

from databricks.labs.ucx.contexts.workspace_cli import WorkspaceContext
from databricks.labs.ucx.hive_metastore.migration_status import MigrationIndex, MigrationStatus
from databricks.labs.ucx.source_code.base import CurrentSessionState
from databricks.labs.ucx.source_code.queries import FromTable

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


@dataclass
class AssessmentObject:
workspace_id: int
object_type: str
object_id: str
failures: list[str]


class AccountAggregate:
def __init__(
self,
account_workspaces: AccountWorkspaces,
workspace_context_factory: Callable[[WorkspaceClient], WorkspaceContext] = WorkspaceContext,
):
self._account_workspaces = account_workspaces
self._workspace_context_factory = workspace_context_factory

@cached_property
def _workspace_contexts(self):
contexts = []
for workspace_client in self._account_workspaces.workspace_clients():
ctx = self._workspace_context_factory(workspace_client)
contexts.append(ctx)
return contexts

def _federated_ucx_query(self, query: str, table_name='objects') -> Iterable[tuple[int, Row]]:
"""Modifies a query with a workspace-specific UCX schema and executes it on each workspace,
yielding a tuple of workspace_id and Row. This means that you don't have to specify a database in the query,
as it will be replaced with the UCX schema for each workspace. Use this method to aggregate results across
all workspaces, where UCX is installed.

At the moment, it's done sequentially, which is theoretically inefficient, but the number of workspaces is
expected to be small. If this becomes a performance bottleneck, we can optimize it later via Threads.strict()
"""
for ctx in self._workspace_contexts:
workspace_id = ctx.workspace_client.get_workspace_id()
try:
# use already existing code to replace tables in the query, assuming that UCX database is in HMS
inventory_database = ctx.config.inventory_database
stub_index = MigrationIndex(
[
MigrationStatus(
src_schema=inventory_database,
src_table=table_name,
dst_catalog='hive_metastore',
dst_schema=inventory_database,
dst_table=table_name,
)
]
)
from_table = FromTable(stub_index, CurrentSessionState(schema=inventory_database))
logger.info(f"Querying Schema {inventory_database}")

workspace_specific_query = from_table.apply(query)
for row in ctx.sql_backend.fetch(workspace_specific_query):
yield workspace_id, row
except NotInstalled:
logger.warning(f"Workspace {workspace_id} does not have UCX installed")

Check warning on line 80 in src/databricks/labs/ucx/account/aggregate.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/ucx/account/aggregate.py#L79-L80

Added lines #L79 - L80 were not covered by tests

@cached_property
def _aggregate_objects(self) -> list[AssessmentObject]:
objects = []
# view is defined in src/databricks/labs/ucx/queries/views/objects.sql
for workspace_id, row in self._federated_ucx_query('SELECT * FROM objects'):
objects.append(AssessmentObject(workspace_id, row.object_type, row.object_id, json.loads(row.failures)))
return objects

def readiness_report(self):
logger.info("Generating readiness report")
all_objects = 0
incompatible_objects = 0
failures = collections.defaultdict(list)

for obj in self._aggregate_objects:
all_objects += 1
has_failures = False
for failure in obj.failures:
failures[failure].append(obj)
has_failures = True
if has_failures:
incompatible_objects += 1
compatibility = (1 - incompatible_objects / all_objects if all_objects > 0 else 0) * 100
logger.info(f"UC compatibility: {compatibility}% ({incompatible_objects}/{all_objects})")

for failure, objects in failures.items():
logger.info(f"{failure}: {len(objects)} objects")
7 changes: 6 additions & 1 deletion src/databricks/labs/ucx/account/workspaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
self._include_workspace_ids = include_workspace_ids if include_workspace_ids else []

def _workspaces(self):
return self._ac.workspaces.list()
for workspace in self._ac.workspaces.list():
if self._include_workspace_ids and workspace.workspace_id not in self._include_workspace_ids:
logger.debug(f"Skipping {workspace.workspace_name} ({workspace.workspace_id}): not in include list")
continue

Check warning on line 24 in src/databricks/labs/ucx/account/workspaces.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/ucx/account/workspaces.py#L23-L24

Added lines #L23 - L24 were not covered by tests
yield workspace

def client_for(self, workspace: Workspace) -> WorkspaceClient:
return self._ac.get_workspace_client(workspace)
Expand All @@ -28,6 +32,7 @@
Return a list of WorkspaceClient for each configured workspace in the account
:return: list[WorkspaceClient]
"""
# TODO: move _can_administer() from account installer over here
if workspaces is None:
workspaces = self._workspaces()
clients = []
Expand Down
8 changes: 8 additions & 0 deletions src/databricks/labs/ucx/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@
ctx.account_workspaces.sync_workspace_info()


@ucx.command(is_account=True)
def report_account_compatibility(a: AccountClient, ctx: AccountContext | None = None, **named_parameters):
"""upload workspace config to all workspaces in the account where ucx is installed"""
if not ctx:
ctx = AccountContext(a, named_parameters)
ctx.account_aggregate.readiness_report()

Check warning on line 98 in src/databricks/labs/ucx/cli.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/ucx/cli.py#L97-L98

Added lines #L97 - L98 were not covered by tests


@ucx.command(is_account=True)
def create_account_groups(
a: AccountClient,
Expand Down
6 changes: 6 additions & 0 deletions src/databricks/labs/ucx/contexts/account_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from databricks.sdk import AccountClient


from databricks.labs.ucx.account.aggregate import AccountAggregate
from databricks.labs.ucx.account.metastores import AccountMetastores
from databricks.labs.ucx.account.workspaces import AccountWorkspaces
from databricks.labs.ucx.contexts.application import CliContext
Expand All @@ -24,6 +26,10 @@
def account_workspaces(self):
return AccountWorkspaces(self.account_client, self.workspace_ids)

@cached_property
def account_aggregate(self):
return AccountAggregate(self.account_workspaces)

Check warning on line 31 in src/databricks/labs/ucx/contexts/account_cli.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/ucx/contexts/account_cli.py#L31

Added line #L31 was not covered by tests

@cached_property
def account_metastores(self):
return AccountMetastores(self.account_client)
7 changes: 5 additions & 2 deletions src/databricks/labs/ucx/install.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from typing import Any

import databricks.sdk.errors
from databricks.labs.blueprint.entrypoint import get_logger
from databricks.labs.blueprint.entrypoint import get_logger, is_in_debug
from databricks.labs.blueprint.installation import Installation, SerdeError
from databricks.labs.blueprint.installer import InstallState
from databricks.labs.blueprint.parallel import ManyError, Threads
Expand Down Expand Up @@ -612,6 +612,7 @@ def _get_safe_account_client(self) -> AccountClient:
return AccountClient(host=host, account_id=account_id, product="ucx", product_version=__version__)

def _can_administer(self, workspace: Workspace):
# TODO: move to AccountWorkspaces
try:
# check if user is a workspace admin
ws = self.account_client.get_workspace_client(workspace)
Expand All @@ -633,6 +634,7 @@ def _get_accessible_workspaces(self):
"""
Get all workspaces that the user has access to
"""
# TODO: move to AccountWorkspaces
accessible_workspaces = []
for workspace in self.account_client.workspaces.list():
if self._can_administer(workspace):
Expand Down Expand Up @@ -684,7 +686,8 @@ def install_on_account(self):

if __name__ == "__main__":
logger = get_logger(__file__)

if is_in_debug():
logging.getLogger('databricks').setLevel(logging.DEBUG)
env = dict(os.environ.items())
force_install = env.get("UCX_FORCE_INSTALL")
if force_install == "account":
Expand Down
79 changes: 79 additions & 0 deletions tests/unit/account/test_aggregate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import logging
from unittest.mock import create_autospec
from databricks.sdk import Workspace, WorkspaceClient
from databricks.labs.ucx.account.aggregate import AccountAggregate
from databricks.labs.ucx.account.workspaces import AccountWorkspaces

from databricks.labs.ucx.config import WorkspaceConfig
from databricks.labs.ucx.contexts.workspace_cli import WorkspaceContext
from databricks.sdk.service import sql


def test_basic_readiness_report_no_workspaces(acc_client, caplog):
account_ws = AccountWorkspaces(acc_client)
account_aggregate_obj = AccountAggregate(account_ws)

with caplog.at_level(logging.INFO):
account_aggregate_obj.readiness_report()

assert 'UC compatibility' in caplog.text


def test_readiness_report_ucx_installed(acc_client, caplog):
account_ws = AccountWorkspaces(acc_client)
acc_client.workspaces.list.return_value = [
Workspace(workspace_name="foo", workspace_id=123, workspace_status_message="Running", deployment_name="abc")
]

ws = create_autospec(WorkspaceClient)
acc_client.get_workspace_client.return_value = ws
ws.statement_execution.execute_statement.return_value = sql.ExecuteStatementResponse(
status=sql.StatementStatus(state=sql.StatementState.SUCCEEDED),
result=sql.ResultData(
data_array=[
[
"jobs",
"32432123",
"""["cluster type not supported : LEGACY_TABLE_ACL",
"cluster type not supported : LEGACY_SINGLE_USER"]""",
],
[
"jobs",
"234234234",
"""["cluster type not supported : LEGACY_SINGLE_USER"]""",
],
[
"clusters",
"21312312",
"""[]""",
],
[
"tables",
"34234324",
"""["listTables returned null"]""",
],
],
row_count=2,
),
manifest=sql.ResultManifest(
schema=sql.ResultSchema(
columns=[
sql.ColumnInfo(name="object_type", type_name=sql.ColumnInfoTypeName.STRING),
sql.ColumnInfo(name="object_id", type_name=sql.ColumnInfoTypeName.STRING),
sql.ColumnInfo(name="failures", type_name=sql.ColumnInfoTypeName.STRING),
],
column_count=3,
)
),
statement_id='123',
)

ctx = WorkspaceContext(ws).replace(config=WorkspaceConfig(inventory_database="something", warehouse_id="1234"))
account_aggregate_obj = AccountAggregate(account_ws, workspace_context_factory=lambda _: ctx)

with caplog.at_level(logging.INFO):
account_aggregate_obj.readiness_report()

assert 'UC compatibility: 25.0% (3/4)' in caplog.text
assert 'cluster type not supported : LEGACY_TABLE_ACL: 1 objects' in caplog.text
assert 'cluster type not supported : LEGACY_SINGLE_USER: 2 objects' in caplog.text