Skip to content

Commit

Permalink
Use ~/.ucx/state.json to store the state of both dashboards and jobs (
Browse files Browse the repository at this point in the history
#561)

This speeds up (re-)installation on large workspaces with thousands of
configured jobs.
  • Loading branch information
nfx authored Nov 9, 2023
1 parent c6843c6 commit 3b729d1
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 170 deletions.
152 changes: 86 additions & 66 deletions src/databricks/labs/ucx/framework/dashboards.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from databricks.sdk import WorkspaceClient
from databricks.sdk.core import DatabricksError
from databricks.sdk.service import workspace
from databricks.sdk.service.sql import (
AccessControl,
ObjectTypePlural,
Expand All @@ -16,7 +17,8 @@
WidgetOptions,
WidgetPosition,
)
from databricks.sdk.service.workspace import ImportFormat

from databricks.labs.ucx.framework.install_state import InstallState

logger = logging.getLogger(__name__)

Expand All @@ -30,16 +32,8 @@ class SimpleQuery:
widget: dict[str, str]

@property
def query_key(self):
return f"{self.dashboard_ref}_{self.name}:query_id"

@property
def viz_key(self):
return f"{self.dashboard_ref}_{self.name}:viz_id"

@property
def widget_key(self):
return f"{self.dashboard_ref}_{self.name}:widget_id"
def key(self):
return f"{self.dashboard_ref}_{self.name}"

@property
def viz_type(self) -> str:
Expand Down Expand Up @@ -79,6 +73,7 @@ class DashboardFromFiles:
def __init__(
self,
ws: WorkspaceClient,
state: InstallState,
local_folder: Path,
remote_folder: str,
name_prefix: str,
Expand All @@ -91,19 +86,14 @@ def __init__(
self._name_prefix = name_prefix
self._query_text_callback = query_text_callback
self._warehouse_id = warehouse_id
self._state = {}
self._state = state
self._pos = 0

@property
def _query_state(self):
return f"{self._remote_folder}/state.json"

def dashboard_link(self, dashboard_ref: str):
dashboard_id = self._state[f"{dashboard_ref}:dashboard_id"]
dashboard_id = self._state.dashboards[dashboard_ref]
return f"{self._ws.config.host}/sql/dashboards/{dashboard_id}"

def create_dashboards(self) -> dict:
dashboards = {}
queries_per_dashboard = {}
# Iterate over dashboards for each step, represented as first-level folders
step_folders = [f for f in self._local_folder.glob("*") if f.is_dir()]
Expand All @@ -127,9 +117,8 @@ def create_dashboards(self) -> dict:
self._install_viz(query)
self._install_widget(query, dashboard_ref)
queries_per_dashboard[dashboard_ref] = desired_queries
dashboards[dashboard_ref] = self._state[f"{dashboard_ref}:dashboard_id"]
self._store_query_state(queries_per_dashboard)
return dashboards
return self._state.dashboards

def validate(self):
step_folders = [f for f in self._local_folder.glob("*") if f.is_dir()]
Expand All @@ -148,13 +137,13 @@ def validate(self):
raise AssertionError(msg) from err

def _install_widget(self, query: SimpleQuery, dashboard_ref: str):
dashboard_id = self._state[f"{dashboard_ref}:dashboard_id"]
dashboard_id = self._state.dashboards[dashboard_ref]
widget_options = self._get_widget_options(query)
# widgets are cleaned up every dashboard redeploy
widget = self._ws.dashboard_widgets.create(
dashboard_id, widget_options, 1, visualization_id=self._state[query.viz_key]
dashboard_id, widget_options, 1, visualization_id=self._state.viz[query.key]
)
self._state[query.widget_key] = widget.id
self._state.widgets[query.key] = widget.id

def _get_widget_options(self, query: SimpleQuery):
self._pos += 1
Expand All @@ -170,11 +159,12 @@ def _get_widget_options(self, query: SimpleQuery):
)
return widget_options

def _installed_query_state(self):
def _state_pre_v06(self):
try:
self._state = json.load(self._ws.workspace.download(self._query_state))
query_state = f"{self._remote_folder}/state.json"
state = json.load(self._ws.workspace.download(query_state))
to_remove = []
for k, v in self._state.items():
for k, v in state.items():
if k.endswith("dashboard_id"):
continue
if not k.endswith("query_id"):
Expand All @@ -184,50 +174,80 @@ def _installed_query_state(self):
except DatabricksError:
to_remove.append(k)
for key in to_remove:
del self._state[key]
del state[key]
return state
except DatabricksError as err:
if err.error_code != "RESOURCE_DOES_NOT_EXIST":
raise err
self._ws.workspace.mkdirs(self._remote_folder)
return {}
except JSONDecodeError:
logger.warning(f"JSON state file corrupt: {self._query_state}")
self._state = {} # noop
object_info = self._ws.workspace.get_status(self._remote_folder)
return {}

def _remote_folder_object(self) -> workspace.ObjectInfo:
try:
return self._ws.workspace.get_status(self._remote_folder)
except DatabricksError as err:
if err.error_code != "RESOURCE_DOES_NOT_EXIST":
raise err
self._ws.workspace.mkdirs(self._remote_folder)
return self._remote_folder_object()

def _installed_query_state(self):
if not self._state.dashboards:
for k, v in self._state_pre_v06().items():
prefix, suffix = k.split(":", 2)
match suffix:
case "dashboard_id":
self._state.dashboards[prefix] = v
case "query_id":
self._state.queries[prefix] = v
case "viz_id":
self._state.viz[prefix] = v
case "widget_id":
self._state.widgets[prefix] = v
object_info = self._remote_folder_object()
parent = f"folders/{object_info.object_id}"
return parent

def _store_query_state(self, queries: dict[str, list[SimpleQuery]]):
desired_keys = []
for ref, qrs in queries.items():
desired_keys.append(f"{ref}:dashboard_id")
for query in qrs:
desired_keys.append(query.query_key)
desired_keys.append(query.viz_key)
desired_keys.append(query.widget_key)
destructors = {
"query_id": self._ws.queries.delete,
"viz_id": self._ws.query_visualizations.delete,
"widget_id": self._ws.dashboard_widgets.delete,
}
new_state = {}
for k, v in self._state.items():
if k in desired_keys:
new_state[k] = v
continue
name = k if ":" not in k else k.split(":")[-1]
if name not in destructors:
continue
def _store_query_state(self, queries_per_dashboard: dict[str, list[SimpleQuery]]):
query_refs = set()
dashboard_refs = queries_per_dashboard.keys()
for queries in queries_per_dashboard.values():
for query in queries:
query_refs.add(query.key)

def silent_destroy(fn, object_id):
try:
destructors[name](v)
fn(object_id)
except DatabricksError as err:
logger.info(f"Failed to delete {name}-{v} --- {err.error_code}")
state_dump = json.dumps(new_state, indent=2).encode("utf8")
self._ws.workspace.upload(self._query_state, state_dump, format=ImportFormat.AUTO, overwrite=True)
logger.info(f"Failed to delete {object_id} --- {err.error_code}")

for ref, object_id in self._state.dashboards.items():
if ref in dashboard_refs:
continue
silent_destroy(self._ws.dashboards.delete, object_id)

for ref, object_id in self._state.queries.items():
if ref in query_refs:
continue
silent_destroy(self._ws.queries.delete, object_id)

for ref, object_id in self._state.viz.items():
if ref in query_refs:
continue
silent_destroy(self._ws.query_visualizations.delete, object_id)

for ref, object_id in self._state.widgets.items():
if ref in query_refs:
continue
silent_destroy(self._ws.dashboard_widgets.delete, object_id)

self._state.save()

def _install_dashboard(self, dashboard_name: str, parent_folder_id: str, dashboard_ref: str):
dashboard_id = f"{dashboard_ref}:dashboard_id"
if dashboard_id in self._state:
for widget in self._ws.dashboards.get(self._state[dashboard_id]).widgets:
if dashboard_ref in self._state.dashboards:
for widget in self._ws.dashboards.get(self._state.dashboards[dashboard_ref]).widgets:
self._ws.dashboard_widgets.delete(widget.id)
return
dash = self._ws.dashboards.create(dashboard_name, run_as_role=RunAsRole.VIEWER, parent=parent_folder_id)
Expand All @@ -236,7 +256,7 @@ def _install_dashboard(self, dashboard_name: str, parent_folder_id: str, dashboa
dash.id,
access_control_list=[AccessControl(group_name="users", permission_level=PermissionLevel.CAN_VIEW)],
)
self._state[dashboard_id] = dash.id
self._state.dashboards[dashboard_ref] = dash.id

def _desired_queries(self, local_folder: Path, dashboard_ref: str) -> list[SimpleQuery]:
desired_queries = []
Expand All @@ -257,10 +277,10 @@ def _desired_queries(self, local_folder: Path, dashboard_ref: str) -> list[Simpl

def _install_viz(self, query: SimpleQuery):
viz_args = self._get_viz_options(query)
if query.viz_key in self._state:
return self._ws.query_visualizations.update(self._state[query.viz_key], **viz_args)
viz = self._ws.query_visualizations.create(self._state[query.query_key], **viz_args)
self._state[query.viz_key] = viz.id
if query.key in self._state.viz:
return self._ws.query_visualizations.update(self._state.viz[query.key], **viz_args)
viz = self._ws.query_visualizations.create(self._state.queries[query.key], **viz_args)
self._state.viz[query.key] = viz.id

def _get_viz_options(self, query: SimpleQuery):
viz_types = {"table": self._table_viz_args, "counter": self._counter_viz_args}
Expand All @@ -276,16 +296,16 @@ def _install_query(self, query: SimpleQuery, dashboard_name: str, data_source_id
"name": f"{dashboard_name} - {query.name}",
"query": query.query,
}
if query.query_key in self._state:
return self._ws.queries.update(self._state[query.query_key], **query_meta)
if query.key in self._state.queries:
return self._ws.queries.update(self._state.queries[query.key], **query_meta)

deployed_query = self._ws.queries.create(parent=parent, run_as_role=RunAsRole.VIEWER, **query_meta)
self._ws.dbsql_permissions.set(
ObjectTypePlural.QUERIES,
deployed_query.id,
access_control_list=[AccessControl(group_name="users", permission_level=PermissionLevel.CAN_RUN)],
)
self._state[query.query_key] = deployed_query.id
self._state.queries[query.key] = deployed_query.id

@staticmethod
def _table_viz_args(
Expand Down
45 changes: 45 additions & 0 deletions src/databricks/labs/ucx/framework/install_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import json
import logging
from json import JSONDecodeError

from databricks.sdk import WorkspaceClient
from databricks.sdk.core import DatabricksError
from databricks.sdk.service.workspace import ImportFormat

logger = logging.getLogger(__name__)


class InstallState:
def __init__(self, ws: WorkspaceClient, install_folder: str, version: int = 1):
self._ws = ws
self._state_file = f"{install_folder}/state.json"
self._version = version
self._state = {}

def __getattr__(self, item):
if not self._state:
self._state = self._load()
if item not in self._state["resources"]:
self._state["resources"][item] = {}
return self._state["resources"][item]

def _load(self):
default_state = {"$version": self._version, "resources": {}}
try:
raw = json.load(self._ws.workspace.download(self._state_file))
version = raw.get("$version", None)
if version != self._version:
msg = f"expected state $version={self._version}, got={version}"
raise ValueError(msg)
return raw
except DatabricksError as err:
if err.error_code == "RESOURCE_DOES_NOT_EXIST":
return default_state
raise err
except JSONDecodeError:
logger.warning(f"JSON state file corrupt: {self._state_file}")
return default_state

def save(self):
state_dump = json.dumps(self._state, indent=2).encode("utf8")
self._ws.workspace.upload(self._state_file, state_dump, format=ImportFormat.AUTO, overwrite=True)
Loading

0 comments on commit 3b729d1

Please sign in to comment.