Skip to content

Commit

Permalink
Fix enable_logging=True not working in DockerSwarmOperator (#35677)
Browse files Browse the repository at this point in the history
* Fixes #28452: "TaskInstances do not succeed when using enable_logging=True option in DockerSwarmOperator"
It introduces logging of Docker Swarm services which was previously not
working.

* tty=True/False to be chosen by the user as it was the case before this fix (#28452)

* Follow=true for logs will always result in tasks not ending. This is standard behavior provided upstream by the Docker API.
Therefore in DockerSwarmOperator follow is always false.

* service_logs called multiple times as we continuously poll the Docker API for logs. As we indicated in the previous commit,
the docker client malfunctions when we try to get the logs with follow=True. Therefore we make multiple calls to the API (every 2 seconds), to fetch the new logs.

* service_logs called multiple times. In this test the tasks increase (6 instead of 5) as we check if the service has terminated (+1). As this assertion makes less sense in a situation where we do multiple calls to the Docker API (polling), we might think of removing it or replacing it with something more suitable.

* Final commit of this PR marking the test case that validates logging in the Docker Swarm Operator. We log two times a different message and we assert that the two lines are given back in the logs in the expected sequence.

* Formatting ruff

* Reverting as GitHub Actions don't run this test as a swarm node:

docker.errors.APIError: 503 Server Error for http+docker://localhost/v1.43/services/create: Service Unavailable ("This node is not a swarm manager. Use "docker swarm init" or "docker swarm join" to connect this node to swarm and try again.")

Revert "Final commit of this PR marking the test case that validates logging in the Docker Swarm Operator. We log two times a different message and we assert that the two lines are given back in the logs in the expected sequence."

This reverts commit 048ba1e.

* Logging "since" timestamp to avoid memory issues
Fix for #28452

* Formatting - Fix for #28452
  • Loading branch information
stavdav143 authored Dec 6, 2023
1 parent 3bb5978 commit 8821088
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 21 deletions.
48 changes: 31 additions & 17 deletions airflow/providers/docker/operators/docker_swarm.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
"""Run ephemeral Docker Swarm services."""
from __future__ import annotations

import re
from datetime import datetime
from time import sleep
from typing import TYPE_CHECKING

from docker import types
Expand Down Expand Up @@ -179,23 +182,34 @@ def _has_service_terminated(self) -> bool:
def _stream_logs_to_output(self) -> None:
    """Poll the Docker API for service logs and forward them to the task log.

    ``follow=True`` makes the Docker SDK generator block indefinitely for
    swarm services, so instead we poll with ``follow=False`` every 2 seconds
    until the service terminates. ``since=<last seen timestamp>`` plus
    de-duplication against the last line already logged prevents re-emitting
    lines across polls while keeping memory bounded.

    :raises Exception: if ``self.service`` has not been created yet.
    """
    if not self.service:
        raise Exception("The 'service' should be initialized before!")

    last_line_logged, last_timestamp = "", 0

    def stream_new_logs(last_line_logged, since=0):
        # One poll: fetch everything the service emitted since `since`
        # (a unix timestamp, seconds as float; 0 means "from the start").
        logs = self.cli.service_logs(
            self.service["ID"],
            follow=False,
            stdout=True,
            stderr=True,
            is_tty=self.tty,
            since=since,
            timestamps=True,
        )
        logs = b"".join(logs).decode().splitlines()
        # `since` granularity means the first lines returned may already have
        # been logged on a previous poll; drop everything up to and including
        # the last line we emitted.
        if last_line_logged in logs:
            logs = logs[logs.index(last_line_logged) + 1 :]
        # Default to the incoming `since` so an empty poll (no new lines)
        # does not leave last_timestamp unbound.
        last_timestamp = since
        for line in logs:
            match = re.match(r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6,}Z) (.*)", line)
            if match is None:
                # Not a timestamped log line (e.g. a partial/garbled chunk);
                # emit it verbatim rather than crashing on match.groups().
                self.log.info(line)
                continue
            timestamp, message = match.groups()
            self.log.info(message)
            # Docker timestamps carry nanoseconds; floor to microseconds so
            # strptime's %f directive can parse them.
            # NOTE(review): strptime yields a naive datetime, so .timestamp()
            # interprets it in local time — correct only on UTC hosts; confirm.
            floored = re.sub(r"(\.\d{6})\d+Z", r"\1Z", timestamp)
            last_timestamp = datetime.strptime(floored, "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()
            # Track the last emitted line so the next poll can de-duplicate.
            last_line_logged = line
        return last_line_logged, last_timestamp

    while not self._has_service_terminated():
        sleep(2)
        last_line_logged, last_timestamp = stream_new_logs(last_line_logged, since=last_timestamp)

def on_kill(self) -> None:
if self.hook.client_created and self.service is not None:
Expand Down
8 changes: 4 additions & 4 deletions tests/providers/docker/operators/test_docker_swarm.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def _client_tasks_side_effect():
yield [{"Status": {"State": "complete"}}]

def _client_service_logs_effect():
yield b"Testing is awesome."
yield b"2023-12-05T00:00:00.000000000Z Testing is awesome."

client_mock = mock.Mock(spec=APIClient)
client_mock.create_service.return_value = {"ID": "some_id"}
Expand Down Expand Up @@ -98,8 +98,8 @@ def _client_service_logs_effect():
base_url="unix://var/run/docker.sock", tls=False, version="1.19", timeout=DEFAULT_TIMEOUT_SECONDS
)

client_mock.service_logs.assert_called_once_with(
"some_id", follow=True, stdout=True, stderr=True, is_tty=True
client_mock.service_logs.assert_called_with(
"some_id", follow=False, stdout=True, stderr=True, is_tty=True, since=0, timestamps=True
)

csargs, cskwargs = client_mock.create_service.call_args_list[0]
Expand All @@ -108,7 +108,7 @@ def _client_service_logs_effect():
assert cskwargs["labels"] == {"name": "airflow__adhoc_airflow__unittest"}
assert cskwargs["name"].startswith("airflow-")
assert cskwargs["mode"] == types.ServiceMode(mode="replicated", replicas=3)
assert client_mock.tasks.call_count == 5
assert client_mock.tasks.call_count == 6
client_mock.remove_service.assert_called_once_with("some_id")

@mock.patch("airflow.providers.docker.operators.docker_swarm.types")
Expand Down

0 comments on commit 8821088

Please sign in to comment.