From 7fb4a3b614456d136b11edb9165d69d2b51a2f8c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=D0=AE=D1=80=D0=B8=D0=B9=20=D0=A1=D0=B0=D0=B3=D0=B8=D1=82?=
 =?UTF-8?q?=D0=BE=D0=B2?=
Date: Fri, 13 Nov 2020 20:13:43 +0700
Subject: [PATCH] 5864 fixed thread pool limit insufficiency on do
 _execute_convergence_plan cascade parallel calls
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Юрий Сагитов
---
 compose/cli/command.py      |   3 +
 compose/parallel.py         |  36 +++++++++
 compose/project.py          |   2 +
 requirements-dev.txt        |   1 +
 tests/unit/parallel_test.py | 145 ++++++++++++++++++++++++++++++++++++
 5 files changed, 187 insertions(+)

diff --git a/compose/cli/command.py b/compose/cli/command.py
index d471e78df1a..bffb493c8c5 100644
--- a/compose/cli/command.py
+++ b/compose/cli/command.py
@@ -85,6 +85,9 @@ def set_parallel_limit(environment):
             raise errors.UserError('COMPOSE_PARALLEL_LIMIT can not be less than 2')
         parallel.GlobalLimit.set_global_limit(parallel_limit)
+        # Depth of cascade parallel execution limited by global parallel_limit
+        parallel.CascadeLimiter.set_global_limit(parallel_limit)
+
 
 
 def get_config_from_options(base_dir, options, additional_options=None):
     additional_options = additional_options or {}
diff --git a/compose/parallel.py b/compose/parallel.py
index acf9e4a84cf..03dd65d3e17 100644
--- a/compose/parallel.py
+++ b/compose/parallel.py
@@ -2,6 +2,7 @@
 import logging
 import operator
 import sys
+from functools import partial
 from queue import Empty
 from queue import Queue
 from threading import Lock
@@ -39,6 +40,10 @@ def set_global_limit(cls, value):
         cls.global_limiter = Semaphore(value)
 
 
+class ParallelLimitExceededByCascadeCalls(Exception):
+    pass
+
+
 def parallel_execute_watch(events, writer, errors, results, msg, get_name, fail_check):
     """ Watch events from a parallel execution, update status and fill errors and results.
         Returns exception to re-raise.
@@ -71,6 +76,31 @@ def parallel_execute_watch(events, writer, errors, results, msg, get_name, fail_
     return error_to_reraise
 
 
+class CascadeLimiter:
+    global_limit = PARALLEL_LIMIT
+    inner_limiter = Semaphore(1)
+
+    @classmethod
+    def set_global_limit(cls, value):
+        if value is None:
+            value = PARALLEL_LIMIT
+        cls.global_limit = value
+
+    def __init__(self, value=None):
+        if value is None:
+            value = self.global_limit
+        self.limit = value
+
+    def allocate(self):
+        with self.inner_limiter:
+            limit = self.limit // 2
+            self.limit -= limit
+        return limit
+
+    def remaining(self):
+        return self.limit
+
+
 def parallel_execute(objects, func, get_name, msg, get_deps=None, limit=None, fail_check=None):
     """Runs func on objects in parallel while ensuring that func is
     ran on object only after it is ran on all its dependencies.
@@ -165,6 +195,12 @@ def parallel_execute_iter(objects, func, get_deps, limit):
 
     if limit is None:
         limiter = NoLimit()
+    elif isinstance(limit, CascadeLimiter):
+        available_limit = limit.allocate()
+        if available_limit == 0:
+            raise ParallelLimitExceededByCascadeCalls()
+        limiter = Semaphore(available_limit)
+        func = partial(func, limit=limit.remaining())
     else:
         limiter = Semaphore(limit)
 
diff --git a/compose/project.py b/compose/project.py
index 420cb6548bc..07434705675 100644
--- a/compose/project.py
+++ b/compose/project.py
@@ -24,6 +24,7 @@
 from .network import build_networks
 from .network import get_networks
 from .network import ProjectNetworks
+from .parallel import CascadeLimiter
 from .progress_stream import read_status
 from .service import BuildAction
 from .service import ContainerIpcMode
@@ -646,6 +647,7 @@ def get_deps(service):
             operator.attrgetter('name'),
             None,
             get_deps,
+            limit=CascadeLimiter(),
         )
         if errors:
             raise ProjectError(
diff --git a/requirements-dev.txt b/requirements-dev.txt
index 9cc00c64e85..aee42d8631f 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -7,4 +7,5 @@ mock==3.0.5
 pytest==6.0.1; python_version >= '3.5'
 pytest==4.6.5; python_version < '3.5'
 pytest-cov==2.10.1
+pytest-timeout==1.4.2
 PyYAML==5.3.1
diff --git a/tests/unit/parallel_test.py b/tests/unit/parallel_test.py
index 98412f9a259..67f6026371d 100644
--- a/tests/unit/parallel_test.py
+++ b/tests/unit/parallel_test.py
@@ -1,11 +1,14 @@
 import unittest
 from threading import Lock
 
+import pytest
 from docker.errors import APIError
 
+from compose.parallel import CascadeLimiter
 from compose.parallel import GlobalLimit
 from compose.parallel import parallel_execute
 from compose.parallel import parallel_execute_iter
+from compose.parallel import ParallelLimitExceededByCascadeCalls
 from compose.parallel import ParallelStreamWriter
 from compose.parallel import UpstreamError
 
@@ -88,6 +91,148 @@ def f(obj):
         assert results == tasks * [None]
         assert errors == {}
 
+    @pytest.mark.timeout(10)
+    def test_parallel_cascade_execute_with_global_limit(self):
+        COMPOSE_PARALLEL_LIMIT = 2
+
+        GlobalLimit.set_global_limit(COMPOSE_PARALLEL_LIMIT)
+        CascadeLimiter.set_global_limit(COMPOSE_PARALLEL_LIMIT)
+        self.addCleanup(GlobalLimit.set_global_limit, None)
+        self.addCleanup(CascadeLimiter.set_global_limit, None)
+
+        tasks = COMPOSE_PARALLEL_LIMIT
+        inner_tasks = 2
+
+        lock = Lock()
+
+        def inner_f(obj):
+            locked = lock.acquire(False)
+            # we should always get the lock because we're the only thread running
+            assert locked
+            lock.release()
+            return None
+
+        def outer_f(obj, limit):
+            return parallel_execute(
+                objects=list(range(inner_tasks)),
+                func=inner_f,
+                get_name=str,
+                msg="Testing cascade parallel level 1 func",
+                limit=limit
+            )
+
+        results, errors = parallel_execute(
+            objects=list(range(tasks)),
+            func=outer_f,
+            get_name=str,
+            msg="Testing cascade parallel level 0 func",
+            limit=CascadeLimiter()
+        )
+
+        assert results == tasks * [(inner_tasks * [None], {})]
+        assert errors == {}
+
+    @pytest.mark.timeout(10)
+    def test_parallel_multiple_cascade_execute_with_global_limit(self):
+        COMPOSE_PARALLEL_LIMIT = 3
+
+        GlobalLimit.set_global_limit(COMPOSE_PARALLEL_LIMIT)
+        CascadeLimiter.set_global_limit(COMPOSE_PARALLEL_LIMIT)
+        self.addCleanup(GlobalLimit.set_global_limit, None)
+        self.addCleanup(CascadeLimiter.set_global_limit, None)
+
+        tasks = COMPOSE_PARALLEL_LIMIT
+        middle_tasks = 2
+        inner_tasks = 2
+
+        lock = Lock()
+
+        def inner_f(obj):
+            locked = lock.acquire(False)
+            # we should always get the lock because we're the only thread running
+            assert locked
+            lock.release()
+            return None
+
+        def middle_f(obj, limit):
+            return parallel_execute(
+                objects=list(range(inner_tasks)),
+                func=inner_f,
+                get_name=str,
+                msg="Testing cascade parallel level 2 func",
+                limit=limit
+            )
+
+        def outer_f(obj, limit):
+            return parallel_execute(
+                objects=list(range(middle_tasks)),
+                func=middle_f,
+                get_name=str,
+                msg="Testing cascade parallel level 1 func",
+                limit=CascadeLimiter(limit)
+            )
+
+        limiter = CascadeLimiter()
+        results, errors = parallel_execute(
+            objects=list(range(tasks)),
+            func=outer_f,
+            get_name=str,
+            msg="Testing cascade parallel level 0 func",
+            limit=limiter
+        )
+
+        assert results == tasks * [(middle_tasks * [(inner_tasks * [None], {})], {})]
+        assert errors == {}
+
+    @pytest.mark.timeout(10)
+    def test_parallel_cascade_execute_with_global_limit_less_than_call_stack_height(self):
+        COMPOSE_PARALLEL_LIMIT = 2
+
+        GlobalLimit.set_global_limit(COMPOSE_PARALLEL_LIMIT)
+        CascadeLimiter.set_global_limit(COMPOSE_PARALLEL_LIMIT)
+        self.addCleanup(GlobalLimit.set_global_limit, None)
+        self.addCleanup(CascadeLimiter.set_global_limit, None)
+
+        tasks = COMPOSE_PARALLEL_LIMIT
+        middle_tasks = 2
+        inner_tasks = 2
+
+        lock = Lock()
+
+        def inner_f(obj):
+            locked = lock.acquire(False)
+            # we should always get the lock because we're the only thread running
+            assert locked
+            lock.release()
+            return None
+
+        def middle_f(obj, limit):
+            print('middle limit:', limit)
+            return parallel_execute(
+                objects=list(range(inner_tasks)),
+                func=inner_f,
+                get_name=str,
+                msg="Testing cascade parallel level 2 func",
+                limit=limit
+            )
+
+        def outer_f(obj, limit):
+            print('outer limit:', limit)
+            return parallel_execute(
+                objects=list(range(middle_tasks)),
+                func=middle_f,
+                get_name=str,
+                msg="Testing cascade parallel level 1 func",
+                limit=CascadeLimiter(limit)
+            )
+
+        with pytest.raises(ParallelLimitExceededByCascadeCalls):
+            parallel_execute(
+                objects=list(range(tasks)),
+                func=outer_f,
+                get_name=str,
+                msg="Testing cascade parallel level 0 func",
+                limit=CascadeLimiter()
+            )
+
     def test_parallel_execute_with_deps(self):
         log = []
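Illustrative note (not part of the patch above): the CascadeLimiter added to compose/parallel.py splits the thread budget between nesting levels of parallel_execute. allocate() reserves half of the remaining limit for the current level's Semaphore, remaining() is handed down to the nested call through functools.partial, and when a level's share rounds down to zero parallel_execute_iter raises ParallelLimitExceededByCascadeCalls, which the third new test exercises. The standalone sketch below mirrors that arithmetic; MiniCascadeLimiter and show_allocation are names invented for this example and are not compose APIs.

# Standalone sketch: budget halving across cascade depth (illustrative only).
from threading import Semaphore


class MiniCascadeLimiter:
    """Mirrors the allocate()/remaining() arithmetic of the patched CascadeLimiter."""

    def __init__(self, value):
        self.limit = value
        self._guard = Semaphore(1)

    def allocate(self):
        # Reserve half of the remaining budget for the current nesting level.
        with self._guard:
            allocated = self.limit // 2
            self.limit -= allocated
        return allocated

    def remaining(self):
        # Whatever was not allocated stays available for deeper levels.
        return self.limit


def show_allocation(global_limit):
    # Walk down the cascade the way nested parallel_execute calls would,
    # stopping where parallel_execute_iter would raise the new exception.
    limiter = MiniCascadeLimiter(global_limit)
    depth = 0
    while True:
        allocated = limiter.allocate()
        if allocated == 0:
            print("depth {}: share is 0 -> ParallelLimitExceededByCascadeCalls".format(depth))
            break
        print("depth {}: {} threads, {} left for deeper levels".format(
            depth, allocated, limiter.remaining()))
        depth += 1
        limiter = MiniCascadeLimiter(limiter.remaining())


show_allocation(64)  # 32, 16, 8, 4, 2, 1 threads at depths 0-5, then 0 at depth 6
show_allocation(2)   # 1 thread at depth 0, then 0 at depth 1 (the failing-test case)

Because of the integer halving, the cascade depth a given limit can sustain grows only logarithmically with the limit, so COMPOSE_PARALLEL_LIMIT = 2 can feed one level of cascading calls but not two, which is the situation the last test asserts on.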