Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix enable_logging=True not working in DockerSwarmOperator #35677

Merged
merged 10 commits into from
Dec 6, 2023
48 changes: 31 additions & 17 deletions airflow/providers/docker/operators/docker_swarm.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
"""Run ephemeral Docker Swarm services."""
from __future__ import annotations

import re
from datetime import datetime
from time import sleep
from typing import TYPE_CHECKING

from docker import types
Expand Down Expand Up @@ -179,23 +182,34 @@ def _has_service_terminated(self) -> bool:
def _stream_logs_to_output(self) -> None:
if not self.service:
raise Exception("The 'service' should be initialized before!")
logs = self.cli.service_logs(
self.service["ID"], follow=True, stdout=True, stderr=True, is_tty=self.tty
)
line = ""
for log in logs:
try:
log = log.decode()
except UnicodeDecodeError:
continue
if log == "\n":
self.log.info(line)
line = ""
else:
line += log
# flush any remaining log stream
if line:
self.log.info(line)
last_line_logged, last_timestamp = "", 0

def stream_new_logs(last_line_logged, since=0):
    """Fetch the service logs emitted since ``since`` and log the unseen lines.

    The service logs are polled (``follow=False``) with timestamps enabled.
    Lines up to and including ``last_line_logged`` are dropped so that the
    overlap between two consecutive polls is not logged twice.

    :param last_line_logged: Raw log line (timestamp prefix included) that was
        logged last by the previous poll, or ``""`` on the first poll.
    :param since: Unix timestamp passed to ``service_logs`` as lower bound.
    :return: Tuple ``(last_line_logged, last_timestamp)`` to feed into the next
        poll. Both values are returned unchanged when nothing new was fetched.
    """
    raw_chunks = self.cli.service_logs(
        self.service["ID"],
        follow=False,
        stdout=True,
        stderr=True,
        is_tty=self.tty,
        since=since,
        timestamps=True,
    )
    # A single undecodable byte must not abort log streaming for the task.
    lines = b"".join(raw_chunks).decode(errors="replace").splitlines()

    # Skip everything already logged by the previous poll. The emptiness guard
    # keeps the initial "" marker from matching a blank line in the output.
    if last_line_logged and last_line_logged in lines:
        lines = lines[lines.index(last_line_logged) + 1 :]
    if not lines:
        # Nothing new: keep the previous markers instead of failing on an
        # unbound timestamp.
        return last_line_logged, since

    line_pattern = re.compile(r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6,}Z) (.*)")
    newest_timestamp = None
    for line in lines:
        match = line_pattern.match(line)
        if match is None:
            # No recognizable timestamp prefix: log the raw line as-is.
            self.log.info(line)
            continue
        newest_timestamp, message = match.groups()
        self.log.info(message)

    last_timestamp = since
    if newest_timestamp is not None:
        # Floor nanoseconds to microseconds, the finest resolution %f accepts.
        floored = re.sub(r"(\.\d{6})\d+Z", r"\1Z", newest_timestamp)
        # %z accepts the literal "Z" as UTC, which yields an aware datetime so
        # that .timestamp() does not apply the host's local UTC offset.
        last_timestamp = datetime.strptime(floored, "%Y-%m-%dT%H:%M:%S.%f%z").timestamp()

    # Remember the last raw line so that the next poll can drop the overlap.
    return lines[-1], last_timestamp

while not self._has_service_terminated():
sleep(2)
last_line_logged, last_timestamp = stream_new_logs(last_line_logged, since=last_timestamp)

def on_kill(self) -> None:
if self.hook.client_created and self.service is not None:
Expand Down
8 changes: 4 additions & 4 deletions tests/providers/docker/operators/test_docker_swarm.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def _client_tasks_side_effect():
yield [{"Status": {"State": "complete"}}]

def _client_service_logs_effect():
yield b"Testing is awesome."
yield b"2023-12-05T00:00:00.000000000Z Testing is awesome."

client_mock = mock.Mock(spec=APIClient)
client_mock.create_service.return_value = {"ID": "some_id"}
Expand Down Expand Up @@ -98,8 +98,8 @@ def _client_service_logs_effect():
base_url="unix://var/run/docker.sock", tls=False, version="1.19", timeout=DEFAULT_TIMEOUT_SECONDS
)

client_mock.service_logs.assert_called_once_with(
"some_id", follow=True, stdout=True, stderr=True, is_tty=True
client_mock.service_logs.assert_called_with(
"some_id", follow=False, stdout=True, stderr=True, is_tty=True, since=0, timestamps=True
)

csargs, cskwargs = client_mock.create_service.call_args_list[0]
Expand All @@ -108,7 +108,7 @@ def _client_service_logs_effect():
assert cskwargs["labels"] == {"name": "airflow__adhoc_airflow__unittest"}
assert cskwargs["name"].startswith("airflow-")
assert cskwargs["mode"] == types.ServiceMode(mode="replicated", replicas=3)
assert client_mock.tasks.call_count == 5
assert client_mock.tasks.call_count == 6
client_mock.remove_service.assert_called_once_with("some_id")

@mock.patch("airflow.providers.docker.operators.docker_swarm.types")
Expand Down