diff --git a/podman_compose.py b/podman_compose.py index 3295bc32..2852ad1b 100755 --- a/podman_compose.py +++ b/podman_compose.py @@ -20,6 +20,8 @@ import asyncio.subprocess import signal +from multiprocessing import cpu_count + import shlex from asyncio import Task @@ -31,6 +33,12 @@ # import fnmatch # fnmatch.fnmatchcase(env, "*_HOST") + +from tqdm.asyncio import tqdm +# TODO: we need a do-nothing fallback for tqdm +#except ImportError: +# tqdm = asyncio + import yaml from dotenv import dotenv_values @@ -1158,29 +1166,28 @@ def flat_deps(services, with_extends=False): class Podman: - def __init__(self, compose, podman_path="podman", dry_run=False, semaphore: asyncio.Semaphore = asyncio.Semaphore(sys.maxsize)): + def __init__(self, compose, podman_path="podman", dry_run=False): self.compose = compose self.podman_path = podman_path self.dry_run = dry_run - self.semaphore = semaphore async def output(self, podman_args, cmd="", cmd_args=None): - async with self.semaphore: - cmd_args = cmd_args or [] - xargs = self.compose.get_podman_args(cmd) if cmd else [] - cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args - log(cmd_ls) - p = await asyncio.subprocess.create_subprocess_exec( - *cmd_ls, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE - ) + # NOTE: do we need pool.output? like pool.run + cmd_args = cmd_args or [] + xargs = self.compose.get_podman_args(cmd) if cmd else [] + cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args + log(cmd_ls) + p = await asyncio.subprocess.create_subprocess_exec( + *cmd_ls, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) - stdout_data, stderr_data = await p.communicate() - if p.returncode == 0: - return stdout_data - else: - raise subprocess.CalledProcessError(p.returncode, " ".join(cmd_ls), stderr_data) + stdout_data, stderr_data = await p.communicate() + if p.returncode == 0: + return stdout_data + else: + raise subprocess.CalledProcessError(p.returncode, " ".join(cmd_ls), stderr_data) def exec( self, @@ -1199,12 +1206,7 @@ async def run( podman_args, cmd="", cmd_args=None, - log_formatter=None, - *, - # Intentionally mutable default argument to hold references to tasks - task_reference=set() - ) -> int: - async with self.semaphore: + log_formatter=None) -> int: cmd_args = list(map(str, cmd_args or [])) xargs = self.compose.get_podman_args(cmd) if cmd else [] cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args @@ -1225,16 +1227,9 @@ async def format_out(stdout): p = await asyncio.subprocess.create_subprocess_exec( *cmd_ls, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) # pylint: disable=consider-using-with - - # This is hacky to make the tasks not get garbage collected - # https://github.com/python/cpython/issues/91887 out_t = asyncio.create_task(format_out(p.stdout)) - task_reference.add(out_t) - out_t.add_done_callback(task_reference.discard) - err_t = asyncio.create_task(format_out(p.stderr)) - task_reference.add(err_t) - err_t.add_done_callback(task_reference.discard) + else: p = await asyncio.subprocess.create_subprocess_exec(*cmd_ls) # pylint: disable=consider-using-with @@ -1273,6 +1268,34 @@ async def volume_ls(self, proj=None): volumes = output.splitlines() return volumes +class Pool: + def __init__(self, podman: Podman, parallel): + self.podman: Podman = podman + self.semaphore = asyncio.Semaphore(parallel) if isinstance(parallel, int) else parallel + self.tasks = [] + + def create_task(self, coro, *, name=None, context=None): + return self.tasks.append(asyncio.create_task(coro, name=name, context=context)) + + def run(self, *args, name=None, **kwargs): + return self.create_task(self._run_one(*args, **kwargs), name=name) + + async def _run_one(self, *args, **kwargs) -> int: + async with self.semaphore: + return await self.podman.run(*args, **kwargs) + + async def join(self, *, desc="joining enqueued tasks") -> int: + if not self.tasks: return 0 + ls = await tqdm.gather(*self.tasks, desc=desc) + failed = [ i for i in ls if i != 0] + del self.tasks[:] + count = len(failed) + if count>1: + log(f"** EE ** got multiple failures: [{count}] failures") + if failed: + log(f"** EE ** retcode: [{failed[0]}]") + return failed[0] + return 0 def normalize_service(service, sub_dir=""): if "build" in service: @@ -1469,6 +1492,7 @@ def dotenv_to_dict(dotenv_path): class PodmanCompose: def __init__(self): self.podman = None + self.pool = None self.podman_version = None self.environ = {} self.exit_code = None @@ -1529,7 +1553,8 @@ async def run(self): if args.dry_run is False: log(f"Binary {podman_path} has not been found.") sys.exit(1) - self.podman = Podman(self, podman_path, args.dry_run, asyncio.Semaphore(args.parallel)) + self.podman = Podman(self, podman_path, args.dry_run) + self.pool = Pool(self.podman, args.parallel) if not args.dry_run: # just to make sure podman is running @@ -1553,6 +1578,7 @@ async def run(self): self._parse_compose_file() cmd = self.commands[cmd_name] retcode = await cmd(self, args) + print("retcode", retcode) if isinstance(retcode, int): sys.exit(retcode) @@ -1934,7 +1960,7 @@ def _init_global_parser(parser): parser.add_argument( "--parallel", type=int, - default=os.environ.get("COMPOSE_PARALLEL_LIMIT", sys.maxsize) + default=os.environ.get("COMPOSE_PARALLEL_LIMIT", 2*cpu_count()) ) @@ -2101,7 +2127,7 @@ async def compose_systemd(compose, args): @cmd_run(podman_compose, "pull", "pull stack images") -async def compose_pull(compose, args): +async def compose_pull(compose: PodmanCompose, args): img_containers = [cnt for cnt in compose.containers if "image" in cnt] if args.services: services = set(args.services) @@ -2111,7 +2137,13 @@ async def compose_pull(compose, args): local_images = {cnt["image"] for cnt in img_containers if is_local(cnt)} images -= local_images - await asyncio.gather(*[compose.podman.run([], "pull", [image]) for image in images]) + images_process = tqdm(images, desc="pulling images") + for image in images_process: + # images_process.set_postfix_str(image) + compose.pool.run([], "pull", [image]) + # uncomment to see how progress work + # await asyncio.sleep(1) + return await compose.pool.join(desc="waiting pull") @cmd_run(podman_compose, "push", "push stack images") @@ -2191,27 +2223,20 @@ async def build_one(compose, args, cnt): @cmd_run(podman_compose, "build", "build stack images") -async def compose_build(compose, args): - tasks = [] +async def compose_build(compose: PodmanCompose, args): if args.services: container_names_by_service = compose.container_names_by_service compose.assert_services(args.services) - for service in args.services: + for service in tqdm(args.services, desc="building"): cnt = compose.container_by_name[container_names_by_service[service][0]] - tasks.append(asyncio.create_task(build_one(compose, args, cnt))) + compose.pool.create_task(build_one(compose, args, cnt)) else: - for cnt in compose.containers: - tasks.append(asyncio.create_task(build_one(compose, args, cnt))) - - status = 0 - for t in asyncio.as_completed(tasks): - s = await t - if s is not None: - status = s + for cnt in tqdm(compose.containers, desc="building"): + compose.pool.create_task(build_one(compose, args, cnt)) - return status + return await compose.pool.join("waiting build") async def create_pods(compose, args): # pylint: disable=unused-argument @@ -2366,15 +2391,13 @@ def get_volume_names(compose, cnt): @cmd_run(podman_compose, "down", "tear down entire stack") -async def compose_down(compose, args): +async def compose_down(compose: PodmanCompose, args): excluded = get_excluded(compose, args) podman_args = [] timeout_global = getattr(args, "timeout", None) containers = list(reversed(compose.containers)) - down_tasks = [] - - for cnt in containers: + for cnt in tqdm(containers, "stopping ..."): if cnt["_service"] in excluded: continue podman_stop_args = [*podman_args] @@ -2384,8 +2407,9 @@ async def compose_down(compose, args): timeout = str_to_seconds(timeout_str) if timeout is not None: podman_stop_args.extend(["-t", str(timeout)]) - down_tasks.append(asyncio.create_task(compose.podman.run([], "stop", [*podman_stop_args, cnt["name"]]), name=cnt["name"])) - await asyncio.gather(*down_tasks) + compose.pool.run([], "stop", [*podman_stop_args, cnt["name"]], name=cnt["name"]) + + await compose.pool.join(desc="waiting to be stopped") for cnt in containers: if cnt["_service"] in excluded: continue @@ -2407,7 +2431,7 @@ async def compose_down(compose, args): .splitlines() ) for name in names: - await compose.podman.run([], "stop", [*podman_args, name]) + compose.podman.run([], "stop", [*podman_args, name]) for name in names: await compose.podman.run([], "rm", [name]) if args.volumes: diff --git a/tests/short/docker-compose.yaml b/tests/short/docker-compose.yaml index fdfc5fe0..c2886ed4 100644 --- a/tests/short/docker-compose.yaml +++ b/tests/short/docker-compose.yaml @@ -1,7 +1,7 @@ version: "3" services: redis: - image: redis:alpine + image: docker.io/library/redis:alpine command: ["redis-server", "--appendonly yes", "--notify-keyspace-events", "Ex"] volumes: - ./data/redis:/data:z @@ -12,7 +12,7 @@ services: - SECRET_KEY=aabbcc - ENV_IS_SET web: - image: busybox + image: docker.io/library/busybox command: ["/bin/busybox", "httpd", "-f", "-h", "/var/www/html", "-p", "8000"] working_dir: /var/www/html volumes: @@ -21,19 +21,19 @@ services: - /run - /tmp web1: - image: busybox + image: docker.io/library/busybox command: ["/bin/busybox", "httpd", "-f", "-h", "/var/www/html", "-p", "8001"] working_dir: /var/www/html volumes: - ./data/web:/var/www/html:ro,z web2: - image: busybox + image: docker.io/library/busybox command: ["/bin/busybox", "httpd", "-f", "-h", "/var/www/html", "-p", "8002"] working_dir: /var/www/html volumes: - ~/Downloads/www:/var/www/html:ro,z web3: - image: busybox + image: docker.io/library/busybox command: ["/bin/busybox", "httpd", "-f", "-h", "/var/www/html", "-p", "8003"] working_dir: /var/www/html volumes: