Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Type annotations for profile.py #6067

Merged
merged 4 commits into from
Apr 6, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def __init__(
if not hasattr(self.io_loop, "profile"):
ref = weakref.ref(self.io_loop)

def stop():
def stop() -> bool:
loop = ref()
return loop is None or loop.asyncio_loop.is_closed()

Expand Down
94 changes: 58 additions & 36 deletions distributed/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import sys
import threading
from collections import defaultdict, deque
from collections.abc import Callable, Collection
from time import sleep
from typing import Any

Expand All @@ -43,7 +44,7 @@
from distributed.utils import color_of


def identifier(frame):
def identifier(frame) -> str:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why you're never annotating the frame?

Suggested change
def identifier(frame) -> str:
def identifier(frame: types.FrameType) -> str:

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added them now

"""A string identifier from a frame

Strings are cheaper to use as indexes into dicts than tuples or dicts
Expand All @@ -60,8 +61,10 @@ def identifier(frame):
)


# work around some frames lacking an f_lineo eg: https://bugs.python.org/issue47085
def _f_lineno(frame):
def _f_lineno(frame) -> int:
"""Work around some frames lacking an f_lineno
See: https://bugs.python.org/issue47085
"""
f_lineno = frame.f_lineno
if f_lineno is not None:
return f_lineno
Expand All @@ -78,7 +81,7 @@ def _f_lineno(frame):
return prev_line


def repr_frame(frame):
def repr_frame(frame) -> str:
"""Render a frame as a line for inclusion into a text traceback"""
co = frame.f_code
f_lineno = _f_lineno(frame)
Expand All @@ -87,7 +90,7 @@ def repr_frame(frame):
return text + "\n\t" + line


def info_frame(frame):
def info_frame(frame) -> dict[str, Any]:
co = frame.f_code
f_lineno = _f_lineno(frame)
line = linecache.getline(co.co_filename, f_lineno, frame.f_globals).lstrip()
Expand All @@ -99,7 +102,14 @@ def info_frame(frame):
}


def process(frame, child, state, stop=None, omit=None):
def process(
frame,
child,
state: dict[str, Any],
*,
stop: str | None = None,
omit: Collection[str] = (),
) -> dict[str, Any] | None:
"""Add counts from a frame stack onto existing state

This recursively adds counts to the existing state dictionary and creates
Expand All @@ -119,16 +129,17 @@ def process(frame, child, state, stop=None, omit=None):
'description': 'root',
'children': {'...'}}
"""
if omit is not None and any(frame.f_code.co_filename.endswith(o) for o in omit):
return False
if any(frame.f_code.co_filename.endswith(o) for o in omit):
return None

prev = frame.f_back
if prev is not None and (
stop is None or not prev.f_code.co_filename.endswith(stop)
):
state = process(prev, frame, state, stop=stop)
if state is False:
return False
new_state = process(prev, frame, state, stop=stop)
if new_state is None:
return None
state = new_state

ident = identifier(frame)

Expand All @@ -149,6 +160,7 @@ def process(frame, child, state, stop=None, omit=None):
return d
else:
d["count"] += 1
return None


def merge(*args):
Expand Down Expand Up @@ -275,9 +287,14 @@ def traverse(state, start, stop, height):
}


def _watch(thread_id, log, interval="20ms", cycle="2s", omit=None, stop=lambda: False):
interval = parse_timedelta(interval)
cycle = parse_timedelta(cycle)
def _watch(
thread_id: int,
log: deque[tuple[float, dict[str, Any]]], # [(timestamp, output of create()), ...]
interval: float,
cycle: float,
omit: Collection[str],
stop: Callable[[], bool],
) -> None:

recent = create()
last = time()
Expand All @@ -297,48 +314,51 @@ def _watch(thread_id, log, interval="20ms", cycle="2s", omit=None, stop=lambda:


def watch(
thread_id=None,
interval="20ms",
cycle="2s",
maxlen=1000,
omit=None,
stop=lambda: False,
):
thread_id: int | None = None,
interval: str = "20ms",
cycle: str = "2s",
maxlen: int = 1000,
omit: Collection[str] = (),
stop: Callable[[], bool] = lambda: False,
) -> deque[tuple[float, dict[str, Any]]]:
"""Gather profile information on a particular thread

This starts a new thread to watch a particular thread and returns a deque
that holds periodic profile information.

Parameters
----------
thread_id : int
thread_id : int, optional
Defaults to current thread
interval : str
Time per sample
cycle : str
Time between refreshes to a new profile state
maxlen : int
Passed onto deque, maximum number of periods
omit : str
Don't include entries that start with this filename
omit : collection of str
Don't include entries whose filename ends with any of these strings
stop : callable
Function to call to see if we should stop
Function to call to see if we should stop. It must
accept no arguments and return a bool (True to stop,
False to continue).

Returns
-------
deque
"""
if thread_id is None:
thread_id = threading.get_ident()
deque of tuples:

log = deque(maxlen=maxlen)
- timestamp
- dict[str, Any] (output of ``create()``)
"""
log: deque[tuple[float, dict[str, Any]]] = deque(maxlen=maxlen)

thread = threading.Thread(
target=_watch,
name="Profile",
kwargs={
"thread_id": thread_id,
"interval": interval,
"cycle": cycle,
"thread_id": thread_id or threading.get_ident(),
"interval": parse_timedelta(interval),
"cycle": parse_timedelta(cycle),
"log": log,
"omit": omit,
"stop": stop,
Expand Down Expand Up @@ -469,11 +489,11 @@ def _remove_py_stack(frames):
yield entry


def llprocess(frames, child, state):
def llprocess(frames, child, state: dict[str, Any] | None) -> dict[str, Any] | None:
"""Add counts from low level profile information onto existing state

This uses the ``stacktrace`` module to collect low level stack trace
information and place it onto the given sttate.
information and place it onto the given state.

It is configured with the ``distributed.worker.profile.low-level`` config
entry.
Expand All @@ -484,10 +504,11 @@ def llprocess(frames, child, state):
ll_get_stack
"""
if not frames:
return
return None
frame = frames.pop()
if frames:
state = llprocess(frames, frame, state)
assert state

addr = hex(frame.addr - frame.offset)
ident = ";".join(map(str, (frame.name, "<low-level>", addr)))
Expand All @@ -513,6 +534,7 @@ def llprocess(frames, child, state):
return d
else:
d["count"] += 1
return None


def ll_get_stack(tid):
Expand Down