Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix thread-pool limit insufficiency on cascaded parallel execution (#5864) #2

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions compose/cli/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ def set_parallel_limit(environment):
raise errors.UserError('COMPOSE_PARALLEL_LIMIT can not be less than 2')
parallel.GlobalLimit.set_global_limit(parallel_limit)

# The depth of cascaded parallel execution is bounded by the global parallel_limit
parallel.CascadeLimiter.set_global_limit(parallel_limit)


def get_config_from_options(base_dir, options, additional_options=None):
additional_options = additional_options or {}
Expand Down
36 changes: 36 additions & 0 deletions compose/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import operator
import sys
from functools import partial
from queue import Empty
from queue import Queue
from threading import Lock
Expand Down Expand Up @@ -39,6 +40,10 @@ def set_global_limit(cls, value):
cls.global_limiter = Semaphore(value)


class ParallelLimitExceededByCascadeCalls(Exception):
    """Raised when cascaded parallel_execute calls have consumed the
    entire global parallel limit, leaving no budget for a deeper level."""


def parallel_execute_watch(events, writer, errors, results, msg, get_name, fail_check):
""" Watch events from a parallel execution, update status and fill errors and results.
Returns exception to re-raise.
Expand Down Expand Up @@ -71,6 +76,31 @@ def parallel_execute_watch(events, writer, errors, results, msg, get_name, fail_
return error_to_reraise


class CascadeLimiter:
    """Splits a global parallel-execution budget across cascaded
    (nested) parallel_execute calls.

    Each nesting level calls allocate() to reserve half of the
    remaining budget for its own thread pool; whatever is left is
    handed down to the next level via remaining().
    """

    # Default budget used when set_global_limit() has not been called.
    global_limit = PARALLEL_LIMIT

    @classmethod
    def set_global_limit(cls, value):
        """Set the default budget for limiters created without an
        explicit value. Passing None restores the built-in default."""
        if value is None:
            value = PARALLEL_LIMIT
        cls.global_limit = value

    def __init__(self, value=None):
        if value is None:
            value = self.global_limit
        # Remaining budget; mutated by allocate(), so guard all access.
        self.limit = value
        # Per-instance lock: the guarded state (self.limit) is instance
        # state, so independent limiters need not serialize against one
        # another (the previous class-wide Semaphore(1) made every
        # allocation in the process contend on a single mutex).
        self._lock = Lock()

    def allocate(self):
        """Reserve half of the remaining budget (rounded down) for the
        caller's own thread pool and return it. Returns 0 once the
        budget is exhausted — callers treat that as cascade overflow."""
        with self._lock:
            share = self.limit // 2
            self.limit -= share
            return share

    def remaining(self):
        """Return the budget still available for deeper cascade levels."""
        with self._lock:
            return self.limit


def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None, fail_check=None):
"""Runs func on objects in parallel while ensuring that func is
ran on object only after it is ran on all its dependencies.
Expand Down Expand Up @@ -165,6 +195,12 @@ def parallel_execute_iter(objects, func, get_deps, limit):

if limit is None:
limiter = NoLimit()
elif isinstance(limit, CascadeLimiter):
available_limit = limit.allocate()
if available_limit == 0:
raise ParallelLimitExceededByCascadeCalls()
limiter = Semaphore(available_limit)
func = partial(func, limit=limit.remaining())
else:
limiter = Semaphore(limit)

Expand Down
2 changes: 2 additions & 0 deletions compose/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from .network import build_networks
from .network import get_networks
from .network import ProjectNetworks
from .parallel import CascadeLimiter
from .progress_stream import read_status
from .service import BuildAction
from .service import ContainerIpcMode
Expand Down Expand Up @@ -646,6 +647,7 @@ def get_deps(service):
operator.attrgetter('name'),
None,
get_deps,
limit=CascadeLimiter(),
)
if errors:
raise ProjectError(
Expand Down
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ mock==3.0.5
pytest==6.0.1; python_version >= '3.5'
pytest==4.6.5; python_version < '3.5'
pytest-cov==2.10.1
pytest-timeout==1.4.2
PyYAML==5.3.1
148 changes: 148 additions & 0 deletions tests/unit/parallel_test.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import unittest
from threading import Lock

import pytest
from docker.errors import APIError

from compose.parallel import CascadeLimiter
from compose.parallel import GlobalLimit
from compose.parallel import parallel_execute
from compose.parallel import parallel_execute_iter
from compose.parallel import ParallelLimitExceededByCascadeCalls
from compose.parallel import ParallelStreamWriter
from compose.parallel import UpstreamError

Expand Down Expand Up @@ -88,6 +91,151 @@ def f(obj):
assert results == tasks * [None]
assert errors == {}

@pytest.mark.timeout(10)
def test_parallel_cascade_execute_with_global_limit(self):
    """One level of nesting: the cascade limiter splits the global
    budget so the inner pools never exceed their allocated share."""
    parallel_limit = 2

    GlobalLimit.set_global_limit(parallel_limit)
    CascadeLimiter.set_global_limit(parallel_limit)
    self.addCleanup(GlobalLimit.set_global_limit, None)
    self.addCleanup(CascadeLimiter.set_global_limit, None)

    outer_tasks = parallel_limit
    inner_tasks = 2

    exclusive = Lock()

    def leaf(obj):
        # With the budget halved, only one inner thread runs at a time,
        # so the lock must always be free when we try to take it.
        acquired = exclusive.acquire(False)
        assert acquired
        exclusive.release()
        return None

    def spawn_inner(obj, limit):
        return parallel_execute(
            objects=list(range(inner_tasks)),
            func=leaf,
            get_name=str,
            msg="Testing cascade parallel level 1 func",
            limit=limit
        )

    results, errors = parallel_execute(
        objects=list(range(outer_tasks)),
        func=spawn_inner,
        get_name=str,
        msg="Testing cascade parallel level 0 func",
        limit=CascadeLimiter()
    )

    assert results == [(inner_tasks * [None], {})] * outer_tasks
    assert errors == {}

@pytest.mark.timeout(10)
def test_parallel_multiple_cascade_execute_with_global_limit(self):
    """Two levels of nesting: each level reserves half of the remaining
    budget and passes the rest down, all within the global limit."""
    COMPOSE_PARALLEL_LIMIT = 3

    GlobalLimit.set_global_limit(COMPOSE_PARALLEL_LIMIT)
    CascadeLimiter.set_global_limit(COMPOSE_PARALLEL_LIMIT)
    self.addCleanup(GlobalLimit.set_global_limit, None)
    self.addCleanup(CascadeLimiter.set_global_limit, None)

    tasks = COMPOSE_PARALLEL_LIMIT
    middle_tasks = 2
    inner_tasks = 2

    lock = Lock()

    def inner_f(obj):
        locked = lock.acquire(False)
        # we should always get the lock because we're the only thread running
        assert locked
        lock.release()
        return None

    def middle_f(obj, limit):
        return parallel_execute(
            objects=list(range(inner_tasks)),
            func=inner_f,
            get_name=str,
            msg="Testing cascade parallel level 2 func",
            limit=limit
        )

    def outer_f(obj, limit):
        return parallel_execute(
            objects=list(range(middle_tasks)),
            func=middle_f,
            get_name=str,
            msg="Testing cascade parallel level 1 func",
            limit=CascadeLimiter(limit)
        )

    limiter = CascadeLimiter()
    results, errors = parallel_execute(
        objects=list(range(tasks)),
        func=outer_f,
        get_name=str,
        msg="Testing cascade parallel level 0 func",
        limit=limiter
    )
    # Each outer task yields (middle results, errors) and each middle
    # task yields (inner results, errors). NOTE: the previous expected
    # value swapped middle_tasks and inner_tasks — harmless only while
    # both happen to equal 2.
    assert results == tasks * [(middle_tasks * [(inner_tasks * [None], {})], {})]
    assert errors == {}

@pytest.mark.timeout(10)
def test_parallel_cascade_execute_with_global_limit_less_than_call_stack_height(self):
    """A global limit too small for the nesting depth must surface as
    ParallelLimitExceededByCascadeCalls instead of deadlocking.

    With a limit of 2, the outer level allocates 1, the middle level
    allocates the remaining 1, and the innermost level is left with 0.
    """
    COMPOSE_PARALLEL_LIMIT = 2

    GlobalLimit.set_global_limit(COMPOSE_PARALLEL_LIMIT)
    CascadeLimiter.set_global_limit(COMPOSE_PARALLEL_LIMIT)
    self.addCleanup(GlobalLimit.set_global_limit, None)
    self.addCleanup(CascadeLimiter.set_global_limit, None)

    tasks = COMPOSE_PARALLEL_LIMIT
    middle_tasks = 2
    inner_tasks = 2

    lock = Lock()

    def inner_f(obj):
        locked = lock.acquire(False)
        # we should always get the lock because we're the only thread running
        assert locked
        lock.release()
        return None

    def middle_f(obj, limit):
        return parallel_execute(
            objects=list(range(inner_tasks)),
            func=inner_f,
            get_name=str,
            msg="Testing cascade parallel level 2 func",
            limit=limit
        )

    def outer_f(obj, limit):
        return parallel_execute(
            objects=list(range(middle_tasks)),
            func=middle_f,
            get_name=str,
            msg="Testing cascade parallel level 1 func",
            limit=CascadeLimiter(limit)
        )

    # No results to inspect: parallel_execute must raise before
    # completing. (The previous version asserted on `results`/`errors`
    # after the with-block, but those names are never bound when the
    # expected exception is raised, so those asserts could only have
    # failed with a NameError. Debug print() calls removed as well.)
    with pytest.raises(ParallelLimitExceededByCascadeCalls):
        parallel_execute(
            objects=list(range(tasks)),
            func=outer_f,
            get_name=str,
            msg="Testing cascade parallel level 0 func",
            limit=CascadeLimiter()
        )

def test_parallel_execute_with_deps(self):
log = []

Expand Down