
Commit 65c7563

embray authored and dimpase committed
Enhance the new build_many to actually return results (including exceptions).
Now it can be used even as a somewhat more general parallel map.
1 parent 4583b40 commit 65c7563
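To make the headline change concrete, here is a hypothetical usage sketch (the `cube` helper is illustrative, not part of the commit): after this change, `build_many` behaves like a blocking parallel map over one-argument callables, returning results in the original task order.

    # Hypothetical usage sketch; `cube` is illustrative, not from the commit.
    from sage_setup.docbuild.utils import build_many

    def cube(n):
        return n ** 3

    # Blocks until all workers finish; results come back in task order,
    # like multiprocessing.Pool.map.
    assert build_many(cube, range(5), processes=2) == [0, 1, 8, 27, 64]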

File tree

1 file changed (+127 -44 lines)

src/sage_setup/docbuild/utils.py

@@ -7,37 +7,48 @@
 class WorkerDiedException(RuntimeError):
     """Raised if a worker process dies unexpected."""
 
+    def __init__(self, message, original_exception=None):
+        super(WorkerDiedException, self).__init__(message)
+        self.original_exception = original_exception
 
-def _build_many(target, args, processes=None):
+
+def build_many(target, args, processes=None):
     """
     Map a list of arguments in ``args`` to a single-argument target function
-    ``target`` in parallel using ``NUM_THREADS`` (or ``processes`` if given)
-    simultaneous processes.
+    ``target`` in parallel using ``multiprocessing.cpu_count()`` (or
+    ``processes`` if given) simultaneous processes.
 
     This is a simplified version of ``multiprocessing.Pool.map`` from the
     Python standard library which avoids a couple of its pitfalls. In
     particular, it can abort (with a `RuntimeError`) without hanging if one of
-    the worker processes unexpectedly dies. It also avoids starting new
-    processes from a pthread, which is known to result in bugs on versions of
-    Cygwin prior to 3.0.0 (see
-    https://trac.sagemath.org/ticket/27214#comment:25).
+    the worker processes unexpectedly dies. It also has semantics equivalent
+    to ``maxtasksperchild=1``; that is, one process is started per argument.
+    As such, this is inefficient for processing large numbers of fast tasks,
+    but appropriate for running longer tasks (such as doc builds) which may
+    also require significant cleanup.
+
+    It also avoids starting new processes from a pthread, which results in at
+    least two known issues:
 
-    On the other hand, unlike ``multiprocessing.Pool.map`` it does not return
-    a result. This is fine for the purpose of building multiple Sphinx
-    documents in parallel.
+    * On versions of Cygwin prior to 3.0.0 there were bugs in mmap handling
+      on threads (see https://trac.sagemath.org/ticket/27214#comment:25).
+
+    * When PARI is built with multi-threading support, forking a Sage
+      process from a thread leaves the main Pari interface instance broken
+      (see https://trac.sagemath.org/ticket/26608#comment:38).
 
     In the future this may be replaced by a generalized version of the more
     robust parallel processing implementation from ``sage.doctest.forker``.
 
     EXAMPLES::
 
-        sage: from sage_setup.docbuild.utils import _build_many
+        sage: from sage_setup.docbuild.utils import build_many
         sage: def target(N):
         ....:     import time
         ....:     time.sleep(float(0.1))
         ....:     print('Processed task %s' % N)
         ....:
-        sage: _build_many(target, range(8), processes=8)
+        sage: _ = build_many(target, range(8), processes=8)
         Processed task ...
         Processed task ...
         Processed task ...
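For comparison, a minimal standard-library sketch of the ``maxtasksperchild=1`` semantics the new docstring describes (illustrative only, not from this commit; unlike `build_many`, `Pool.map` may hang rather than abort if a worker is killed outright):

    # Stdlib near-equivalent of one-process-per-task; illustrative only.
    from multiprocessing import Pool

    def square(n):
        return n * n

    if __name__ == '__main__':
        pool = Pool(processes=4, maxtasksperchild=1)  # recycle worker after each task
        try:
            print(pool.map(square, range(8)))  # blocks; preserves input order
        finally:
            pool.close()
            pool.join()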
@@ -47,13 +58,23 @@ def _build_many(target, args, processes=None):
         Processed task ...
         Processed task ...
 
-    If one of the worker processes errors out from an unhandled exception, or
-    otherwise exits non-zero (e.g. killed by a signal) any in-progress tasks
-    will be completed gracefully, but then a `RuntimeError` is raised and
-    pending tasks are not started::
+    Unlike the first version of `build_many` which was only intended to get
+    around the Cygwin bug, this version can also return a result, and thus can
+    be used as a replacement for `multiprocessing.Pool.map` (i.e. it still
+    blocks until the result is ready)::
+
+        sage: def square(N):
+        ....:     return N * N
+        sage: build_many(square, range(100))
+        [0, 1, 4, 9, ..., 9604, 9801]
+
+    If the target function raises an exception in any of the workers,
+    `build_many` raises that exception and all other results are discarded.
+    Any in-progress tasks may still be allowed to complete gracefully before
+    the exception is raised::
 
         sage: def target(N):
-        ....:     import time
+        ....:     import time, os, signal
         ....:     if N == 4:
         ....:         # Task 4 is a poison pill
         ....:         1 / 0
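The poison-pill doctest continues in the next hunk; as a sketch of the documented semantics from the caller's side (assuming a Sage session with the new `build_many` importable; not part of the diff), the worker's exception can be caught like any local one:

    # Sketch assuming a Sage session; not part of the diff.
    from sage_setup.docbuild.utils import build_many

    def target(n):
        if n == 4:
            1 / 0          # poison pill: raises ZeroDivisionError in a worker
        return n

    try:
        results = build_many(target, range(8))
    except ZeroDivisionError as exc:
        print('a worker failed: %s' % exc)  # other results are discarded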
@@ -67,48 +88,86 @@ def _build_many(target, args, processes=None):
     traceback from the failing process on stderr. However, due to how the
     doctest runner works, the doctest will only expect the final exception::
 
-        sage: _build_many(target, range(8), processes=8)
+        sage: build_many(target, range(8), processes=8)
         Traceback (most recent call last):
         ...
-        WorkerDiedException: worker for 4 died with non-zero exit code 1
+        ZeroDivisionError: rational division by zero
+
+    Similarly, if one of the worker processes dies unexpectedly or otherwise
+    exits non-zero (e.g. killed by a signal) any in-progress tasks will be
+    completed gracefully, but then a `RuntimeError` is raised and pending
+    tasks are not started::
+
+        sage: def target(N):
+        ....:     import time, os, signal
+        ....:     if N == 4:
+        ....:         # Task 4 is a poison pill
+        ....:         os.kill(os.getpid(), signal.SIGKILL)
+        ....:     else:
+        ....:         time.sleep(0.5)
+        ....:         print('Processed task %s' % N)
+        ....:
+        sage: build_many(target, range(8), processes=8)
+        Traceback (most recent call last):
+        ...
+        WorkerDiedException: worker for 4 died with non-zero exit code -9
     """
-    from multiprocessing import Process
-    from .build_options import NUM_THREADS, ABORT_ON_ERROR
+    from multiprocessing import Process, Queue, cpu_count
+    from six.moves.queue import Empty
 
     if processes is None:
-        processes = NUM_THREADS
+        processes = cpu_count()
 
     workers = [None] * processes
-    queue = list(args)
-
-    # Maps worker process PIDs to the name of the document it's working
-    # on (the argument it was passed). This is primarily used just for
-    # debugging/information purposes.
-    jobs = {}
+    tasks = enumerate(args)
+    results = []
+    result_queue = Queue()
 
     ### Utility functions ###
+    def run_worker(target, queue, idx, task):
+        try:
+            result = target(task)
+        except BaseException as exc:
+            queue.put((None, exc))
+        else:
+            queue.put((idx, result))
 
-    def bring_out_yer_dead(w, exitcode):
+    def bring_out_yer_dead(w, task, exitcode):
         """
-        Handle a dead / completed worker. Raises WorkerDiedError if it
+        Handle a dead / completed worker. Raises WorkerDiedException if it
         returned with a non-zero exit code.
         """
 
         if w is None or exitcode is None:
             # I'm not dead yet! (or I haven't even been born yet)
-            return w
+            return (w, task)
 
         # Hack: If we wait()ed on this worker manually we have to tell it
         # it's dead:
         if w._popen.returncode is None:
             w._popen.returncode = exitcode
 
-        if exitcode != 0 and ABORT_ON_ERROR:
+        if exitcode != 0:
             raise WorkerDiedException(
                 "worker for {} died with non-zero exit code "
-                "{}".format(jobs[w.pid], w.exitcode))
+                "{}".format(task[1], w.exitcode))
+
+        # Get result from the queue; depending on ordering this may not be
+        # *the* result for this worker, but for each completed worker there
+        # should be *a* result so let's get it
+        try:
+            result = result_queue.get_nowait()
+        except Empty:
+            # Generally shouldn't happen but could in case of a race condition;
+            # don't worry we'll collect any remaining results at the end.
+            pass
+
+        if result[0] is None:
+            # Indicates that an exception occurred in the target function
+            raise WorkerDiedException('', original_exception=result[1])
+        else:
+            results.append(result)
 
-        jobs.pop(w.pid)
         # Helps multiprocessing with some internal bookkeeping
         w.join()
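The heart of the new machinery above is the index-tagged result queue: each worker reports either `(index, result)` or `(None, exception)`. A self-contained sketch of that pattern (simplified from the diff; the `demo` function is illustrative):

    # Standalone sketch of the index-tagged result-queue pattern; simplified,
    # not part of the commit.
    from multiprocessing import Process, Queue

    def run_worker(target, queue, idx, task):
        try:
            result = target(task)
        except BaseException as exc:
            queue.put((None, exc))   # None index flags an exception
        else:
            queue.put((idx, result))

    def demo(n):
        return n * n

    if __name__ == '__main__':
        q = Queue()
        procs = [Process(target=run_worker, args=(demo, q, i, t))
                 for i, t in enumerate([3, 1, 2])]
        for p in procs:
            p.start()
        results = [q.get() for _ in procs]  # drain before join to avoid deadlock
        for p in procs:
            p.join()
        # Restore task order by sorting on the index tag
        print([r[1] for r in sorted(results, key=lambda r: r[0])])  # [9, 1, 4]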
@@ -147,20 +206,28 @@ def reap_workers(waited_pid=None, waited_exitcode=None):
 
         for idx, w in enumerate(workers):
             if w is not None:
+                w, task = w
                 if w.pid == waited_pid:
                     exitcode = waited_exitcode
                 else:
                     exitcode = w.exitcode
 
-                w = bring_out_yer_dead(w, exitcode)
+                w = bring_out_yer_dead(w, task, exitcode)
 
             # Worker w is dead/not started, so start a new worker
             # in its place with the next document from the queue
-            if w is None and queue:
-                job = queue.pop(0)
-                w = Process(target=target, args=(job,))
-                w.start()
-                jobs[w.pid] = job
+            if w is None:
+                try:
+                    task = next(tasks)
+                except StopIteration:
+                    pass
+                else:
+                    w = Process(target=run_worker,
+                                args=((target, result_queue) + task))
+                    w.start()
+                    # Pair the new worker with the task it's performing (mostly
+                    # for debugging purposes)
+                    w = (w, task)
 
             workers[idx] = w
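One detail in this hunk: each `task` is the `(index, argument)` pair from `enumerate(args)`, so the tuple passed as `args` unpacks into `run_worker(target, result_queue, idx, arg)`. A tiny illustrative check with placeholder values:

    # Placeholder values; illustrates the argument packing only.
    task = (3, 'reference')                # as produced by enumerate(args)
    packed = ('target_fn', 'result_queue') + task
    assert packed == ('target_fn', 'result_queue', 3, 'reference')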
@@ -197,18 +264,34 @@ def reap_workers(waited_pid=None, waited_exitcode=None):
     finally:
         try:
             remaining_workers = [w for w in workers if w is not None]
-            for w in remaining_workers:
+            for w, _ in remaining_workers:
                 # Give any remaining workers a chance to shut down gracefully
                 try:
                     w.terminate()
                 except OSError as exc:
                     if exc.errno != errno.ESRCH:
                         # Otherwise it was already dead so this was expected
                         raise
-            for w in remaining_workers:
+            for w, _ in remaining_workers:
                 w.join()
         finally:
             if worker_exc is not None:
                 # Re-raise the RuntimeError from bring_out_yer_dead set if a
-                # worker died unexpectedly
-                raise worker_exc
+                # worker died unexpectedly, or the original exception if it's
+                # wrapping one
+                if worker_exc.original_exception:
+                    raise worker_exc.original_exception
+                else:
+                    raise worker_exc
+
+    # All workers should be shut down by now and should have completed without
+    # error. No new items will be added to the result queue, so we can get all
+    # the remaining results, if any.
+    while True:
+        try:
+            results.append(result_queue.get_nowait())
+        except Empty:
+            break
+
+    # Return the results sorted according to their original task order
+    return [r[1] for r in sorted(results, key=lambda r: r[0])]
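The final statement converts completion order back into task order: results are `(index, value)` pairs, so sorting on the index and projecting the value restores the input sequence. A worked example with invented values:

    # Invented values; shows the ordering step in isolation.
    results = [(2, 'c'), (0, 'a'), (1, 'b')]   # completion order
    assert [r[1] for r in sorted(results, key=lambda r: r[0])] == ['a', 'b', 'c']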
