Implement migration sequencing (phase 2) #3009

Open · wants to merge 21 commits into main
67 changes: 63 additions & 4 deletions src/databricks/labs/ucx/assessment/sequencing.py
@@ -5,15 +5,17 @@
 from collections import defaultdict
 from collections.abc import Iterable
 from dataclasses import dataclass, field
+from pathlib import Path

 from databricks.sdk import WorkspaceClient
 from databricks.sdk.errors import DatabricksError
 from databricks.sdk.service.jobs import Job, JobCluster, Task

 from databricks.labs.ucx.assessment.clusters import ClusterOwnership, ClusterInfo
 from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo
-from databricks.labs.ucx.framework.owners import AdministratorLocator
-from databricks.labs.ucx.source_code.graph import DependencyProblem
+from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspaceObjectOwnership
+from databricks.labs.ucx.source_code.graph import DependencyGraph, DependencyProblem
+from databricks.labs.ucx.source_code.path_lookup import PathLookup


 @dataclass
@@ -39,6 +41,9 @@ class MigrationStep:
     required_step_ids: list[int]
     """The step ids that should be completed before this step is started."""

+    @property
+    def key(self) -> tuple[str, str]:
+        return self.object_type, self.object_id


 MigrationNodeKey = tuple[str, str]
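The new key property gives MigrationStep the same (object_type, object_id) shape as MigrationNodeKey, so steps and nodes can share index structures. A small usage sketch (steps_by_key is illustrative, not part of the PR):

    # given: steps = list(sequencer.generate_steps())
    steps_by_key: dict[tuple[str, str], MigrationStep] = {step.key: step for step in steps}
    step = steps_by_key[("NOTEBOOK", "/Repos/project/main.py")]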

@@ -148,8 +153,9 @@ class MigrationSequencer:
     Analysing the graph in this case means: computing the migration sequence in `meth:generate_steps`.
     """

-    def __init__(self, ws: WorkspaceClient, administrator_locator: AdministratorLocator):
+    def __init__(self, ws: WorkspaceClient, path_lookup: PathLookup, administrator_locator: AdministratorLocator):
         self._ws = ws
+        self._path_lookup = path_lookup
         self._admin_locator = administrator_locator
         self._counter = itertools.count()
         self._nodes: dict[MigrationNodeKey, MigrationNode] = {}
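Existing call sites must now pass a PathLookup between the workspace client and the administrator locator. A minimal construction sketch (variable names are assumed, not taken from the PR):

    sequencer = MigrationSequencer(ws, path_lookup, administrator_locator)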
@@ -220,7 +226,7 @@ def _register_job(self, job: Job) -> MaybeMigrationNode:
             problems.append(problem)
         return MaybeMigrationNode(job_node, problems)

-    def _register_workflow_task(self, task: Task, parent: MigrationNode) -> MaybeMigrationNode:
+    def _register_workflow_task(self, task: Task, parent: MigrationNode, graph: DependencyGraph) -> MaybeMigrationNode:
         """Register a workflow task.

         TODO:
@@ -262,8 +268,51 @@ def _register_workflow_task(self, task: Task, parent: MigrationNode) -> MaybeMigrationNode:
         else:
             problem = DependencyProblem('cluster-not-found', f"Could not find cluster: {task.job_cluster_key}")
             problems.append(problem)
+        graph.visit(self._visit_dependency, None)
         return MaybeMigrationNode(task_node, problems)

+    def _visit_dependency(self, graph: DependencyGraph) -> bool | None:
+        lineage = graph.dependency.lineage[-1]
+        parent_node = self._nodes[(lineage.object_type, lineage.object_id)]
+        for dependency in graph.local_dependencies:
+            lineage = dependency.lineage[-1]
+            self.register_dependency(parent_node, lineage.object_type, lineage.object_id)
+        # TODO: tables and dfsas
+        return False
+
+    def register_dependency(self, parent_node: MigrationNode, object_type: str, object_id: str) -> MigrationNode:
+        dependency_node = self._nodes.get((object_type, object_id), None)
+        if not dependency_node:
+            dependency_node = self._create_dependency_node(object_type, object_id)
+        if parent_node:
+            self._incoming[parent_node.key].add(dependency_node.key)
+            self._outgoing[dependency_node.key].add(parent_node.key)
+        return dependency_node
+
+    def _create_dependency_node(self, object_type: str, object_id: str) -> MigrationNode:
+        object_name: str = "<ANONYMOUS>"
+        _object_owner: str = "<UNKNOWN>"
+        if object_type in {"NOTEBOOK", "FILE"}:
+            path = Path(object_id)
+            for library_root in self._path_lookup.library_roots:
+                if not path.is_relative_to(library_root):
+                    continue
+                object_name = path.relative_to(library_root).as_posix()
+                break
+            object_owner = WorkspaceObjectOwnership(self._admin_locator).owner_of((object_type, object_id))
+        else:
+            raise ValueError(f"{object_type} not supported yet!")
Review comment (Collaborator): Where is this exception caught? If left unhandled, it will crash the assessment workflow.
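One possible remedy, sketched as a thread reply; it assumes register_dependency is widened to return MaybeMigrationNode so the problem can propagate, and the 'unsupported-object-type' problem code is an assumption, not part of the PR:

    try:
        dependency_node = self._create_dependency_node(object_type, object_id)
    except ValueError as exc:
        # Surface the unsupported object type as a DependencyProblem instead of crashing.
        problem = DependencyProblem('unsupported-object-type', str(exc))
        return MaybeMigrationNode(None, [problem])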

+        self._last_node_id += 1
+        dependency_node = MigrationNode(
+            node_id=self._last_node_id,
+            object_type=object_type,
+            object_id=object_id,
+            object_name=object_name,
+            object_owner=object_owner,
+        )
+        self._nodes[dependency_node.key] = dependency_node
+        return dependency_node
+
     def _register_job_cluster(self, cluster: JobCluster, parent: MigrationNode) -> MaybeMigrationNode:
         """Register a job cluster.
@@ -322,6 +371,16 @@ def generate_steps(self) -> Iterable[MigrationStep]:
           leaf during processing)
         - We handle cyclic dependencies (implemented in PR #3009)
         """
+        """The algorithm below is adapted from Kahn's topological sort. The main
+        differences are as follows:
+        1) We want the same step number for all nodes at the same dependency depth,
+           so instead of pushing 'leaf' nodes onto a queue, we fetch them again once
+           all current 'leaf' nodes are processed (these are transient 'leaf' nodes,
+           i.e. they only become 'leaf' during processing).
+        2) Kahn's algorithm only supports DAGs, but Python code allows cyclic
+           dependencies (A -> B -> C -> A is not a DAG), so when fetching 'leaf' nodes
+           we relax the 0-incoming-vertex rule to avoid an infinite loop, and we avoid
+           side effects such as negative counts. Works for simple cases; untested on large trees."""
         ordered_steps: list[MigrationStep] = []
         # For updating the priority of steps that depend on other steps
         incoming_references = self._invert_outgoing_to_incoming_references()
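To make the adapted traversal concrete, here is a minimal, self-contained sketch of the idea the docstring describes; the sequence function, the requires mapping, and the deterministic tie-break are illustrative assumptions, not the PR's implementation:

    def sequence(requires: dict[str, set[str]]) -> list[tuple[int, str]]:
        """Return (step_number, node) pairs; nodes at the same depth share a step number."""
        nodes = set(requires) | {dep for deps in requires.values() for dep in deps}
        done: set[str] = set()
        steps: list[tuple[int, str]] = []
        step_number = 0
        while len(done) < len(nodes):
            # Transient 'leaf' nodes: everything they require is already sequenced.
            leaves = {n for n in nodes - done if requires.get(n, set()) <= done}
            if not leaves:
                # Cycle: relax the 0-incoming-vertex rule and take the remaining node
                # with the fewest unmet requirements (no destructive count updates).
                leaves = {min(sorted(nodes - done), key=lambda n: len(requires.get(n, set()) - done))}
            for leaf in sorted(leaves):
                steps.append((step_number, leaf))
            done |= leaves
            step_number += 1
        return steps

    # Mirrors the cyclic unit test below: root imports a and b, a imports b, b imports a.
    print(sequence({"root.py": {"a.py", "b.py"}, "a.py": {"b.py"}, "b.py": {"a.py"}}))
    # [(0, 'a.py'), (1, 'b.py'), (2, 'root.py')]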
7 changes: 7 additions & 0 deletions src/databricks/labs/ucx/framework/owners.py
@@ -255,3 +255,10 @@ def _maybe_direct_owner(self, record: str) -> str | None:
             return None
         except InternalError:  # redash is very naughty and throws 500s instead of proper 404s
             return None
+
+
+class WorkspaceObjectOwnership(Ownership[tuple[str, str]]):
+
+    def _maybe_direct_owner(self, record: tuple[str, str]) -> str | None:
+        # TODO: record[0] = object_type, record[1] = object_id
+        return None
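Until the TODO lands, _maybe_direct_owner always returns None for workspace objects. A usage sketch, assuming the usual contract of the Ownership base class in this module, where owner_of falls back to the AdministratorLocator when no direct owner is found:

    ownership = WorkspaceObjectOwnership(admin_locator)
    owner = ownership.owner_of(("NOTEBOOK", "/Repos/project/main.py"))
    # While the TODO is unimplemented, this always resolves to the workspace admin.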
8 changes: 4 additions & 4 deletions src/databricks/labs/ucx/source_code/jobs.py
@@ -78,8 +78,8 @@ def as_message(self) -> str:


 class WorkflowTask(Dependency):
-    def __init__(self, ws: WorkspaceClient, task: jobs.Task, job: jobs.Job):
-        loader = WrappingLoader(WorkflowTaskContainer(ws, task, job))
+    def __init__(self, ws: WorkspaceClient, task: jobs.Task, job: jobs.Job, cache: WorkspaceCache | None = None):
Review comment (Collaborator): Don't add | None constructor dependencies; it leads to non-deterministic logic and subtle bugs that are harder to diagnose later.
+        loader = WrappingLoader(WorkflowTaskContainer(ws, task, job, cache))
         super().__init__(loader, Path(f'/jobs/{task.task_key}'), inherits_context=False)
         self._task = task
         self._job = job
@@ -99,11 +99,11 @@ def lineage(self) -> list[LineageAtom]:


 class WorkflowTaskContainer(SourceContainer):
-    def __init__(self, ws: WorkspaceClient, task: jobs.Task, job: jobs.Job):
+    def __init__(self, ws: WorkspaceClient, task: jobs.Task, job: jobs.Job, cache: WorkspaceCache | None = None):
         self._task = task
         self._job = job
         self._ws = ws
-        self._cache = WorkspaceCache(ws)
+        self._cache = cache or WorkspaceCache(ws)
         self._named_parameters: dict[str, str] | None = {}
         self._parameters: list[str] | None = []
         self._spark_conf: dict[str, str] | None = {}
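A sketch of the injection style the reviewer suggests, assuming every call site can be updated to construct the cache once and pass it down; this is the reviewer's direction, not the PR's current code:

    class WorkflowTaskContainer(SourceContainer):
        def __init__(self, ws: WorkspaceClient, task: jobs.Task, job: jobs.Job, cache: WorkspaceCache):
            self._task = task
            self._job = job
            self._ws = ws
            self._cache = cache  # required dependency: no `cache or WorkspaceCache(ws)` fallback

    # Call sites then share one deterministic cache instance:
    cache = WorkspaceCache(ws)
    container = WorkflowTaskContainer(ws, task, job, cache)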
33 changes: 33 additions & 0 deletions tests/unit/assessment/test_sequencing.py
@@ -280,6 +280,39 @@ def test_sequence_steps_from_job_task_with_new_cluster(ws, admin_locator) -> None:
     ]


+class _MigrationSequencer(MigrationSequencer):
+
+    def visit_graph(self, graph: DependencyGraph):
+        graph.visit(self._visit_dependency, None)
+
+
+def test_sequencer_supports_cyclic_dependencies(ws, simple_dependency_resolver, mock_path_lookup):
+    root = Dependency(FileLoader(), Path("root.py"))
+    root_graph = _DependencyGraph(root, None, simple_dependency_resolver, mock_path_lookup, CurrentSessionState())
+    child_a = Dependency(FileLoader(), Path("a.py"))
+    child_graph_a = _DependencyGraph(
+        child_a, root_graph, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()
+    )
+    child_b = Dependency(FileLoader(), Path("b.py"))
+    child_graph_b = _DependencyGraph(
+        child_b, root_graph, simple_dependency_resolver, mock_path_lookup, CurrentSessionState()
+    )
+    # root imports a and b
+    root_graph.add_dependency(child_graph_a)
+    root_graph.add_dependency(child_graph_b)
+    # a imports b
+    child_graph_a.add_dependency(child_graph_b)
+    # b imports a (a cyclic, local import)
+    child_graph_b.add_dependency(child_graph_a)
+    sequencer = _MigrationSequencer(ws, mock_path_lookup, admin_locator(ws, "John Doe"))
+    sequencer.register_dependency(None, root.lineage[-1].object_type, root.lineage[-1].object_id)
+    sequencer.visit_graph(root_graph)
+    steps = list(sequencer.generate_steps())
+    assert len(steps) == 3
+    assert steps[2].object_id == "root.py"
+
+
 def test_sequence_steps_from_job_task_with_non_existing_cluster(ws, admin_locator) -> None:
     """Sequence a job with a task that references a non-existing cluster.