From 882108862dcaf08e7f5da519b3d186048d4ec7f9 Mon Sep 17 00:00:00 2001
From: stavdav <58971745+stavdav143@users.noreply.github.com>
Date: Wed, 6 Dec 2023 23:07:43 +0100
Subject: [PATCH] Fix `enable_logging=True` not working in `DockerSwarmOperator`
 (#35677)

* Fixes #28452: "TaskInstances do not succeed when using enable_logging=True
  option in DockerSwarmOperator"

  Introduces logging of Docker Swarm services, which previously did not work.

* tty=True/False remains the user's choice, as it was before this fix (#28452).

* follow=True for logs always results in tasks never ending. This is standard
  behaviour provided upstream by the Docker API, so in DockerSwarmOperator
  follow is always False.

* service_logs is called multiple times because we continuously poll the Docker
  API for logs. As indicated in the previous commit, the Docker client
  malfunctions when we try to get the logs with follow=True, so we make
  repeated calls to the API (every 2 seconds) to fetch new logs.

* service_logs is called multiple times. In this test the task calls increase
  (6 instead of 5) because we also check whether the service has terminated
  (+1). As this assertion makes less sense when we poll the Docker API with
  multiple calls, we may want to remove it or replace it with something more
  suitable.

* Final commit of this PR, marking the test case that validates logging in the
  Docker Swarm Operator. We log two different messages and assert that the two
  lines are returned in the logs in the expected sequence.

* Formatting (ruff)

* Reverting, as GitHub Actions does not run this test on a swarm node:

  docker.errors.APIError: 503 Server Error for
  http+docker://localhost/v1.43/services/create: Service Unavailable ("This
  node is not a swarm manager. Use "docker swarm init" or "docker swarm join"
  to connect this node to swarm and try again.")

  Revert "Final commit of this PR, marking the test case that validates logging
  in the Docker Swarm Operator. We log two different messages and assert that
  the two lines are returned in the logs in the expected sequence."

  This reverts commit 048ba1ece25fcd02c5ccc752d63759c76b098e44.
* Logging "since" timestamp to avoid memory issues Fix for #28452 * Formatting - Fix for #28452 --- .../docker/operators/docker_swarm.py | 48 ++++++++++++------- .../docker/operators/test_docker_swarm.py | 8 ++-- 2 files changed, 35 insertions(+), 21 deletions(-) diff --git a/airflow/providers/docker/operators/docker_swarm.py b/airflow/providers/docker/operators/docker_swarm.py index 7a8a41992124d..5622d874370d5 100644 --- a/airflow/providers/docker/operators/docker_swarm.py +++ b/airflow/providers/docker/operators/docker_swarm.py @@ -17,6 +17,9 @@ """Run ephemeral Docker Swarm services.""" from __future__ import annotations +import re +from datetime import datetime +from time import sleep from typing import TYPE_CHECKING from docker import types @@ -179,23 +182,34 @@ def _has_service_terminated(self) -> bool: def _stream_logs_to_output(self) -> None: if not self.service: raise Exception("The 'service' should be initialized before!") - logs = self.cli.service_logs( - self.service["ID"], follow=True, stdout=True, stderr=True, is_tty=self.tty - ) - line = "" - for log in logs: - try: - log = log.decode() - except UnicodeDecodeError: - continue - if log == "\n": - self.log.info(line) - line = "" - else: - line += log - # flush any remaining log stream - if line: - self.log.info(line) + last_line_logged, last_timestamp = "", 0 + + def stream_new_logs(last_line_logged, since=0): + logs = self.cli.service_logs( + self.service["ID"], + follow=False, + stdout=True, + stderr=True, + is_tty=self.tty, + since=since, + timestamps=True, + ) + logs = b"".join(logs).decode().splitlines() + if last_line_logged in logs: + logs = logs[logs.index(last_line_logged) + 1 :] + for line in logs: + match = re.match(r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{6,}Z) (.*)", line) + timestamp, message = match.groups() + self.log.info(message) + # Floor nanoseconds to microseconds + last_timestamp = re.sub(r"(\.\d{6})\d+Z", r"\1Z", timestamp) + last_timestamp = datetime.strptime(last_timestamp, "%Y-%m-%dT%H:%M:%S.%fZ") + last_timestamp = last_timestamp.timestamp() + return last_line_logged, last_timestamp + + while not self._has_service_terminated(): + sleep(2) + last_line_logged, last_timestamp = stream_new_logs(last_line_logged, since=last_timestamp) def on_kill(self) -> None: if self.hook.client_created and self.service is not None: diff --git a/tests/providers/docker/operators/test_docker_swarm.py b/tests/providers/docker/operators/test_docker_swarm.py index e14a7aa8b9687..7bc74749dcdbf 100644 --- a/tests/providers/docker/operators/test_docker_swarm.py +++ b/tests/providers/docker/operators/test_docker_swarm.py @@ -40,7 +40,7 @@ def _client_tasks_side_effect(): yield [{"Status": {"State": "complete"}}] def _client_service_logs_effect(): - yield b"Testing is awesome." + yield b"2023-12-05T00:00:00.000000000Z Testing is awesome." 
 
         client_mock = mock.Mock(spec=APIClient)
         client_mock.create_service.return_value = {"ID": "some_id"}
@@ -98,8 +98,8 @@ def _client_service_logs_effect():
             base_url="unix://var/run/docker.sock", tls=False, version="1.19", timeout=DEFAULT_TIMEOUT_SECONDS
         )
 
-        client_mock.service_logs.assert_called_once_with(
-            "some_id", follow=True, stdout=True, stderr=True, is_tty=True
+        client_mock.service_logs.assert_called_with(
+            "some_id", follow=False, stdout=True, stderr=True, is_tty=True, since=0, timestamps=True
         )
 
         csargs, cskwargs = client_mock.create_service.call_args_list[0]
@@ -108,7 +108,7 @@ def _client_service_logs_effect():
         assert cskwargs["labels"] == {"name": "airflow__adhoc_airflow__unittest"}
         assert cskwargs["name"].startswith("airflow-")
         assert cskwargs["mode"] == types.ServiceMode(mode="replicated", replicas=3)
-        assert client_mock.tasks.call_count == 5
+        assert client_mock.tasks.call_count == 6
         client_mock.remove_service.assert_called_once_with("some_id")
 
     @mock.patch("airflow.providers.docker.operators.docker_swarm.types")
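
For reference, below is a minimal standalone sketch of the polling approach described
in the commit message, written against docker-py's low-level APIClient. It is not the
operator's code: the helper name poll_service_logs, the client argument, and the
has_terminated callback are illustrative stand-ins for the operator's hook-provided
client, self.log, self.tty, and _has_service_terminated().

import re
from datetime import datetime
from time import sleep
from typing import Callable

from docker import APIClient  # assumes docker-py is installed


def poll_service_logs(client: APIClient, service_id: str, has_terminated: Callable[[], bool]) -> None:
    last_line, since = "", 0.0
    while not has_terminated():
        sleep(2)  # same 2-second polling interval as the operator
        logs = client.service_logs(
            service_id,
            follow=False,     # follow=True never returns, hence the polling loop
            stdout=True,
            stderr=True,
            since=since,      # only fetch lines newer than the previous poll
            timestamps=True,  # timestamps are needed to advance `since`
        )
        # docker-py may return a single bytes blob or a generator of bytes chunks
        data = logs if isinstance(logs, bytes) else b"".join(logs)
        lines = data.decode().splitlines()
        if last_line in lines:
            # `since` is coarse-grained, so drop lines that were already printed
            lines = lines[lines.index(last_line) + 1 :]
        for line in lines:
            match = re.match(r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6,}Z) (.*)", line)
            if not match:
                continue
            timestamp, message = match.groups()
            print(message)
            # Docker emits nanosecond precision; floor to microseconds for %f parsing
            floored = re.sub(r"(\.\d{6})\d+Z", r"\1Z", timestamp)
            since = datetime.strptime(floored, "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()
            last_line = line


# Hypothetical usage, assuming a reachable Swarm manager and an existing service ID:
# client = APIClient(base_url="unix://var/run/docker.sock")
# poll_service_logs(client, "some_service_id", has_terminated=lambda: False)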