Skip to content

Commit

Permalink
sam build - image package type - streaming progress bar (#3215)
Browse files Browse the repository at this point in the history
* fix: add log streaming to `sam build` - IMAGE functions

* fix: get closer to 95% test coverage

* fix: address comments

- Add type hinting
- Add additional tests

* fix: make mypy and lint happy

* fix: address feedback
  • Loading branch information
sriram-mv authored Sep 13, 2021
1 parent 8804d41 commit ac47ffc
Show file tree
Hide file tree
Showing 14 changed files with 224 additions and 89 deletions.
6 changes: 4 additions & 2 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
[run]
branch = True
# TODO: Remove the plugins_interfaces.py omission when new interfaces are used
omit = samcli/lib/iac/plugins_interfaces.py
omit =
# TODO: Remove the plugins_interfaces.py omission when new interfaces are used
samcli/lib/iac/plugins_interfaces.py
samcli/lib/init/templates/*
[report]
exclude_lines =
pragma: no cover
Expand Down
4 changes: 2 additions & 2 deletions samcli/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
python -m samcli
"""

from samcli.cli.main import cli
from samcli.cli.main import cli # pragma: no cover

if __name__ == "__main__":
if __name__ == "__main__": # pragma: no cover
# NOTE(TheSriram): prog_name is always set to "sam". This way when the CLI is invoked as a module,
# the help text that is generated still says "sam" instead of "__main__".
cli(prog_name="sam")
2 changes: 1 addition & 1 deletion samcli/commands/bootstrap/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
"""

# Expose the cli object here
from .command import cli # noqa
from .command import cli # pragma: no cover
2 changes: 1 addition & 1 deletion samcli/commands/bootstrap/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ def cli(ctx):
do_cli(ctx.region, ctx.profile) # pragma: no cover


def do_cli(region, profile):
def do_cli(region, profile): # pragma: no cover
bucket_name = bootstrap.manage_stack(profile=profile, region=region)
click.echo("Source Bucket: " + bucket_name)
19 changes: 7 additions & 12 deletions samcli/lib/build/app_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
ParallelBuildStrategy,
BuildStrategy,
)
from samcli.lib.docker.log_streamer import LogStreamer, LogStreamError
from samcli.lib.providers.provider import ResourcesToBuildCollector, Function, get_full_path, Stack, LayerVersion
from samcli.lib.providers.sam_base_provider import SamBaseProvider
from samcli.lib.utils.colors import Colored
Expand Down Expand Up @@ -118,7 +119,7 @@ def __init__(
self._container_manager = container_manager
self._parallel = parallel
self._mode = mode
self._stream_writer = stream_writer if stream_writer else StreamWriter(osutils.stderr())
self._stream_writer = stream_writer if stream_writer else StreamWriter(stream=osutils.stderr(), auto_flush=True)
self._docker_client = docker_client if docker_client else docker.from_env()

self._deprecated_runtimes = {"nodejs4.3", "nodejs6.10", "nodejs8.10", "dotnetcore2.0"}
Expand Down Expand Up @@ -366,17 +367,11 @@ def _stream_lambda_image_build_logs(self, build_logs: List[Dict[str, str]], func
function_name str
Name of the function that is being built
"""
for log in build_logs:
if log:
log_stream = log.get("stream")
error_stream = log.get("error")

if error_stream:
raise DockerBuildFailed(f"{function_name} failed to build: {error_stream}")

if log_stream:
self._stream_writer.write(str.encode(log_stream))
self._stream_writer.flush()
build_log_streamer = LogStreamer(self._stream_writer)
try:
build_log_streamer.stream_progress(build_logs)
except LogStreamError as ex:
raise DockerBuildFailed(msg=f"{function_name} failed to build: {str(ex)}") from ex

def _build_layer(
self,
Expand Down
Empty file added samcli/lib/docker/__init__.py
Empty file.
93 changes: 93 additions & 0 deletions samcli/lib/docker/log_streamer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""
Log streaming utilities when streaming logs from Docker
"""
import os
from typing import Dict

import docker

from samcli.lib.package.stream_cursor_utils import (
CursorUpFormatter,
CursorDownFormatter,
CursorLeftFormatter,
ClearLineFormatter,
)
from samcli.lib.utils.stream_writer import StreamWriter


class LogStreamError(Exception):
def __init__(self, msg: str) -> None:
Exception.__init__(self, msg)


class LogStreamer:
def __init__(self, stream: StreamWriter):
self._stream = stream
self._cursor_up_formatter = CursorUpFormatter()
self._cursor_down_formatter = CursorDownFormatter()
self._cursor_left_formatter = CursorLeftFormatter()
self._cursor_clear_formatter = ClearLineFormatter()

def stream_progress(self, logs: docker.APIClient.logs):
"""
Stream progress from docker push logs and move the cursor based on the log id.
:param logs: generator from docker_clent.APIClient.logs
"""
ids: Dict[str, int] = dict()
for log in logs:
_id = log.get("id", "")
status = log.get("status", "")
stream = log.get("stream", "")
progress = log.get("progress", "")
error = log.get("error", "")
change_cursor_count = 0
if _id:
if _id not in ids:
ids[_id] = len(ids)
else:
curr_log_line_id = ids[_id]
change_cursor_count = len(ids) - curr_log_line_id
self._stream.write(
self._cursor_up_formatter.cursor_format(change_cursor_count)
+ self._cursor_left_formatter.cursor_format(),
encode=True,
)

self._stream_write(_id, status, stream, progress, error)

if _id:
self._stream.write(
self._cursor_down_formatter.cursor_format(change_cursor_count)
+ self._cursor_left_formatter.cursor_format(),
encode=True,
)
self._stream.write(os.linesep, encode=True)

def _stream_write(self, _id: str, status: str, stream: bytes, progress: str, error: str):
"""
Write stream information to stderr, if the stream information contains a log id,
use the carriage return character to rewrite that particular line.
:param _id: docker log id
:param status: docker log status
:param stream: stream, usually stderr
:param progress: docker log progress
:param error: docker log error
"""
if error:
raise LogStreamError(msg=error)
if not status and not stream:
return

# NOTE(sriram-mv): Required for the purposes of when the cursor overflows existing terminal buffer.
if not stream:
self._stream.write(os.linesep, encode=True)
self._stream.write(
self._cursor_up_formatter.cursor_format() + self._cursor_left_formatter.cursor_format(), encode=True
)
self._stream.write(self._cursor_clear_formatter.cursor_format(), encode=True)

if not _id:
self._stream.write(stream, encode=True)
self._stream.write(status, encode=True)
else:
self._stream.write(f"\r{_id}: {status} {progress}", encode=True)
61 changes: 4 additions & 57 deletions samcli/lib/package/ecr_uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"""
import logging
import base64
import os

from typing import Dict
import click
Expand All @@ -18,8 +17,8 @@
ECRAuthorizationError,
DeleteArtifactFailedError,
)
from samcli.lib.docker.log_streamer import LogStreamer, LogStreamError
from samcli.lib.package.image_utils import tag_translation
from samcli.lib.package.stream_cursor_utils import cursor_up, cursor_left, cursor_down, clear_line
from samcli.lib.utils.osutils import stderr
from samcli.lib.utils.stream_writer import StreamWriter

Expand All @@ -41,6 +40,7 @@ def __init__(self, docker_client, ecr_client, ecr_repo, ecr_repo_multi, tag="lat
self.tag = tag
self.auth_config = {}
self.stream = StreamWriter(stream=stream, auto_flush=True)
self.log_streamer = LogStreamer(stream=self.stream)
self.login_session_active = False

def login(self):
Expand Down Expand Up @@ -83,9 +83,9 @@ def upload(self, image, resource_name):
push_logs = self.docker_client.api.push(
repository=repository, tag=_tag, auth_config=self.auth_config, stream=True, decode=True
)
self._stream_progress(push_logs)
self.log_streamer.stream_progress(push_logs)

except (BuildError, APIError) as ex:
except (BuildError, APIError, LogStreamError) as ex:
raise DockerPushFailedError(msg=str(ex)) from ex

return f"{repository}:{_tag}"
Expand Down Expand Up @@ -170,56 +170,3 @@ def parse_image_url(image_uri: str) -> Dict:
result["image_tag"] = repo_image_tag_split[1] if len(repo_image_tag_split) > 1 else "latest"

return result

# TODO: move this to a generic class to allow for streaming logs back from docker.
def _stream_progress(self, logs):
"""
Stream progress from docker push logs and move the cursor based on the log id.
:param logs: generator from docker_clent.api.push
"""
ids = dict()
for log in logs:
_id = log.get("id", None)
status = log.get("status", None)
progress = log.get("progress", "")
error = log.get("error", "")
change_cursor_count = 0
if _id:
try:
curr_log_line_id = ids[_id]
change_cursor_count = len(ids) - curr_log_line_id
self.stream.write((cursor_up(change_cursor_count) + cursor_left).encode())
except KeyError:
ids[_id] = len(ids)
else:
ids = dict()

self._stream_write(_id, status, progress, error)

if _id:
self.stream.write((cursor_down(change_cursor_count) + cursor_left).encode())
self.stream.write(os.linesep.encode())

def _stream_write(self, _id, status, progress, error):
"""
Write stream information to stderr, if the stream information contains a log id,
use the carraige return character to rewrite that particular line.
:param _id: docker log id
:param status: docker log status
:param progress: docker log progress
:param error: docker log error
"""
if error:
raise DockerPushFailedError(msg=error)
if not status:
return

# NOTE(sriram-mv): Required for the purposes of when the cursor overflows existing terminal buffer.
self.stream.write(os.linesep.encode())
self.stream.write((cursor_up() + cursor_left).encode())
self.stream.write(clear_line().encode())

if not _id:
self.stream.write(f"{status}{os.linesep}".encode())
else:
self.stream.write(f"\r{_id}: {status} {progress}".encode())
47 changes: 40 additions & 7 deletions samcli/lib/package/stream_cursor_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,49 @@
pass


def cursor_up(count=1):
return ESC + str(count) + "A"
class CursorFormatter:
"""
Base class for defining how cursor is to be manipulated.
"""

def __init__(self):
pass

def cursor_format(self, count):
pass


class CursorUpFormatter(CursorFormatter):
"""
Class for formatting and outputting moving the cursor up within the stream of bytes.
"""

def cursor_format(self, count=0):
return ESC + str(count) + "A"


class CursorDownFormatter(CursorFormatter):
"""
Class for formatting and outputting moving the cursor down within the stream of bytes.
"""

def cursor_format(self, count=0):
return ESC + str(count) + "B"


def cursor_down(count=1):
return ESC + str(count) + "B"
class ClearLineFormatter(CursorFormatter):
"""
Class for formatting and outputting clearing the cursor within the stream of bytes.
"""

def cursor_format(self, count=0):
return ESC + str(count) + "K"

def clear_line():
return ESC + "0K"

class CursorLeftFormatter(CursorFormatter):
"""
Class for formatting and outputting moving the cursor left within the stream of bytes.
"""

cursor_left = ESC + "G"
def cursor_format(self, count=0):
return ESC + str(count) + "G"
8 changes: 6 additions & 2 deletions samcli/lib/utils/stream_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ def __init__(self, stream, auto_flush=False):
self._stream = stream
self._auto_flush = auto_flush

def write(self, output):
@property
def stream(self):
return self._stream

def write(self, output, encode=False):
"""
Writes specified text to the underlying stream
Expand All @@ -27,7 +31,7 @@ def write(self, output):
output bytes-like object
Bytes to write
"""
self._stream.write(output)
self._stream.write(output.encode() if encode else output)

if self._auto_flush:
self._stream.flush()
Expand Down
Empty file.
39 changes: 39 additions & 0 deletions tests/unit/lib/docker/test_log_streamer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from unittest import TestCase

from samcli.lib.utils.stream_writer import StreamWriter
from samcli.lib.utils.osutils import stderr
from samcli.lib.docker.log_streamer import LogStreamer


from docker.errors import APIError


class TestLogStreamer(TestCase):
def setUp(self):
self.stream = StreamWriter(stream=stderr(), auto_flush=True)
self.error_class = APIError
self.image = "image:v1"

def test_logstreamer_init(self):
LogStreamer(stream=self.stream)

def test_logstreamer_stream_progress(self):
log_streamer = LogStreamer(stream=self.stream)
log_streamer.stream_progress(
iter(
[
{"status": "Pushing to xyz"},
{"id": "1", "status": "Preparing", "progress": ""},
{"id": "2", "status": "Preparing", "progress": ""},
{"id": "3", "status": "Preparing", "progress": ""},
{"id": "1", "status": "Pushing", "progress": "[====> ]"},
{"id": "3", "status": "Pushing", "progress": "[====> ]"},
{"id": "2", "status": "Pushing", "progress": "[====> ]"},
{"id": "3", "status": "Pushed", "progress": "[========>]"},
{"id": "1", "status": "Pushed", "progress": "[========>]"},
{"id": "2", "status": "Pushed", "progress": "[========>]"},
{"status": f"image {self.image} pushed digest: a89q34f"},
{},
]
)
)
Loading

0 comments on commit ac47ffc

Please sign in to comment.