# Add is_partitioned column (#1130)
# Changes

Adds an `is_partitioned` column to the `ucx.tables` inventory table. The assessment crawler now records whether each Hive metastore table is partitioned, the assessment dashboard surfaces it as a Yes/No column, and an upgrade script adds the column to existing installations.

### Linked issues

Resolves #871 

Related PR:
- #985

### Functionality 

- [ ] added relevant user documentation
- [ ] added new CLI command
- [ ] modified existing command: `databricks labs ucx ...`
- [ ] added a new workflow
- [ ] modified existing workflow: `...`
- [ ] added a new table
- [X] modified existing table: `ucx.tables`

### Tests

- [X] manually tested
- [ ] added unit tests
- [ ] added integration tests
- [ ] verified on staging environment (screenshot attached)


![image](https://github.com/databrickslabs/ucx/assets/23179165/980bd9be-fc36-405f-b21b-186629097a92)

![image](https://github.com/databrickslabs/ucx/assets/23179165/7e81cd04-a5eb-4944-a43e-d0a89d794bca)
dleiva04 authored and nkvuong committed Mar 27, 2024
1 parent 056c9a1 commit 785affc
Showing 5 changed files with 56 additions and 3 deletions.
1 change: 1 addition & 0 deletions src/databricks/labs/ucx/hive_metastore/tables.py
@@ -43,6 +43,7 @@ class Table:
    upgraded_to: str | None = None

    storage_properties: str | None = None
+   is_partitioned: bool = False

    DBFS_ROOT_PREFIXES: typing.ClassVar[list[str]] = [
        "/dbfs/",
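Because the new field defaults to `False`, inventory rows written before this change still deserialize cleanly. A minimal sketch of that behavior (a hypothetical subset of fields; the real `Table` dataclass carries many more):

```python
from dataclasses import dataclass

# Minimal sketch, not the real ucx class: only a few fields are kept here.
# Rows persisted before the column existed omit is_partitioned, so they
# pick up the False default on load.
@dataclass
class Table:
    catalog: str
    database: str
    name: str
    object_type: str
    table_format: str
    is_partitioned: bool = False  # new column, defaults for pre-existing rows

old_row = {
    "catalog": "hive_metastore",
    "database": "sales",
    "name": "orders",
    "object_type": "MANAGED",
    "table_format": "DELTA",
}
print(Table(**old_row).is_partitioned)  # False
```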
12 changes: 10 additions & 2 deletions src/databricks/labs/ucx/hive_metastore/tables.scala
@@ -9,10 +9,11 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col,lower,upper}
+import org.apache.spark.sql.catalyst.TableIdentifier

// must follow the same structure as databricks.labs.ucx.hive_metastore.tables.Table
case class TableDetails(catalog: String, database: String, name: String, object_type: String,
-table_format: String, location: String, view_text: String, upgraded_to: String, storage_properties: String)
+table_format: String, location: String, view_text: String, upgraded_to: String, storage_properties: String, is_partitioned: Boolean)
// recording error log in the database
case class TableError(catalog: String, database: String, name: String, error: String)

@@ -56,9 +57,16 @@ def metadataForAllTables(databases: Seq[String], queue: ConcurrentLinkedQueue[Ta
s"$key=$value"
}.mkString("[", ", ", "]")

+    val partitionColumnNames = try {
+      spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName, Some(databaseName))).partitionColumnNames
+    } catch {
+      case e: Exception => null
+    }
+    val isPartitioned = if (partitionColumnNames != null && !partitionColumnNames.isEmpty) true else false

Some(TableDetails("hive_metastore", databaseName, tableName, table.tableType.name, table.provider.orNull,
table.storage.locationUri.map(_.toString).orNull, table.viewText.orNull,
-upgraded_to match { case Some(target) => target case None => null }, formattedString))
+upgraded_to match { case Some(target) => target case None => null }, formattedString, isPartitioned))
}
} catch {
case err: Throwable =>
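For readers more comfortable in Python, here is a rough PySpark equivalent of the partition check above. This is a hypothetical sketch, not part of the PR, and it assumes an active Spark session:

```python
# Hypothetical PySpark analogue of the Scala check (not in this PR):
# a table counts as partitioned when its catalog metadata lists at
# least one partition column.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def is_partitioned(database: str, table: str) -> bool:
    try:
        columns = spark.catalog.listColumns(table, database)
    except Exception:
        # mirror the Scala code, which swallows metadata lookup errors
        return False
    return any(c.isPartition for c in columns)
```

The Scala code returns `null` on failure and then null-checks; treating a failed metadata lookup as "not partitioned" keeps the crawler from aborting on tables with unreadable metadata.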
@@ -24,7 +24,8 @@ SELECT CONCAT(tables.`database`, '.', tables.name) AS name,
WHEN size_in_bytes < 100000000 THEN CONCAT(CAST(round(size_in_bytes/1024/1024,2) AS string),"MB")
WHEN size_in_bytes < 100000000000 THEN CONCAT(CAST(round(size_in_bytes/1024/1024/1024,2) AS string),"GB")
ELSE CONCAT(CAST(round(size_in_bytes/1024/1024/1024/1024,2) AS string),"TB")
-END AS table_size
+END AS table_size,
+IF(is_partitioned is true, "Yes", "No") AS is_partitioned
FROM $inventory.tables left outer join $inventory.table_size on
$inventory.tables.catalog = $inventory.table_size.catalog and
$inventory.tables.database = $inventory.table_size.database and
@@ -0,0 +1,18 @@
# pylint: disable=invalid-name

import logging

from databricks.labs.blueprint.installation import Installation
from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.config import WorkspaceConfig

logger = logging.getLogger(__name__)


def upgrade(installation: Installation, ws: WorkspaceClient):
    config = installation.load(WorkspaceConfig)
    warehouse_id = str(config.warehouse_id)
    sql = f"ALTER TABLE {config.inventory_database}.tables ADD COLUMN is_partitioned BOOLEAN"
    ws.statement_execution.execute_statement(sql, warehouse_id=warehouse_id)
    installation.save(config)
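One caveat: `ALTER TABLE ... ADD COLUMN` fails when the column already exists, so re-running the script is not idempotent. Below is a hedged sketch of a more defensive variant; it is hypothetical (`upgrade_idempotent` is not in this PR), reuses the imports and `logger` from the script above, and assumes the SDK reports SQL failures through the returned statement status:

```python
from databricks.sdk.service.sql import StatementState

def upgrade_idempotent(installation: Installation, ws: WorkspaceClient):
    config = installation.load(WorkspaceConfig)
    warehouse_id = str(config.warehouse_id)
    sql = f"ALTER TABLE {config.inventory_database}.tables ADD COLUMN is_partitioned BOOLEAN"
    response = ws.statement_execution.execute_statement(sql, warehouse_id=warehouse_id)
    status = response.status
    # Assumption: a failed DDL statement surfaces as StatementState.FAILED
    # with an error message, rather than raising an exception.
    if status is not None and status.state == StatementState.FAILED:
        message = status.error.message if status.error and status.error.message else ""
        if "already exists" not in message.lower():
            raise RuntimeError(f"upgrade failed: {message}")
        logger.info("is_partitioned column already exists, skipping")
    installation.save(config)
```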
25 changes: 25 additions & 0 deletions tests/integration/test_installation.py
@@ -20,6 +20,7 @@

import databricks
from databricks.labs.ucx.config import WorkspaceConfig
+from databricks.labs.ucx.hive_metastore import TablesCrawler
from databricks.labs.ucx.hive_metastore.mapping import Rule
from databricks.labs.ucx.install import WorkspaceInstallation, WorkspaceInstaller
from databricks.labs.ucx.installer.workflows import WorkflowsInstallation
@@ -591,3 +592,27 @@ def test_table_migration_job_cluster_override( # pylint: disable=too-many-local
    job_id = install_state.resources["jobs"]["migrate-tables"]
    for task in ws.jobs.get(job_id).settings.tasks:
        assert task.existing_cluster_id == env_or_skip("TEST_USER_ISOLATION_CLUSTER_ID")


+@retried(on=[NotFound, TimeoutError], timeout=timedelta(minutes=5))
+def test_partitioned_tables(ws, sql_backend, new_installation, inventory_schema, make_schema, make_table):
+    _, workflows_install = new_installation()
+
+    schema = make_schema(catalog_name="hive_metastore")
+    sql_backend.execute(
+        f"CREATE TABLE IF NOT EXISTS {schema.full_name}.partitioned_table (column1 string, column2 STRING) PARTITIONED BY (column1)"
+    )
+    sql_backend.execute(
+        f"CREATE TABLE IF NOT EXISTS {schema.full_name}.non_partitioned_table (column1 string, column2 STRING)"
+    )
+    workflows_install.run_workflow("assessment")
+
+    tables = TablesCrawler(sql_backend, inventory_schema)
+
+    all_tables = {}
+    for table in tables.snapshot():
+        all_tables[table.key] = table
+
+    assert len(all_tables) >= 2
+    assert all_tables[f"{schema.full_name}.partitioned_table"].is_partitioned is True
+    assert all_tables[f"{schema.full_name}.non_partitioned_table"].is_partitioned is False
