diff --git a/airflow/providers/docker/operators/docker_swarm.py b/airflow/providers/docker/operators/docker_swarm.py index 7a8a41992124d..5622d874370d5 100644 --- a/airflow/providers/docker/operators/docker_swarm.py +++ b/airflow/providers/docker/operators/docker_swarm.py @@ -17,6 +17,9 @@ """Run ephemeral Docker Swarm services.""" from __future__ import annotations +import re +from datetime import datetime +from time import sleep from typing import TYPE_CHECKING from docker import types @@ -179,23 +182,34 @@ def _has_service_terminated(self) -> bool: def _stream_logs_to_output(self) -> None: if not self.service: raise Exception("The 'service' should be initialized before!") - logs = self.cli.service_logs( - self.service["ID"], follow=True, stdout=True, stderr=True, is_tty=self.tty - ) - line = "" - for log in logs: - try: - log = log.decode() - except UnicodeDecodeError: - continue - if log == "\n": - self.log.info(line) - line = "" - else: - line += log - # flush any remaining log stream - if line: - self.log.info(line) + last_line_logged, last_timestamp = "", 0 + + def stream_new_logs(last_line_logged, since=0): + logs = self.cli.service_logs( + self.service["ID"], + follow=False, + stdout=True, + stderr=True, + is_tty=self.tty, + since=since, + timestamps=True, + ) + logs = b"".join(logs).decode().splitlines() + if last_line_logged in logs: + logs = logs[logs.index(last_line_logged) + 1 :] + for line in logs: + match = re.match(r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{6,}Z) (.*)", line) + timestamp, message = match.groups() + self.log.info(message) + # Floor nanoseconds to microseconds + last_timestamp = re.sub(r"(\.\d{6})\d+Z", r"\1Z", timestamp) + last_timestamp = datetime.strptime(last_timestamp, "%Y-%m-%dT%H:%M:%S.%fZ") + last_timestamp = last_timestamp.timestamp() + return last_line_logged, last_timestamp + + while not self._has_service_terminated(): + sleep(2) + last_line_logged, last_timestamp = stream_new_logs(last_line_logged, since=last_timestamp) def on_kill(self) -> None: if self.hook.client_created and self.service is not None: diff --git a/tests/providers/docker/operators/test_docker_swarm.py b/tests/providers/docker/operators/test_docker_swarm.py index e14a7aa8b9687..7bc74749dcdbf 100644 --- a/tests/providers/docker/operators/test_docker_swarm.py +++ b/tests/providers/docker/operators/test_docker_swarm.py @@ -40,7 +40,7 @@ def _client_tasks_side_effect(): yield [{"Status": {"State": "complete"}}] def _client_service_logs_effect(): - yield b"Testing is awesome." + yield b"2023-12-05T00:00:00.000000000Z Testing is awesome." client_mock = mock.Mock(spec=APIClient) client_mock.create_service.return_value = {"ID": "some_id"} @@ -98,8 +98,8 @@ def _client_service_logs_effect(): base_url="unix://var/run/docker.sock", tls=False, version="1.19", timeout=DEFAULT_TIMEOUT_SECONDS ) - client_mock.service_logs.assert_called_once_with( - "some_id", follow=True, stdout=True, stderr=True, is_tty=True + client_mock.service_logs.assert_called_with( + "some_id", follow=False, stdout=True, stderr=True, is_tty=True, since=0, timestamps=True ) csargs, cskwargs = client_mock.create_service.call_args_list[0] @@ -108,7 +108,7 @@ def _client_service_logs_effect(): assert cskwargs["labels"] == {"name": "airflow__adhoc_airflow__unittest"} assert cskwargs["name"].startswith("airflow-") assert cskwargs["mode"] == types.ServiceMode(mode="replicated", replicas=3) - assert client_mock.tasks.call_count == 5 + assert client_mock.tasks.call_count == 6 client_mock.remove_service.assert_called_once_with("some_id") @mock.patch("airflow.providers.docker.operators.docker_swarm.types")