Skip to content

Commit

Permalink
Added 'create-catalogs-schemas' command to prepare destination catalo…
Browse files Browse the repository at this point in the history
…gs and schemas before table migration (#1028)
  • Loading branch information
qziyuan authored Mar 8, 2024
1 parent 9d7bc1e commit 2657f6c
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 1 deletion.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ See [contributing instructions](CONTRIBUTING.md) to help improve this project.
* [`create-table-mapping` command](#create-table-mapping-command)
* [`skip` command](#skip-command)
* [`revert-migrated-tables` command](#revert-migrated-tables-command)
* [`create-catalogs-schemas` command](#create-catalogs-schemas-command)
* [`move` command](#move-command)
* [`alias` command](#alias-command)
* [Cross-workspace installations](#cross-workspace-installations)
Expand Down Expand Up @@ -546,6 +547,16 @@ to debug issues related to table migration.

[[back to top](#databricks-labs-ucx)]

## `create-catalogs-schemas` command

```text
databricks labs ucx create-catalogs-schemas
```
After the [`create-table-mapping` command](#create-table-mapping-command) has been executed, run this command to create the UC catalogs and schemas required by the table mapping. Run this command before migrating tables to UC.

[[back to top](#databricks-labs-ucx)]

## `move` command

```text
Expand Down
6 changes: 5 additions & 1 deletion labs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,8 @@ commands:
description: List of workspace IDs to create account groups from.

- name: migrate-locations
description: Create UC external locations based on the output of guess_external_locations assessment task.
description: Create UC external locations based on the output of guess_external_locations assessment task.

- name: create-catalogs-schemas
description: Create UC catalogs and schemas based on the destinations created from the create_table_mapping command.
This command is supposed to be run before migrating tables to UC.
10 changes: 10 additions & 0 deletions src/databricks/labs/ucx/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from databricks.labs.ucx.config import WorkspaceConfig
from databricks.labs.ucx.framework.crawlers import StatementExecutionBackend
from databricks.labs.ucx.hive_metastore import ExternalLocations, TablesCrawler
from databricks.labs.ucx.hive_metastore.catalog_schema import CatalogSchema
from databricks.labs.ucx.hive_metastore.mapping import TableMapping
from databricks.labs.ucx.hive_metastore.table_migrate import TableMove, TablesMigrate
from databricks.labs.ucx.install import WorkspaceInstallation
Expand Down Expand Up @@ -370,5 +371,14 @@ def migrate_locations(w: WorkspaceClient, aws_profile: str | None = None):
logger.error("migrate_locations is not yet supported in GCP")


@ucx.command
def create_catalogs_schemas(w: WorkspaceClient):
    """Create UC catalogs and schemas based on the destinations created from create_table_mapping command."""
    installation = Installation.current(w, 'ucx')
    prompts = Prompts()
    CatalogSchema.for_cli(w, installation, prompts).create_catalog_schema()


if __name__ == "__main__":
ucx()
94 changes: 94 additions & 0 deletions src/databricks/labs/ucx/hive_metastore/catalog_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import logging

from databricks.labs.blueprint.installation import Installation
from databricks.labs.blueprint.tui import Prompts
from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.config import WorkspaceConfig
from databricks.labs.ucx.framework.crawlers import StatementExecutionBackend
from databricks.labs.ucx.hive_metastore.mapping import TableMapping

logger = logging.getLogger(__name__)


class CatalogSchema:
    """Creates the UC catalogs and schemas that table migration will write into.

    Destination catalogs/schemas are read from the table mapping produced by
    the `create-table-mapping` command; anything that already exists in the
    metastore is skipped, so the operation is safe to re-run.
    """

    def __init__(self, ws: WorkspaceClient, table_mapping: TableMapping, prompts: Prompts):
        self._ws = ws
        self._table_mapping = table_mapping
        self._prompts = prompts

    @classmethod
    def for_cli(cls, ws: WorkspaceClient, installation: Installation, prompts: Prompts):
        """Alternate constructor: wire dependencies from the current UCX installation."""
        config = installation.load(WorkspaceConfig)
        sql_backend = StatementExecutionBackend(ws, config.warehouse_id)
        table_mapping = TableMapping(installation, ws, sql_backend)
        return cls(ws, table_mapping, prompts)

    def _list_existing(self) -> tuple[set[str], dict[str, set[str]]]:
        """List existing UC catalogs and, per catalog, their schemas."""
        logger.info("Listing existing UC catalogs and schemas")
        existing_catalogs: set[str] = set()
        for catalog_info in self._ws.catalogs.list():
            if catalog_info.name:
                existing_catalogs.add(catalog_info.name)

        existing_schemas: dict[str, set[str]] = {}  # catalog -> set[schema]
        for catalog in existing_catalogs:
            existing_schemas[catalog] = set()
            # max_results=0 asks the API for an unbounded listing
            for schema in self._ws.schemas.list(catalog, max_results=0):
                if schema.name:
                    existing_schemas[catalog].add(schema.name)

        return existing_catalogs, existing_schemas

    def _list_target(self) -> tuple[set[str], dict[str, set[str]]]:
        """Collect destination catalogs and schemas from the table mappings."""
        target_catalogs: set[str] = set()
        target_schemas: dict[str, set[str]] = {}  # catalog -> set[schema]
        for mapping in self._table_mapping.load():
            target_catalogs.add(mapping.catalog_name)
            # setdefault replaces the original create-or-add branching
            target_schemas.setdefault(mapping.catalog_name, set()).add(mapping.dst_schema)
        return target_catalogs, target_schemas

    def _prepare(self) -> tuple[set[str], dict[str, set[str]]]:
        """Return only the catalogs and schemas that still need to be created."""
        existing_catalogs, existing_schemas = self._list_existing()
        target_catalogs, target_schemas = self._list_target()

        logger.info("Preparing a list of UC catalogs and schema to be created")
        # Set arithmetic instead of element-by-element removal loops.
        target_catalogs -= existing_catalogs
        for catalog, schemas in existing_schemas.items():
            if catalog in target_schemas:
                target_schemas[catalog] = target_schemas[catalog] - schemas
        return target_catalogs, target_schemas

    def _create(self, catalogs: set[str], schemas: dict[str, set[str]]) -> None:
        """Create the given catalogs and schemas, prompting for catalog storage.

        An answer of "metastore" (the prompt default) creates the catalog
        without an explicit storage root, i.e. on metastore-level storage.
        """
        logger.info("Creating UC catalogs and schemas.")
        # create catalogs
        for catalog_name in catalogs:
            catalog_storage = self._prompts.question(
                f"Please provide storage location url for catalog:{catalog_name}.", default="metastore"
            )
            if catalog_storage == "metastore":
                self._ws.catalogs.create(catalog_name, comment="Created by UCX")
                continue
            self._ws.catalogs.create(catalog_name, storage_root=catalog_storage, comment="Created by UCX")

        # create schemas
        for catalog_name, schema_names in schemas.items():
            for schema_name in schema_names:
                self._ws.schemas.create(schema_name, catalog_name, comment="Created by UCX")

    def create_catalog_schema(self) -> None:
        """Create all missing destination catalogs and schemas."""
        candidate_catalogs, candidate_schemas = self._prepare()
        self._create(candidate_catalogs, candidate_schemas)
89 changes: 89 additions & 0 deletions tests/unit/hive_metastore/test_catalog_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from unittest.mock import create_autospec

from databricks.labs.blueprint.installation import MockInstallation
from databricks.labs.blueprint.tui import MockPrompts
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import CatalogInfo, SchemaInfo

from databricks.labs.ucx.hive_metastore.catalog_schema import CatalogSchema
from databricks.labs.ucx.hive_metastore.mapping import TableMapping

from ..framework.mocks import MockBackend


def prepare_test(ws, mock_prompts) -> CatalogSchema:
    """Build a CatalogSchema wired to a mocked workspace and an in-memory table mapping."""
    ws.catalogs.list.return_value = [CatalogInfo(name="catalog1")]
    ws.schemas.list.return_value = [SchemaInfo(name="schema1")]
    # (catalog_name, dst_schema, dst_table, src_schema, src_table) per mapping row
    rows = [
        ('catalog1', 'schema3', 'table', 'schema3', 'table'),
        ('catalog2', 'schema2', 'table', 'schema2', 'table'),
        ('catalog2', 'schema2', 'table2', 'schema2', 'table2'),
    ]
    mapping_records = [
        {
            'workspace_name': 'workspace',
            'catalog_name': catalog,
            'dst_schema': dst_schema,
            'dst_table': dst_table,
            'src_schema': src_schema,
            'src_table': src_table,
        }
        for catalog, dst_schema, dst_table, src_schema, src_table in rows
    ]
    installation = MockInstallation({'mapping.csv': mapping_records})
    table_mapping = TableMapping(installation, ws, MockBackend())
    return CatalogSchema(ws, table_mapping, mock_prompts)


def test_create():
    """Missing catalogs/schemas are created; the prompted url becomes the storage root."""
    ws = create_autospec(WorkspaceClient)
    prompts = MockPrompts({"Please provide storage location url for catalog: *": "s3://foo/bar"})
    prepare_test(ws, prompts).create_catalog_schema()
    ws.catalogs.create.assert_called_once_with("catalog2", storage_root="s3://foo/bar", comment="Created by UCX")
    ws.schemas.create.assert_any_call("schema2", "catalog2", comment="Created by UCX")
    ws.schemas.create.assert_any_call("schema3", "catalog1", comment="Created by UCX")


def test_no_catalog_storage():
    """An empty prompt answer falls back to the default metastore-level storage."""
    ws = create_autospec(WorkspaceClient)
    prompts = MockPrompts({"Please provide storage location url for catalog: *": ""})
    prepare_test(ws, prompts).create_catalog_schema()
    ws.catalogs.create.assert_called_once_with("catalog2", comment="Created by UCX")


def test_for_cli():
    """for_cli() assembles a CatalogSchema from the installation's config.yml."""
    ws = create_autospec(WorkspaceClient)
    config = {
        'version': 2,
        'inventory_database': 'test',
        'connect': {
            'host': 'test',
            'token': 'test',
        },
    }
    installation = MockInstallation({"config.yml": config})
    prompts = MockPrompts({"hello": "world"})
    assert isinstance(CatalogSchema.for_cli(ws, installation, prompts), CatalogSchema)
8 changes: 8 additions & 0 deletions tests/unit/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from databricks.labs.ucx.cli import (
alias,
create_account_groups,
create_catalogs_schemas,
create_table_mapping,
create_uber_principal,
ensure_assessment_run,
Expand Down Expand Up @@ -48,6 +49,7 @@ def ws():
),
'/Users/foo/.ucx/state.json': json.dumps({'resources': {'jobs': {'assessment': '123'}}}),
"/Users/foo/.ucx/azure_storage_account_info.csv": "prefix,client_id,principal,privilege,type,directory_id\ntest,test,test,test,Application,test",
"/Users/foo/.ucx/mapping.csv": "workspace_name,catalog_name,src_schema,dst_schema,src_table,dst_table\ntest,test,test,test,test,test",
}

def download(path: str) -> io.StringIO | io.BytesIO:
Expand Down Expand Up @@ -393,3 +395,9 @@ def test_migrate_locations_gcp(ws, caplog):
ws.config.is_gcp = True
migrate_locations(ws)
assert "migrate_locations is not yet supported in GCP" in caplog.messages


def test_create_catalogs_schemas(ws):
    """The CLI command lists existing catalogs while creating the destinations."""
    prompt_target = "databricks.labs.blueprint.tui.Prompts.question"
    with patch(prompt_target, return_value="s3://test"):
        create_catalogs_schemas(ws)
        ws.catalogs.list.assert_called_once()

0 comments on commit 2657f6c

Please sign in to comment.