Commit

add TQDM and implement pool.join
muayyad-alsadi committed Feb 4, 2024
1 parent c5be5ba commit 7bc9316
Showing 2 changed files with 84 additions and 60 deletions.
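
The change replaces the semaphore that was baked into the Podman class with a small task pool: podman invocations are enqueued as asyncio tasks, concurrency is capped by an asyncio.Semaphore sized from --parallel, and the tasks are awaited together under a tqdm progress bar. A minimal standalone sketch of that pattern (the names and the fake workload below are illustrative, not taken from the diff):

    import asyncio
    from tqdm.asyncio import tqdm

    async def run_one(semaphore: asyncio.Semaphore, name: str) -> int:
        # at most `parallel` coroutines run this block at the same time
        async with semaphore:
            await asyncio.sleep(0.1)  # stand-in for a podman subprocess
            return 0  # pretend the subprocess exited cleanly

    async def main(parallel: int = 4) -> int:
        semaphore = asyncio.Semaphore(parallel)
        tasks = [asyncio.create_task(run_one(semaphore, f"job{i}")) for i in range(10)]
        # tqdm.gather behaves like asyncio.gather but draws a progress bar
        returncodes = await tqdm.gather(*tasks, desc="waiting jobs")
        failed = [rc for rc in returncodes if rc != 0]
        return failed[0] if failed else 0

    if __name__ == "__main__":
        raise SystemExit(asyncio.run(main()))
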
134 changes: 79 additions & 55 deletions podman_compose.py
@@ -20,6 +20,8 @@
 import asyncio.subprocess
 import signal
 
+from multiprocessing import cpu_count
+
 import shlex
 from asyncio import Task
 
@@ -31,6 +33,12 @@
 # import fnmatch
 # fnmatch.fnmatchcase(env, "*_HOST")
 
+
+from tqdm.asyncio import tqdm
+# TODO: we need a do-nothing fallback for tqdm
+#except ImportError:
+# tqdm = asyncio
+
 import yaml
 from dotenv import dotenv_values
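
The TODO above asks for a do-nothing fallback when tqdm is not installed. One possible shape for it (an assumption sketched here, not part of this commit) is to wrap the import and fall back to a stub covering the two ways tqdm is used below, plain iteration and gather():

    import asyncio

    try:
        from tqdm.asyncio import tqdm
    except ImportError:

        class tqdm:  # minimal do-nothing stand-in
            def __init__(self, iterable=None, desc=None, **kwargs):
                self.iterable = iterable if iterable is not None else []

            def __iter__(self):
                # plain iteration, no progress bar
                return iter(self.iterable)

            @staticmethod
            async def gather(*tasks, desc=None, **kwargs):
                # same result as asyncio.gather, minus the progress bar
                return await asyncio.gather(*tasks)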

@@ -1158,29 +1166,28 @@ def flat_deps(services, with_extends=False):
 
 
 class Podman:
-    def __init__(self, compose, podman_path="podman", dry_run=False, semaphore: asyncio.Semaphore = asyncio.Semaphore(sys.maxsize)):
+    def __init__(self, compose, podman_path="podman", dry_run=False):
         self.compose = compose
         self.podman_path = podman_path
         self.dry_run = dry_run
-        self.semaphore = semaphore
 
     async def output(self, podman_args, cmd="", cmd_args=None):
-        async with self.semaphore:
-            cmd_args = cmd_args or []
-            xargs = self.compose.get_podman_args(cmd) if cmd else []
-            cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args
-            log(cmd_ls)
-            p = await asyncio.subprocess.create_subprocess_exec(
-                *cmd_ls,
-                stdout=asyncio.subprocess.PIPE,
-                stderr=asyncio.subprocess.PIPE
-            )
+        # NOTE: do we need pool.output? like pool.run
+        cmd_args = cmd_args or []
+        xargs = self.compose.get_podman_args(cmd) if cmd else []
+        cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args
+        log(cmd_ls)
+        p = await asyncio.subprocess.create_subprocess_exec(
+            *cmd_ls,
+            stdout=asyncio.subprocess.PIPE,
+            stderr=asyncio.subprocess.PIPE
+        )
 
-            stdout_data, stderr_data = await p.communicate()
-            if p.returncode == 0:
-                return stdout_data
-            else:
-                raise subprocess.CalledProcessError(p.returncode, " ".join(cmd_ls), stderr_data)
+        stdout_data, stderr_data = await p.communicate()
+        if p.returncode == 0:
+            return stdout_data
+        else:
+            raise subprocess.CalledProcessError(p.returncode, " ".join(cmd_ls), stderr_data)
 
     def exec(
         self,
@@ -1199,12 +1206,7 @@ async def run(
         podman_args,
         cmd="",
         cmd_args=None,
-        log_formatter=None,
-        *,
-        # Intentionally mutable default argument to hold references to tasks
-        task_reference=set()
-    ) -> int:
-        async with self.semaphore:
+        log_formatter=None) -> int:
             cmd_args = list(map(str, cmd_args or []))
             xargs = self.compose.get_podman_args(cmd) if cmd else []
             cmd_ls = [self.podman_path, *podman_args, cmd] + xargs + cmd_args
@@ -1225,16 +1227,9 @@ async def format_out(stdout):
                 p = await asyncio.subprocess.create_subprocess_exec(
                     *cmd_ls, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
                 ) # pylint: disable=consider-using-with
-
-                # This is hacky to make the tasks not get garbage collected
-                # https://github.com/python/cpython/issues/91887
                 out_t = asyncio.create_task(format_out(p.stdout))
-                task_reference.add(out_t)
-                out_t.add_done_callback(task_reference.discard)
-
                 err_t = asyncio.create_task(format_out(p.stderr))
-                task_reference.add(err_t)
-                err_t.add_done_callback(task_reference.discard)
+
 
             else:
                 p = await asyncio.subprocess.create_subprocess_exec(*cmd_ls) # pylint: disable=consider-using-with
@@ -1273,6 +1268,34 @@ async def volume_ls(self, proj=None):
         volumes = output.splitlines()
         return volumes
 
+class Pool:
+    def __init__(self, podman: Podman, parallel):
+        self.podman: Podman = podman
+        self.semaphore = asyncio.Semaphore(parallel) if isinstance(parallel, int) else parallel
+        self.tasks = []
+
+    def create_task(self, coro, *, name=None, context=None):
+        return self.tasks.append(asyncio.create_task(coro, name=name, context=context))
+
+    def run(self, *args, name=None, **kwargs):
+        return self.create_task(self._run_one(*args, **kwargs), name=name)
+
+    async def _run_one(self, *args, **kwargs) -> int:
+        async with self.semaphore:
+            return await self.podman.run(*args, **kwargs)
+
+    async def join(self, *, desc="joining enqueued tasks") -> int:
+        if not self.tasks: return 0
+        ls = await tqdm.gather(*self.tasks, desc=desc)
+        failed = [ i for i in ls if i != 0]
+        del self.tasks[:]
+        count = len(failed)
+        if count>1:
+            log(f"** EE ** got multiple failures: [{count}] failures")
+        if failed:
+            log(f"** EE ** retcode: [{failed[0]}]")
+            return failed[0]
+        return 0
 
 def normalize_service(service, sub_dir=""):
     if "build" in service:
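
The Pool above is meant to be consumed in two phases: enqueue podman invocations with run() (or arbitrary coroutines with create_task()), then await join() once to gather them under a progress bar and collect the first non-zero return code. A hypothetical caller, mirroring how compose_pull below uses it (assumed to live in the same module as Podman and Pool):

    async def pull_images(podman: Podman, images: list) -> int:
        pool = Pool(podman, 4)  # allow at most 4 concurrent podman processes
        for image in images:
            # each enqueued task waits on the pool's semaphore inside _run_one()
            pool.run([], "pull", [image], name=image)
        # join() awaits everything with tqdm.gather and returns the first failure, else 0
        return await pool.join(desc="waiting pull")
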
@@ -1469,6 +1492,7 @@ def dotenv_to_dict(dotenv_path):
 class PodmanCompose:
     def __init__(self):
         self.podman = None
+        self.pool = None
         self.podman_version = None
         self.environ = {}
         self.exit_code = None
@@ -1529,7 +1553,8 @@ async def run(self):
                 if args.dry_run is False:
                     log(f"Binary {podman_path} has not been found.")
                     sys.exit(1)
-        self.podman = Podman(self, podman_path, args.dry_run, asyncio.Semaphore(args.parallel))
+        self.podman = Podman(self, podman_path, args.dry_run)
+        self.pool = Pool(self.podman, args.parallel)
 
         if not args.dry_run:
             # just to make sure podman is running
@@ -1553,6 +1578,7 @@ async def run(self):
         self._parse_compose_file()
         cmd = self.commands[cmd_name]
         retcode = await cmd(self, args)
+        print("retcode", retcode)
         if isinstance(retcode, int):
             sys.exit(retcode)
 
@@ -1934,7 +1960,7 @@ def _init_global_parser(parser):
     parser.add_argument(
         "--parallel",
         type=int,
-        default=os.environ.get("COMPOSE_PARALLEL_LIMIT", sys.maxsize)
+        default=os.environ.get("COMPOSE_PARALLEL_LIMIT", 2*cpu_count())
     )
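
The new default caps parallelism at twice the CPU count unless COMPOSE_PARALLEL_LIMIT is set. An environment override arrives as a string, but argparse applies type=int to string defaults, so args.parallel ends up an int either way; a standalone illustration (not from the diff):

    import argparse
    import os
    from multiprocessing import cpu_count

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--parallel",
        type=int,
        # string from the environment or int from cpu_count(); both become int
        default=os.environ.get("COMPOSE_PARALLEL_LIMIT", 2 * cpu_count()),
    )
    args = parser.parse_args([])
    print(type(args.parallel), args.parallel)  # <class 'int'>, e.g. 16 on an 8-core machine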


@@ -2101,7 +2127,7 @@ async def compose_systemd(compose, args):
 
 
 @cmd_run(podman_compose, "pull", "pull stack images")
-async def compose_pull(compose, args):
+async def compose_pull(compose: PodmanCompose, args):
     img_containers = [cnt for cnt in compose.containers if "image" in cnt]
     if args.services:
         services = set(args.services)
@@ -2111,7 +2137,13 @@ async def compose_pull(compose, args):
     local_images = {cnt["image"] for cnt in img_containers if is_local(cnt)}
     images -= local_images
 
-    await asyncio.gather(*[compose.podman.run([], "pull", [image]) for image in images])
+    images_process = tqdm(images, desc="pulling images")
+    for image in images_process:
+        # images_process.set_postfix_str(image)
+        compose.pool.run([], "pull", [image])
+        # uncomment to see how progress work
+        # await asyncio.sleep(1)
+    return await compose.pool.join(desc="waiting pull")
 
 
 @cmd_run(podman_compose, "push", "push stack images")
@@ -2191,27 +2223,20 @@ async def build_one(compose, args, cnt):
 
 
 @cmd_run(podman_compose, "build", "build stack images")
-async def compose_build(compose, args):
-    tasks = []
+async def compose_build(compose: PodmanCompose, args):
 
     if args.services:
         container_names_by_service = compose.container_names_by_service
         compose.assert_services(args.services)
-        for service in args.services:
+        for service in tqdm(args.services, desc="building"):
             cnt = compose.container_by_name[container_names_by_service[service][0]]
-            tasks.append(asyncio.create_task(build_one(compose, args, cnt)))
+            compose.pool.create_task(build_one(compose, args, cnt))
 
     else:
-        for cnt in compose.containers:
-            tasks.append(asyncio.create_task(build_one(compose, args, cnt)))
-
-    status = 0
-    for t in asyncio.as_completed(tasks):
-        s = await t
-        if s is not None:
-            status = s
+        for cnt in tqdm(compose.containers, desc="building"):
+            compose.pool.create_task(build_one(compose, args, cnt))
 
-    return status
+    return await compose.pool.join("waiting build")
 
 
 async def create_pods(compose, args): # pylint: disable=unused-argument
@@ -2366,15 +2391,13 @@ def get_volume_names(compose, cnt):
 
 
 @cmd_run(podman_compose, "down", "tear down entire stack")
-async def compose_down(compose, args):
+async def compose_down(compose: PodmanCompose, args):
     excluded = get_excluded(compose, args)
     podman_args = []
     timeout_global = getattr(args, "timeout", None)
     containers = list(reversed(compose.containers))
 
-    down_tasks = []
-
-    for cnt in containers:
+    for cnt in tqdm(containers, "stopping ..."):
         if cnt["_service"] in excluded:
             continue
         podman_stop_args = [*podman_args]
@@ -2384,8 +2407,9 @@ async def compose_down(compose, args):
             timeout = str_to_seconds(timeout_str)
         if timeout is not None:
             podman_stop_args.extend(["-t", str(timeout)])
-        down_tasks.append(asyncio.create_task(compose.podman.run([], "stop", [*podman_stop_args, cnt["name"]]), name=cnt["name"]))
-    await asyncio.gather(*down_tasks)
+        compose.pool.run([], "stop", [*podman_stop_args, cnt["name"]], name=cnt["name"])
+
+    await compose.pool.join(desc="waiting to be stopped")
     for cnt in containers:
         if cnt["_service"] in excluded:
             continue
@@ -2407,7 +2431,7 @@ async def compose_down(compose, args):
             .splitlines()
         )
         for name in names:
-            await compose.podman.run([], "stop", [*podman_args, name])
+            compose.podman.run([], "stop", [*podman_args, name])
         for name in names:
             await compose.podman.run([], "rm", [name])
     if args.volumes:
10 changes: 5 additions & 5 deletions tests/short/docker-compose.yaml
@@ -1,7 +1,7 @@
 version: "3"
 services:
   redis:
-    image: redis:alpine
+    image: docker.io/library/redis:alpine
     command: ["redis-server", "--appendonly yes", "--notify-keyspace-events", "Ex"]
     volumes:
       - ./data/redis:/data:z
@@ -12,7 +12,7 @@ services:
       - SECRET_KEY=aabbcc
       - ENV_IS_SET
   web:
-    image: busybox
+    image: docker.io/library/busybox
     command: ["/bin/busybox", "httpd", "-f", "-h", "/var/www/html", "-p", "8000"]
     working_dir: /var/www/html
     volumes:
@@ -21,19 +21,19 @@ services:
       - /run
       - /tmp
   web1:
-    image: busybox
+    image: docker.io/library/busybox
     command: ["/bin/busybox", "httpd", "-f", "-h", "/var/www/html", "-p", "8001"]
     working_dir: /var/www/html
     volumes:
       - ./data/web:/var/www/html:ro,z
   web2:
-    image: busybox
+    image: docker.io/library/busybox
     command: ["/bin/busybox", "httpd", "-f", "-h", "/var/www/html", "-p", "8002"]
     working_dir: /var/www/html
     volumes:
       - ~/Downloads/www:/var/www/html:ro,z
   web3:
-    image: busybox
+    image: docker.io/library/busybox
     command: ["/bin/busybox", "httpd", "-f", "-h", "/var/www/html", "-p", "8003"]
     working_dir: /var/www/html
     volumes:
