parallel.py
# Copyright 2016-2018 Dirk Thomas
# Licensed under the Apache License, Version 2.0

import asyncio
from concurrent.futures import ALL_COMPLETED
from concurrent.futures import FIRST_COMPLETED
from contextlib import suppress
import logging
import os
import signal
import sys
import traceback

from colcon_core.executor import ExecutorExtensionPoint
from colcon_core.executor import OnError
from colcon_core.logging import colcon_logger
from colcon_core.plugin_system import satisfies_version
from colcon_core.subprocess import new_event_loop
from colcon_core.subprocess import SIGINT_RESULT

logger = colcon_logger.getChild(__name__)


class ParallelExecutorExtension(ExecutorExtensionPoint):
    """
    Process multiple packages in parallel.

    The parallelization honors the dependency graph between the packages.
    """

    # the priority needs to be higher than that of the extension providing
    # the sequential execution in order to become the default
    PRIORITY = 110

    def __init__(self):  # noqa: D107
        super().__init__()
        # require a compatible 1.x version of the extension point API
        satisfies_version(
            ExecutorExtensionPoint.EXTENSION_POINT_VERSION, '^1.0')

    def add_arguments(self, *, parser):  # noqa: D102
        max_workers_default = os.cpu_count() or 4
        with suppress(AttributeError):
            # consider restricted set of CPUs if applicable
            max_workers_default = min(
                max_workers_default, len(os.sched_getaffinity(0)))
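        # note: os.sched_getaffinity is not available on all platforms
        # (e.g. Windows and macOS), hence the suppressed AttributeError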
        parser.add_argument(
            '--parallel-workers',
            type=int,
            default=max_workers_default,
            metavar='NUMBER',
            help='The maximum number of packages to process in parallel '
                 '(default: {max_workers_default})'.format_map(locals()))

    def execute(self, args, jobs, *, on_error=OnError.interrupt):  # noqa: D102
        # avoid debug message from asyncio when colcon uses debug log level
        asyncio_logger = logging.getLogger('asyncio')
        asyncio_logger.setLevel(logging.INFO)

        loop = new_event_loop()
        asyncio.set_event_loop(loop)

        coro = self._execute(args, jobs, on_error=on_error)
        future = asyncio.ensure_future(coro, loop=loop)
        try:
            logger.debug('run_until_complete')
            loop.run_until_complete(future)
        except KeyboardInterrupt:
            logger.debug('run_until_complete was interrupted')
            # override the return code of ongoing jobs with the special
            # SIGINT value
            for job in self._ongoing_jobs:
                job.returncode = SIGINT_RESULT
            # ignore further SIGINTs
            signal.signal(signal.SIGINT, signal.SIG_IGN)
            # wait for the jobs which have also received the SIGINT
            if not future.done():
                logger.debug('run_until_complete again')
                loop.run_until_complete(future)
            assert future.done()
            # read the potential exception to avoid the asyncio error about
            # a never retrieved exception
            _ = future.exception()
            logger.debug('run_until_complete finished')
            return signal.SIGINT
        except Exception as e:  # noqa: F841
            exc = traceback.format_exc()
            logger.error(
                'Exception in job execution: {e}\n{exc}'.format_map(locals()))
            return 1
        finally:
            # HACK on Windows closing the event loop seems to hang after
            # Ctrl-C even though no futures are pending
            if sys.platform != 'win32':
                logger.debug('closing loop')
                loop.close()
                logger.debug('loop closed')
        result = future.result()
        logger.debug(
            "run_until_complete finished with '{result}'".format_map(locals()))
        return result

    async def _execute(self, args, jobs, *, on_error):
        # count the number of dependent jobs for each job
        # in order to process jobs with more dependent jobs first
        recursive_dependent_counts = {}
        for package_name, job in jobs.items():
            # ignore "self" dependency
            recursive_dependent_counts[package_name] = len([
                j for name, j in jobs.items()
                if package_name != name and package_name in j.dependencies])
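        # worked example (illustrative): for jobs A, B, and C where B and C
        # both list A among their recursive dependencies, the counts are
        # {'A': 2, 'B': 0, 'C': 0}, so A is scheduled before B and C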

        futures = {}
        finished_jobs = {}
        rc = 0
        jobs = jobs.copy()
        while jobs or futures:
            # determine "ready" jobs
            ready_jobs = []
            for package_name, job in jobs.items():
                # a pending job is "ready" when all of its dependencies
                # have finished
                not_finished = set(jobs.keys()) | {
                    f.identifier for f in futures.values()}
                if not (set(job.dependencies) - {package_name}) & not_finished:
                    ready_jobs.append((
                        package_name, job,
                        recursive_dependent_counts[package_name]))
            # order the ready jobs, jobs with more dependents first
            ready_jobs.sort(key=lambda r: -r[2])

            # take "ready" jobs
            take_jobs = []
            for package_name, job, _ in ready_jobs:
                # don't schedule more jobs than workers
                # to prevent starting further jobs when a job fails
                if len(futures) + len(take_jobs) >= args.parallel_workers:
                    break
                take_jobs.append((package_name, job))
                del jobs[package_name]

            # pass them to the executor
            for package_name, job in take_jobs:
                assert asyncio.iscoroutinefunction(job.__call__), \
                    'Job is not a coroutine'
                future = asyncio.ensure_future(job())
                # map the future back to its job for result attribution
                futures[future] = job

            # wait for at least one future to complete
            assert futures, 'No futures'
            self._ongoing_jobs = futures.values()
            done_futures, _pending = await asyncio.wait(
                futures.keys(), timeout=30, return_when=FIRST_COMPLETED)
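            # FIRST_COMPLETED returns as soon as any running job finishes;
            # the timeout only triggers the periodic status output below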
            if not done_futures:  # timeout, no job has finished yet
                print(
                    '[Processing: %s]' % ', '.join(sorted(
                        f.identifier for f in futures.values())))

            # check the results of the done futures
            for done_future in done_futures:
                job = futures[done_future]
                del futures[done_future]

                # get the result without raising an exception
                if done_future.cancelled():
                    result = signal.SIGINT
                elif done_future.exception():
                    result = done_future.exception()
                    if isinstance(result, KeyboardInterrupt):
                        result = signal.SIGINT
                else:
                    result = done_future.result()
                    if result == SIGINT_RESULT:
                        result = signal.SIGINT
                finished_jobs[job.identifier] = result

                # if any job returned a SIGINT overwrite the return code;
                # this should override a potentially earlier set error code
                # in the case where on_error isn't set to OnError.interrupt,
                # otherwise set the error code if it is the first one
                if result is signal.SIGINT or result and not rc:
                    rc = result

                if result:
                    if on_error in (OnError.interrupt, OnError.skip_pending):
                        # skip pending jobs
                        jobs.clear()
                    if on_error == OnError.skip_downstream:
                        # skip downstream jobs of the failed one
                        for pending_name, pending_job in list(jobs.items()):
                            if job.identifier in pending_job.dependencies:
                                del jobs[pending_name]
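                        # e.g. if job B failed and pending job C lists B in
                        # its dependencies, C is removed here while unrelated
                        # pending jobs remain eligible to run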

            # if any job failed or was interrupted cancel the pending futures
            if (rc and on_error == OnError.interrupt) or rc is signal.SIGINT:
                if futures:
                    for future in futures.keys():
                        if not future.done():
                            future.cancel()
                    await asyncio.wait(
                        futures.keys(), return_when=ALL_COMPLETED)
                    # collect the results from the canceled futures
                    for future, job in futures.items():
                        result = future.result()
                        finished_jobs[job.identifier] = result
                break

        # if any job failed
        if any(finished_jobs.values()):
            # flush the job output
            self._flush()
        return rc
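
# Registration sketch (assumed, for illustration): colcon discovers executor
# extensions through the 'colcon_core.executor' entry point group, so a
# package providing this class would typically declare it in its setup.cfg:
#
#   [options.entry_points]
#   colcon_core.executor =
#       parallel = colcon_parallel_executor.parallel:ParallelExecutorExtension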