From 8cf11967ca4c159c23f328b2dc5b2839e8c46383 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 6 Apr 2022 17:02:13 +0100 Subject: [PATCH] Type annotations for profile.py (#6067) --- distributed/core.py | 2 +- distributed/profile.py | 112 ++++++++++++++++++------------ distributed/tests/test_profile.py | 4 +- 3 files changed, 71 insertions(+), 47 deletions(-) diff --git a/distributed/core.py b/distributed/core.py index 5d968cb4c13..4b37721b8ba 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -202,7 +202,7 @@ def __init__( if not hasattr(self.io_loop, "profile"): ref = weakref.ref(self.io_loop) - def stop(): + def stop() -> bool: loop = ref() return loop is None or loop.asyncio_loop.is_closed() diff --git a/distributed/profile.py b/distributed/profile.py index 5314c3aebee..3e81012c629 100644 --- a/distributed/profile.py +++ b/distributed/profile.py @@ -32,7 +32,9 @@ import sys import threading from collections import defaultdict, deque +from collections.abc import Callable, Collection from time import sleep +from types import FrameType from typing import Any import tlz as toolz @@ -43,7 +45,7 @@ from distributed.utils import color_of -def identifier(frame): +def identifier(frame: FrameType | None) -> str: """A string identifier from a frame Strings are cheaper to use as indexes into dicts than tuples or dicts @@ -60,8 +62,10 @@ def identifier(frame): ) -# work around some frames lacking an f_lineo eg: https://bugs.python.org/issue47085 -def _f_lineno(frame): +def _f_lineno(frame: FrameType) -> int: + """Work around some frames lacking an f_lineno + See: https://bugs.python.org/issue47085 + """ f_lineno = frame.f_lineno if f_lineno is not None: return f_lineno @@ -78,7 +82,7 @@ def _f_lineno(frame): return prev_line -def repr_frame(frame): +def repr_frame(frame: FrameType) -> str: """Render a frame as a line for inclusion into a text traceback""" co = frame.f_code f_lineno = _f_lineno(frame) @@ -87,7 +91,7 @@ def repr_frame(frame): return text + "\n\t" + line -def info_frame(frame): +def info_frame(frame: FrameType) -> dict[str, Any]: co = frame.f_code f_lineno = _f_lineno(frame) line = linecache.getline(co.co_filename, f_lineno, frame.f_globals).lstrip() @@ -99,7 +103,14 @@ def info_frame(frame): } -def process(frame, child, state, stop=None, omit=None): +def process( + frame: FrameType, + child, + state: dict[str, Any], + *, + stop: str | None = None, + omit: Collection[str] = (), +) -> dict[str, Any] | None: """Add counts from a frame stack onto existing state This recursively adds counts to the existing state dictionary and creates @@ -119,16 +130,17 @@ def process(frame, child, state, stop=None, omit=None): 'description': 'root', 'children': {'...'}} """ - if omit is not None and any(frame.f_code.co_filename.endswith(o) for o in omit): - return False + if any(frame.f_code.co_filename.endswith(o) for o in omit): + return None prev = frame.f_back if prev is not None and ( stop is None or not prev.f_code.co_filename.endswith(stop) ): - state = process(prev, frame, state, stop=stop) - if state is False: - return False + new_state = process(prev, frame, state, stop=stop) + if new_state is None: + return None + state = new_state ident = identifier(frame) @@ -149,9 +161,10 @@ def process(frame, child, state, stop=None, omit=None): return d else: d["count"] += 1 + return None -def merge(*args): +def merge(*args: dict[str, Any]) -> dict[str, Any]: """Merge multiple frame states together""" if not args: return create() @@ -164,13 +177,13 @@ def merge(*args): children[child].append(arg["children"][child]) try: - children = {k: merge(*v) for k, v in children.items()} + children_dict = {k: merge(*v) for k, v in children.items()} except RecursionError: # pragma: no cover - children = {} + children_dict = {} count = sum(arg["count"] for arg in args) return { "description": args[0]["description"], - "children": dict(children), + "children": children_dict, "count": count, "identifier": args[0]["identifier"], } @@ -185,7 +198,7 @@ def create() -> dict[str, Any]: } -def call_stack(frame): +def call_stack(frame: FrameType) -> list[str]: """Create a call text stack from a frame Returns @@ -193,9 +206,10 @@ def call_stack(frame): list of strings """ L = [] - while frame: - L.append(repr_frame(frame)) - frame = frame.f_back + cur_frame: FrameType | None = frame + while cur_frame: + L.append(repr_frame(cur_frame)) + cur_frame = cur_frame.f_back return L[::-1] @@ -287,9 +301,14 @@ def wait_profiler() -> None: sleep(0.0001) -def _watch(thread_id, log, interval="20ms", cycle="2s", omit=None, stop=lambda: False): - interval = parse_timedelta(interval) - cycle = parse_timedelta(cycle) +def _watch( + thread_id: int, + log: deque[tuple[float, dict[str, Any]]], # [(timestamp, output of create()), ...] + interval: float, + cycle: float, + omit: Collection[str], + stop: Callable[[], bool], +) -> None: recent = create() last = time() @@ -315,13 +334,13 @@ def _watch(thread_id, log, interval="20ms", cycle="2s", omit=None, stop=lambda: def watch( - thread_id=None, - interval="20ms", - cycle="2s", - maxlen=1000, - omit=None, - stop=lambda: False, -): + thread_id: int | None = None, + interval: str = "20ms", + cycle: str = "2s", + maxlen: int = 1000, + omit: Collection[str] = (), + stop: Callable[[], bool] = lambda: False, +) -> deque[tuple[float, dict[str, Any]]]: """Gather profile information on a particular thread This starts a new thread to watch a particular thread and returns a deque @@ -329,34 +348,37 @@ def watch( Parameters ---------- - thread_id : int + thread_id : int, optional + Defaults to current thread interval : str Time per sample cycle : str Time per refreshing to a new profile state maxlen : int Passed onto deque, maximum number of periods - omit : str - Don't include entries that start with this filename + omit : collection of str + Don't include entries whose filename includes any of these substrings stop : callable - Function to call to see if we should stop + Function to call to see if we should stop. It must + accept no arguments and return a bool (True to stop, + False to continue). Returns ------- - deque - """ - if thread_id is None: - thread_id = threading.get_ident() + deque of tuples: - log = deque(maxlen=maxlen) + - timestamp + - dict[str, Any] (output of ``create()``) + """ + log: deque[tuple[float, dict[str, Any]]] = deque(maxlen=maxlen) thread = threading.Thread( target=_watch, name="Profile", kwargs={ - "thread_id": thread_id, - "interval": interval, - "cycle": cycle, + "thread_id": thread_id or threading.get_ident(), + "interval": parse_timedelta(interval), + "cycle": parse_timedelta(cycle), "log": log, "omit": omit, "stop": stop, @@ -487,11 +509,11 @@ def _remove_py_stack(frames): yield entry -def llprocess(frames, child, state): +def llprocess(frames, child, state: dict[str, Any] | None) -> dict[str, Any] | None: """Add counts from low level profile information onto existing state This uses the ``stacktrace`` module to collect low level stack trace - information and place it onto the given sttate. + information and place it onto the given state. It is configured with the ``distributed.worker.profile.low-level`` config entry. @@ -502,10 +524,11 @@ def llprocess(frames, child, state): ll_get_stack """ if not frames: - return + return None frame = frames.pop() if frames: state = llprocess(frames, frame, state) + assert state addr = hex(frame.addr - frame.offset) ident = ";".join(map(str, (frame.name, "", addr))) @@ -531,6 +554,7 @@ def llprocess(frames, child, state): return d else: d["count"] += 1 + return None def ll_get_stack(tid): diff --git a/distributed/tests/test_profile.py b/distributed/tests/test_profile.py index 75eb704c99f..c7c78c220f6 100644 --- a/distributed/tests/test_profile.py +++ b/distributed/tests/test_profile.py @@ -274,7 +274,7 @@ class FakeFrame: ], ) def test_info_frame_f_lineno(f_lasti: int, f_lineno: int) -> None: - assert info_frame(FakeFrame(f_lasti=f_lasti, f_code=FAKE_CODE)) == { + assert info_frame(FakeFrame(f_lasti=f_lasti, f_code=FAKE_CODE)) == { # type: ignore "filename": "", "name": "example", "line_number": f_lineno, @@ -301,6 +301,6 @@ def test_info_frame_f_lineno(f_lasti: int, f_lineno: int) -> None: ], ) def test_call_stack_f_lineno(f_lasti: int, f_lineno: int) -> None: - assert call_stack(FakeFrame(f_lasti=f_lasti, f_code=FAKE_CODE)) == [ + assert call_stack(FakeFrame(f_lasti=f_lasti, f_code=FAKE_CODE)) == [ # type: ignore f' File "", line {f_lineno}, in example\n\t' ]