Skip to content

Commit

Permalink
Type annotations for profile.py (#6067)
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky authored Apr 6, 2022
1 parent cd2ce86 commit 8cf1196
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 47 deletions.
2 changes: 1 addition & 1 deletion distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def __init__(
if not hasattr(self.io_loop, "profile"):
ref = weakref.ref(self.io_loop)

def stop():
def stop() -> bool:
loop = ref()
return loop is None or loop.asyncio_loop.is_closed()

Expand Down
112 changes: 68 additions & 44 deletions distributed/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import sys
import threading
from collections import defaultdict, deque
from collections.abc import Callable, Collection
from time import sleep
from types import FrameType
from typing import Any

import tlz as toolz
Expand All @@ -43,7 +45,7 @@
from distributed.utils import color_of


def identifier(frame):
def identifier(frame: FrameType | None) -> str:
"""A string identifier from a frame
Strings are cheaper to use as indexes into dicts than tuples or dicts
Expand All @@ -60,8 +62,10 @@ def identifier(frame):
)


# work around some frames lacking an f_lineo eg: https://bugs.python.org/issue47085
def _f_lineno(frame):
def _f_lineno(frame: FrameType) -> int:
"""Work around some frames lacking an f_lineno
See: https://bugs.python.org/issue47085
"""
f_lineno = frame.f_lineno
if f_lineno is not None:
return f_lineno
Expand All @@ -78,7 +82,7 @@ def _f_lineno(frame):
return prev_line


def repr_frame(frame):
def repr_frame(frame: FrameType) -> str:
"""Render a frame as a line for inclusion into a text traceback"""
co = frame.f_code
f_lineno = _f_lineno(frame)
Expand All @@ -87,7 +91,7 @@ def repr_frame(frame):
return text + "\n\t" + line


def info_frame(frame):
def info_frame(frame: FrameType) -> dict[str, Any]:
co = frame.f_code
f_lineno = _f_lineno(frame)
line = linecache.getline(co.co_filename, f_lineno, frame.f_globals).lstrip()
Expand All @@ -99,7 +103,14 @@ def info_frame(frame):
}


def process(frame, child, state, stop=None, omit=None):
def process(
frame: FrameType,
child,
state: dict[str, Any],
*,
stop: str | None = None,
omit: Collection[str] = (),
) -> dict[str, Any] | None:
"""Add counts from a frame stack onto existing state
This recursively adds counts to the existing state dictionary and creates
Expand All @@ -119,16 +130,17 @@ def process(frame, child, state, stop=None, omit=None):
'description': 'root',
'children': {'...'}}
"""
if omit is not None and any(frame.f_code.co_filename.endswith(o) for o in omit):
return False
if any(frame.f_code.co_filename.endswith(o) for o in omit):
return None

prev = frame.f_back
if prev is not None and (
stop is None or not prev.f_code.co_filename.endswith(stop)
):
state = process(prev, frame, state, stop=stop)
if state is False:
return False
new_state = process(prev, frame, state, stop=stop)
if new_state is None:
return None
state = new_state

ident = identifier(frame)

Expand All @@ -149,9 +161,10 @@ def process(frame, child, state, stop=None, omit=None):
return d
else:
d["count"] += 1
return None


def merge(*args):
def merge(*args: dict[str, Any]) -> dict[str, Any]:
"""Merge multiple frame states together"""
if not args:
return create()
Expand All @@ -164,13 +177,13 @@ def merge(*args):
children[child].append(arg["children"][child])

try:
children = {k: merge(*v) for k, v in children.items()}
children_dict = {k: merge(*v) for k, v in children.items()}
except RecursionError: # pragma: no cover
children = {}
children_dict = {}
count = sum(arg["count"] for arg in args)
return {
"description": args[0]["description"],
"children": dict(children),
"children": children_dict,
"count": count,
"identifier": args[0]["identifier"],
}
Expand All @@ -185,17 +198,18 @@ def create() -> dict[str, Any]:
}


def call_stack(frame):
def call_stack(frame: FrameType) -> list[str]:
"""Create a call text stack from a frame
Returns
-------
list of strings
"""
L = []
while frame:
L.append(repr_frame(frame))
frame = frame.f_back
cur_frame: FrameType | None = frame
while cur_frame:
L.append(repr_frame(cur_frame))
cur_frame = cur_frame.f_back
return L[::-1]


Expand Down Expand Up @@ -287,9 +301,14 @@ def wait_profiler() -> None:
sleep(0.0001)


def _watch(thread_id, log, interval="20ms", cycle="2s", omit=None, stop=lambda: False):
interval = parse_timedelta(interval)
cycle = parse_timedelta(cycle)
def _watch(
thread_id: int,
log: deque[tuple[float, dict[str, Any]]], # [(timestamp, output of create()), ...]
interval: float,
cycle: float,
omit: Collection[str],
stop: Callable[[], bool],
) -> None:

recent = create()
last = time()
Expand All @@ -315,48 +334,51 @@ def _watch(thread_id, log, interval="20ms", cycle="2s", omit=None, stop=lambda:


def watch(
thread_id=None,
interval="20ms",
cycle="2s",
maxlen=1000,
omit=None,
stop=lambda: False,
):
thread_id: int | None = None,
interval: str = "20ms",
cycle: str = "2s",
maxlen: int = 1000,
omit: Collection[str] = (),
stop: Callable[[], bool] = lambda: False,
) -> deque[tuple[float, dict[str, Any]]]:
"""Gather profile information on a particular thread
This starts a new thread to watch a particular thread and returns a deque
that holds periodic profile information.
Parameters
----------
thread_id : int
thread_id : int, optional
Defaults to current thread
interval : str
Time per sample
cycle : str
Time per refreshing to a new profile state
maxlen : int
Passed onto deque, maximum number of periods
omit : str
Don't include entries that start with this filename
omit : collection of str
Don't include entries whose filename includes any of these substrings
stop : callable
Function to call to see if we should stop
Function to call to see if we should stop. It must
accept no arguments and return a bool (True to stop,
False to continue).
Returns
-------
deque
"""
if thread_id is None:
thread_id = threading.get_ident()
deque of tuples:
log = deque(maxlen=maxlen)
- timestamp
- dict[str, Any] (output of ``create()``)
"""
log: deque[tuple[float, dict[str, Any]]] = deque(maxlen=maxlen)

thread = threading.Thread(
target=_watch,
name="Profile",
kwargs={
"thread_id": thread_id,
"interval": interval,
"cycle": cycle,
"thread_id": thread_id or threading.get_ident(),
"interval": parse_timedelta(interval),
"cycle": parse_timedelta(cycle),
"log": log,
"omit": omit,
"stop": stop,
Expand Down Expand Up @@ -487,11 +509,11 @@ def _remove_py_stack(frames):
yield entry


def llprocess(frames, child, state):
def llprocess(frames, child, state: dict[str, Any] | None) -> dict[str, Any] | None:
"""Add counts from low level profile information onto existing state
This uses the ``stacktrace`` module to collect low level stack trace
information and place it onto the given sttate.
information and place it onto the given state.
It is configured with the ``distributed.worker.profile.low-level`` config
entry.
Expand All @@ -502,10 +524,11 @@ def llprocess(frames, child, state):
ll_get_stack
"""
if not frames:
return
return None
frame = frames.pop()
if frames:
state = llprocess(frames, frame, state)
assert state

addr = hex(frame.addr - frame.offset)
ident = ";".join(map(str, (frame.name, "<low-level>", addr)))
Expand All @@ -531,6 +554,7 @@ def llprocess(frames, child, state):
return d
else:
d["count"] += 1
return None


def ll_get_stack(tid):
Expand Down
4 changes: 2 additions & 2 deletions distributed/tests/test_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ class FakeFrame:
],
)
def test_info_frame_f_lineno(f_lasti: int, f_lineno: int) -> None:
assert info_frame(FakeFrame(f_lasti=f_lasti, f_code=FAKE_CODE)) == {
assert info_frame(FakeFrame(f_lasti=f_lasti, f_code=FAKE_CODE)) == { # type: ignore
"filename": "<stdin>",
"name": "example",
"line_number": f_lineno,
Expand All @@ -301,6 +301,6 @@ def test_info_frame_f_lineno(f_lasti: int, f_lineno: int) -> None:
],
)
def test_call_stack_f_lineno(f_lasti: int, f_lineno: int) -> None:
assert call_stack(FakeFrame(f_lasti=f_lasti, f_code=FAKE_CODE)) == [
assert call_stack(FakeFrame(f_lasti=f_lasti, f_code=FAKE_CODE)) == [ # type: ignore
f' File "<stdin>", line {f_lineno}, in example\n\t'
]

0 comments on commit 8cf1196

Please sign in to comment.