From 1f3e3dc4349bdfdf39e325280fb410c2626a06d0 Mon Sep 17 00:00:00 2001 From: Mehmet Nuri Deveci <5735811+mndeveci@users.noreply.github.com> Date: Fri, 7 May 2021 19:24:33 -0700 Subject: [PATCH 1/5] refactor logs command library --- samcli/commands/logs/command.py | 19 +- samcli/commands/logs/logs_context.py | 48 +-- samcli/lib/logs/event.py | 72 ---- samcli/lib/logs/fetcher.py | 145 -------- samcli/lib/logs/formatter.py | 181 ---------- .../lib/{logs => observability}/__init__.py | 0 .../lib/observability/cw_logs}/__init__.py | 0 .../observability/cw_logs/cw_log_consumers.py | 17 + .../lib/observability/cw_logs/cw_log_event.py | 40 +++ .../cw_logs/cw_log_formatters.py | 87 +++++ .../cw_logs/cw_log_group_provider.py} | 0 .../observability/cw_logs/cw_log_puller.py | 111 ++++++ .../observability_info_puller.py | 142 ++++++++ tests/unit/commands/logs/test_command.py | 43 +-- tests/unit/commands/logs/test_logs_context.py | 24 +- tests/unit/lib/logs/test_fetcher.py | 255 -------------- tests/unit/lib/logs/test_formatter.py | 164 --------- tests/unit/lib/observability/__init__.py | 0 .../lib/observability/cw_logs/__init__.py | 0 .../cw_logs/test_cw_log_consumers.py | 15 + .../cw_logs/test_cw_log_event.py} | 44 ++- .../cw_logs/test_cw_log_formatters.py | 120 +++++++ .../cw_logs/test_cw_log_group_provider.py} | 2 +- .../cw_logs/test_cw_log_puller.py | 322 ++++++++++++++++++ .../test_observability_info_puller.py | 50 +++ 25 files changed, 977 insertions(+), 924 deletions(-) delete mode 100644 samcli/lib/logs/event.py delete mode 100644 samcli/lib/logs/fetcher.py delete mode 100644 samcli/lib/logs/formatter.py rename samcli/lib/{logs => observability}/__init__.py (100%) rename {tests/unit/lib/logs => samcli/lib/observability/cw_logs}/__init__.py (100%) create mode 100644 samcli/lib/observability/cw_logs/cw_log_consumers.py create mode 100644 samcli/lib/observability/cw_logs/cw_log_event.py create mode 100644 samcli/lib/observability/cw_logs/cw_log_formatters.py rename 
samcli/lib/{logs/provider.py => observability/cw_logs/cw_log_group_provider.py} (100%) create mode 100644 samcli/lib/observability/cw_logs/cw_log_puller.py create mode 100644 samcli/lib/observability/observability_info_puller.py delete mode 100644 tests/unit/lib/logs/test_fetcher.py delete mode 100644 tests/unit/lib/logs/test_formatter.py create mode 100644 tests/unit/lib/observability/__init__.py create mode 100644 tests/unit/lib/observability/cw_logs/__init__.py create mode 100644 tests/unit/lib/observability/cw_logs/test_cw_log_consumers.py rename tests/unit/lib/{logs/test_event.py => observability/cw_logs/test_cw_log_event.py} (51%) create mode 100644 tests/unit/lib/observability/cw_logs/test_cw_log_formatters.py rename tests/unit/lib/{logs/test_provider.py => observability/cw_logs/test_cw_log_group_provider.py} (78%) create mode 100644 tests/unit/lib/observability/cw_logs/test_cw_log_puller.py create mode 100644 tests/unit/lib/observability/test_observability_info_puller.py diff --git a/samcli/commands/logs/command.py b/samcli/commands/logs/command.py index 03723c08bc..7042970a3a 100644 --- a/samcli/commands/logs/command.py +++ b/samcli/commands/logs/command.py @@ -111,24 +111,13 @@ def do_cli(function_name, stack_name, filter_pattern, tailing, start_time, end_t filter_pattern=filter_pattern, start_time=start_time, end_time=end_time, - # output_file is not yet supported by CLI - output_file=None, ) as context: if tailing: - events_iterable = context.fetcher.tail( - context.log_group_name, filter_pattern=context.filter_pattern, start=context.start_time - ) + context.fetcher.tail(start_time=context.start_time, filter_pattern=context.filter_pattern) else: - events_iterable = context.fetcher.fetch( - context.log_group_name, + context.fetcher.load_time_period( + start_time=context.start_time, + end_time=context.end_time, filter_pattern=context.filter_pattern, - start=context.start_time, - end=context.end_time, ) - - formatted_events = 
context.formatter.do_format(events_iterable) - - for event in formatted_events: - # New line is not necessary. It is already in the log events sent by CloudWatch - click.echo(event, nl=False) diff --git a/samcli/commands/logs/logs_context.py b/samcli/commands/logs/logs_context.py index 668cffb66d..e9d9213bc5 100644 --- a/samcli/commands/logs/logs_context.py +++ b/samcli/commands/logs/logs_context.py @@ -3,13 +3,21 @@ """ import logging + import boto3 import botocore from samcli.commands.exceptions import UserException -from samcli.lib.logs.fetcher import LogsFetcher -from samcli.lib.logs.formatter import LogsFormatter, LambdaLogMsgFormatters, JSONMsgFormatter, KeywordHighlighter -from samcli.lib.logs.provider import LogGroupProvider +from samcli.lib.observability.cw_logs.cw_log_consumers import CWTerminalEventConsumer +from samcli.lib.observability.cw_logs.cw_log_formatters import ( + CWColorizeErrorsFormatter, + CWJsonFormatter, + CWKeywordHighlighterFormatter, + CWPrettyPrintFormatter, +) +from samcli.lib.observability.cw_logs.cw_log_group_provider import LogGroupProvider +from samcli.lib.observability.cw_logs.cw_log_puller import CWLogPuller +from samcli.lib.observability.observability_info_puller import ObservabilityEventConsumerDecorator from samcli.lib.utils.colors import Colored from samcli.lib.utils.time import to_utc, parse_date @@ -97,26 +105,20 @@ def __exit__(self, *args): @property def fetcher(self): - return LogsFetcher(self._logs_client) - - @property - def formatter(self): - """ - Creates and returns a Formatter capable of nicely formatting Lambda function logs - - Returns - ------- - LogsFormatter - """ - formatter_chain = [ - LambdaLogMsgFormatters.colorize_errors, - # Format JSON "before" highlighting the keywords. 
Otherwise, JSON will be invalid from all the - # ANSI color codes and fail to pretty print - JSONMsgFormatter.format_json, - KeywordHighlighter(self._filter_pattern).highlight_keywords, - ] - - return LogsFormatter(self.colored, formatter_chain) + return CWLogPuller( + logs_client=self._logs_client, + consumer=ObservabilityEventConsumerDecorator( + mappers=[ + CWColorizeErrorsFormatter(self.colored), + CWJsonFormatter(), + CWKeywordHighlighterFormatter(self.colored, self._filter_pattern), + CWPrettyPrintFormatter(self.colored), + ], + consumer=CWTerminalEventConsumer(), + ), + cw_log_group=self.log_group_name, + resource_name=self._function_name, + ) @property def start_time(self): diff --git a/samcli/lib/logs/event.py b/samcli/lib/logs/event.py deleted file mode 100644 index 0c05232d33..0000000000 --- a/samcli/lib/logs/event.py +++ /dev/null @@ -1,72 +0,0 @@ -""" -Represents CloudWatch Log Event -""" - -import logging - -from samcli.lib.utils.time import timestamp_to_iso - -LOG = logging.getLogger(__name__) - - -class LogEvent: - """ - Data object representing a CloudWatch Log Event - """ - - log_group_name = None - log_stream_name = None - timestamp = None - message = None - - def __init__(self, log_group_name, event_dict): - """ - Creates instance of the class - - Parameters - ---------- - log_group_name : str - The log group name - event_dict : dict - Dict of log event data returned by CloudWatch Logs API. - https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_FilteredLogEvent.html - """ - - self.log_group_name = log_group_name - - if not event_dict: - # If event is empty, just use default values for properties. We don't raise an error here because - # this class is a data wrapper to the `events_dict`. It doesn't try to be smart. 
- return - - self.log_stream_name = event_dict.get("logStreamName") - self.message = event_dict.get("message", "") - - self.timestamp_millis = event_dict.get("timestamp") - - # Convert the timestamp from epoch to readable ISO timestamp, easier for formatting. - if self.timestamp_millis: - self.timestamp = timestamp_to_iso(int(self.timestamp_millis)) - - def __eq__(self, other): - - if not isinstance(other, LogEvent): - return False - - return ( - self.log_group_name == other.log_group_name - and self.log_stream_name == other.log_stream_name - and self.timestamp == other.timestamp - and self.message == other.message - ) - - def __repr__(self): # pragma: no cover - # Used to print pretty diff when testing - return str( - { - "log_group_name": self.log_group_name, - "log_stream_name": self.log_stream_name, - "message": self.message, - "timestamp": self.timestamp, - } - ) diff --git a/samcli/lib/logs/fetcher.py b/samcli/lib/logs/fetcher.py deleted file mode 100644 index c6709fe28e..0000000000 --- a/samcli/lib/logs/fetcher.py +++ /dev/null @@ -1,145 +0,0 @@ -""" -Filters & fetches logs from CloudWatch Logs -""" - -import time -import logging - -from samcli.lib.utils.time import to_timestamp, to_datetime -from .event import LogEvent - - -LOG = logging.getLogger(__name__) - - -class LogsFetcher: - """ - Fetch logs from a CloudWatch Logs group with the ability to scope to a particular time, filter by - a pattern, and in the future possibly multiplex from from multiple streams together. - """ - - def __init__(self, cw_client=None): - """ - Initialize the fetcher - - Parameters - ---------- - cw_client - CloudWatch Logs Client from AWS SDK - """ - self.cw_client = cw_client - - def fetch(self, log_group_name, start=None, end=None, filter_pattern=None): - """ - Fetch logs from all streams under the given CloudWatch Log Group and yields in the output. Optionally, caller - can filter the logs using a pattern or a start/end time. 
- - Parameters - ---------- - log_group_name : string - Name of CloudWatch Logs Group to query. - start : datetime.datetime - Optional start time for logs. - end : datetime.datetime - Optional end time for logs. - filter_pattern : str - Expression to filter the logs by. This is passed directly to CloudWatch, so any expression supported by - CloudWatch Logs API is supported here. - - Yields - ------ - - samcli.lib.logs.event.LogEvent - Object containing the information from each log event returned by CloudWatch Logs - """ - - kwargs = {"logGroupName": log_group_name, "interleaved": True} - - if start: - kwargs["startTime"] = to_timestamp(start) - - if end: - kwargs["endTime"] = to_timestamp(end) - - if filter_pattern: - kwargs["filterPattern"] = filter_pattern - - while True: - LOG.debug("Fetching logs from CloudWatch with parameters %s", kwargs) - result = self.cw_client.filter_log_events(**kwargs) - - # Several events will be returned. Yield one at a time - for event in result.get("events", []): - yield LogEvent(log_group_name, event) - - # Keep iterating until there are no more logs left to query. - next_token = result.get("nextToken", None) - kwargs["nextToken"] = next_token - if not next_token: - break - - def tail(self, log_group_name, start=None, filter_pattern=None, max_retries=1000, poll_interval=0.3): - """ - ** This is a long blocking call ** - - Fetches logs from CloudWatch logs similar to the ``fetch`` method, but instead of stopping after all logs have - been fetched, this method continues to poll CloudWatch for new logs. So this essentially simulates the - ``tail -f`` bash command. - - If no logs are available, then it keep polling for ``timeout`` number of seconds before exiting. This method - polls CloudWatch at around ~3 Calls Per Second to stay below the 5TPS limit. - - Parameters - ---------- - log_group_name : str - Name of CloudWatch Logs Group to query. - start : datetime.datetime - Optional start time for logs. 
Defaults to '5m ago' - filter_pattern : str - Expression to filter the logs by. This is passed directly to CloudWatch, so any expression supported by - CloudWatch Logs API is supported here. - max_retries : int - When logs are not available, this value determines the number of times to retry fetching logs before giving - up. This counter is reset every time new logs are available. - poll_interval : float - Number of fractional seconds wait before polling again. Defaults to 300milliseconds. - If no new logs available, this method will stop polling after ``max_retries * poll_interval`` seconds - - Yields - ------ - samcli.lib.logs.event.LogEvent - Object containing the information from each log event returned by CloudWatch Logs - """ - - # On every poll, startTime of the API call is the timestamp of last record observed - latest_event_time = 0 # Start of epoch - if start: - latest_event_time = to_timestamp(start) - - counter = max_retries - while counter > 0: - - LOG.debug("Tailing logs from %s starting at %s", log_group_name, str(latest_event_time)) - - has_data = False - counter -= 1 - events_itr = self.fetch(log_group_name, start=to_datetime(latest_event_time), filter_pattern=filter_pattern) - - # Find the timestamp of the most recent log event. - for event in events_itr: - has_data = True - - if event.timestamp_millis > latest_event_time: - latest_event_time = event.timestamp_millis - - # Yield the event back so it behaves similar to ``fetch`` - yield event - - # This poll fetched logs. Reset the retry counter and set the timestamp for next poll - if has_data: - counter = max_retries - latest_event_time += 1 # one extra millisecond to fetch next log event - - # We already fetched logs once. Sleep for some time before querying again. - # This also helps us scoot under the TPS limit for CloudWatch API call. 
- time.sleep(poll_interval) diff --git a/samcli/lib/logs/formatter.py b/samcli/lib/logs/formatter.py deleted file mode 100644 index 6e21619f36..0000000000 --- a/samcli/lib/logs/formatter.py +++ /dev/null @@ -1,181 +0,0 @@ -""" -Format log events produced by CloudWatch Logs -""" - -import json -import functools - - -class LogsFormatter: - """ - Formats log messages returned by CloudWatch Logs service. - """ - - def __init__(self, colored, formatter_chain=None): - # the docstring contains an example function which contains another docstring, - # pylint is confused so disable it for this method. - # pylint: disable=missing-param-doc,differing-param-doc,differing-type-doc,redundant-returns-doc - """ - - ``formatter_chain`` is a list of methods that can format an event. Each method must take an - ``samcli.lib.logs.event.LogEvent`` object as input and return the same object back. This allows us to easily - chain formatter methods one after another. This class will apply all the formatters from this list on each - log event. - - After running the formatter chain, this class will convert the event object to string by appending - the timestamp to message. To skip all custom formatting and simply convert event to string, you can leave - the ``formatter_chain`` list empty. - - Formatter Method - ================ - Formatter method needs to accept two arguments at a minimum: ``event`` and ``colored``. It can make - modifications to the contents of ``event`` and must return the same object. - - Example: - .. code-block:: python - - def my_formatter(event, colored): - \""" - Example of a custom log formatter - - Parameters - ---------- - event : samcli.lib.logs.event.LogEvent - Log event to format - - colored : samcli.lib.utils.colors.Colored - Instance of ``Colored`` object to add colors to the message - - Returns - ------- - samcli.lib.logs.event.LogEvent - Object representing the log event that has been formatted. It could be the same event object passed - via input. 
- \""" - - # Do your formatting - - return event - - Parameters - ---------- - colored : samcli.lib.utils.colors.Colored - Used to add color to the string when pretty printing. Colors are useful only when pretty printing on a - Terminal. To turn off coloring, set the appropriate property when instantiating the - ``samcli.lib.utils.colors.Colored`` class. - - formatter_chain : List[str] - list of formatter methods - """ - - self.colored = colored - self.formatter_chain = formatter_chain or [] - - # At end of the chain, pretty print the Event object as string. - self.formatter_chain.append(LogsFormatter._pretty_print_event) - - def do_format(self, event_iterable): - """ - Formats the given CloudWatch Logs Event dictionary as necessary and returns an iterable that will - return the formatted string. This can be used to parse and format the events based on context - ie. In Lambda Function logs, a formatter may wish to color the "ERROR" keywords red, - or highlight a filter keyword separately etc. - - This method takes an iterable as input and returns an iterable. It does not immediately format the event. - Instead, it sets up the formatter chain appropriately and returns the iterable. Actual formatting happens - only when the iterable is used by the caller. - - Parameters - ---------- - event_iterable : iterable of samcli.lib.logs.event.LogEvent - Iterable that returns an object containing information about each log event. - - Returns - ------- - iterable of string - Iterable that returns a formatted event as a string. 
- """ - - for operation in self.formatter_chain: - - # Make sure the operation has access to certain basic objects like colored - partial_op = functools.partial(operation, colored=self.colored) - event_iterable = map(partial_op, event_iterable) - - return event_iterable - - @staticmethod - def _pretty_print_event(event, colored): - """ - Basic formatter to convert an event object to string - """ - event.timestamp = colored.yellow(event.timestamp) - event.log_stream_name = colored.cyan(event.log_stream_name) - - return " ".join([event.log_stream_name, event.timestamp, event.message]) - - -class LambdaLogMsgFormatters: - """ - Format logs printed by AWS Lambda functions. - - This class is a collection of static methods that can be used within a formatter chain. - """ - - @staticmethod - def colorize_errors(event, colored): - """ - Highlights some commonly known Lambda error cases in red: - - Nodejs process crashes - - Lambda function timeouts - """ - - nodejs_crash_msg = "Process exited before completing request" - timeout_msg = "Task timed out" - - if nodejs_crash_msg in event.message or timeout_msg in event.message: - event.message = colored.red(event.message) - - return event - - -class KeywordHighlighter: - """ - Highlight certain keywords in the log line - """ - - def __init__(self, keyword=None): - self.keyword = keyword - - def highlight_keywords(self, event, colored): - """ - Highlight the keyword in the log statement by drawing an underline - """ - if self.keyword: - highlight = colored.underline(self.keyword) - event.message = event.message.replace(self.keyword, highlight) - - return event - - -class JSONMsgFormatter: - """ - Pretty print JSONs within a message - """ - - @staticmethod - def format_json(event, colored): - """ - If the event message is a JSON string, then pretty print the JSON with 2 indents and sort the keys. 
This makes - it very easy to visually parse and search JSON data - """ - - try: - if event.message.startswith("{"): - msg_dict = json.loads(event.message) - event.message = json.dumps(msg_dict, indent=2) - except Exception: - # Skip if the event message was not JSON - pass - - return event diff --git a/samcli/lib/logs/__init__.py b/samcli/lib/observability/__init__.py similarity index 100% rename from samcli/lib/logs/__init__.py rename to samcli/lib/observability/__init__.py diff --git a/tests/unit/lib/logs/__init__.py b/samcli/lib/observability/cw_logs/__init__.py similarity index 100% rename from tests/unit/lib/logs/__init__.py rename to samcli/lib/observability/cw_logs/__init__.py diff --git a/samcli/lib/observability/cw_logs/cw_log_consumers.py b/samcli/lib/observability/cw_logs/cw_log_consumers.py new file mode 100644 index 0000000000..5e41b4275b --- /dev/null +++ b/samcli/lib/observability/cw_logs/cw_log_consumers.py @@ -0,0 +1,17 @@ +""" +Consumers for CloudWatch log events +""" +from typing import Any + +import click + +from samcli.lib.observability.observability_info_puller import ObservabilityEventConsumer + + +class CWTerminalEventConsumer(ObservabilityEventConsumer): + """ + Consumer implementation that will consume given event as outputting into console + """ + + def consume(self, event: Any): + click.echo(event, nl=False) diff --git a/samcli/lib/observability/cw_logs/cw_log_event.py b/samcli/lib/observability/cw_logs/cw_log_event.py new file mode 100644 index 0000000000..49b9a4e889 --- /dev/null +++ b/samcli/lib/observability/cw_logs/cw_log_event.py @@ -0,0 +1,40 @@ +""" +CloudWatch log event type +""" +from typing import Optional + +from samcli.lib.observability.observability_info_puller import ObservabilityEvent + + +class CWLogEvent(ObservabilityEvent[dict]): + """ + An event class which represents a Cloud Watch log + """ + + def __init__(self, cw_log_group: str, event: dict, resource_name: Optional[str] = None): + """ + Parameters + ---------- + 
cw_log_group : str + Name of the CloudWatch log group + event : dict + Event dictionary of the CloudWatch log event + resource_name : Optional[str] + Resource name that is related to this CloudWatch log event + """ + self.cw_log_group = cw_log_group + self.message: str = event.get("message", "") + self.log_stream_name: str = event.get("logStreamName", "") + timestamp: int = event.get("timestamp", 0) + super().__init__(event, timestamp, resource_name) + + def __eq__(self, other): + if not isinstance(other, CWLogEvent): + return False + + return ( + self.cw_log_group == other.cw_log_group + and self.log_stream_name == other.log_stream_name + and self.timestamp == other.timestamp + and self.message == other.message + ) diff --git a/samcli/lib/observability/cw_logs/cw_log_formatters.py b/samcli/lib/observability/cw_logs/cw_log_formatters.py new file mode 100644 index 0000000000..3c4acaa820 --- /dev/null +++ b/samcli/lib/observability/cw_logs/cw_log_formatters.py @@ -0,0 +1,87 @@ +""" +Contains all mappers (formatters) for CloudWatch logs +""" +import json + +from samcli.lib.observability.cw_logs.cw_log_event import CWLogEvent +from samcli.lib.observability.observability_info_puller import ObservabilityEventMapper +from samcli.lib.utils.colors import Colored +from samcli.lib.utils.time import timestamp_to_iso + + +class CWKeywordHighlighterFormatter(ObservabilityEventMapper[CWLogEvent]): + """ + Mapper implementation which will highlight given keywords in CloudWatch logs + """ + + def __init__(self, colored: Colored, keyword=None): + """ + Parameters + ---------- + colored : Colored + Colored class that will be used to highlight the keywords in log event + keyword : str + Keyword that will be highlighted + """ + self._keyword = keyword + self._colored = colored + + def map(self, event: CWLogEvent) -> CWLogEvent: + if self._keyword: + highlight = self._colored.underline(self._keyword) + event.message = event.message.replace(self._keyword, highlight) + + return event + + 
+class CWColorizeErrorsFormatter(ObservabilityEventMapper[CWLogEvent]): + """ + Mapper implementation which will colorize some pre-defined error messages + """ + + NODEJS_CRASH_MESSAGE = "Process exited before completing request" + TIMEOUT_MSG = "Task timed out" + + def __init__(self, colored: Colored): + self._colored = colored + + def map(self, event: CWLogEvent) -> CWLogEvent: + if ( + CWColorizeErrorsFormatter.NODEJS_CRASH_MESSAGE in event.message + or CWColorizeErrorsFormatter.TIMEOUT_MSG in event.message + ): + event.message = self._colored.red(event.message) + return event + + +class CWJsonFormatter(ObservabilityEventMapper[CWLogEvent]): + """ + Mapper implementation which will auto indent the input if the input is a JSON object + """ + + # pylint: disable=R0201 + def map(self, event: CWLogEvent) -> CWLogEvent: + try: + if event.message.startswith("{"): + msg_dict = json.loads(event.message) + event.message = json.dumps(msg_dict, indent=2) + except Exception: + pass + + return event + + +class CWPrettyPrintFormatter(ObservabilityEventMapper[CWLogEvent]): + """ + Mapper implementation which will format given CloudWatch log event into string with coloring + log stream name and timestamp + """ + + def __init__(self, colored: Colored): + self._colored = colored + + # pylint: disable=R0201 + def map(self, event: CWLogEvent) -> str: + timestamp = self._colored.yellow(timestamp_to_iso(int(event.timestamp))) + log_stream_name = self._colored.cyan(event.log_stream_name) + return f"{log_stream_name} {timestamp} {event.message}" diff --git a/samcli/lib/logs/provider.py b/samcli/lib/observability/cw_logs/cw_log_group_provider.py similarity index 100% rename from samcli/lib/logs/provider.py rename to samcli/lib/observability/cw_logs/cw_log_group_provider.py diff --git a/samcli/lib/observability/cw_logs/cw_log_puller.py b/samcli/lib/observability/cw_logs/cw_log_puller.py new file mode 100644 index 0000000000..e7d8b7fb10 --- /dev/null +++ 
b/samcli/lib/observability/cw_logs/cw_log_puller.py @@ -0,0 +1,111 @@ +""" +CloudWatch log event puller implementation +""" +import logging +import time +from datetime import datetime +from typing import Optional, Any + +from samcli.lib.observability.cw_logs.cw_log_event import CWLogEvent +from samcli.lib.observability.observability_info_puller import ObservabilityPuller, ObservabilityEventConsumer +from samcli.lib.utils.time import to_timestamp, to_datetime + +LOG = logging.getLogger(__name__) + + +class CWLogPuller(ObservabilityPuller): + """ + Puller implementation that can pull events from CloudWatch log group + """ + + def __init__( + self, + logs_client: Any, + consumer: ObservabilityEventConsumer, + cw_log_group: str, + resource_name: Optional[str] = None, + max_retries: int = 1000, + poll_interval: int = 1, + ): + """ + Parameters + ---------- + logs_client: Any + boto3 logs client instance + consumer : ObservabilityEventConsumer + Consumer instance that will process pulled events + cw_log_group : str + CloudWatch log group name + resource_name : Optional[str] + Optional parameter to assign a resource name for each event. + max_retries: int + Optional parameter to set maximum retries when tailing. Default value is 1000 + poll_interval: int + Optional parameter to define sleep interval between pulling new log events when tailing. 
Default value is 1 + """ + self.logs_client = logs_client + self.consumer = consumer + self.cw_log_group = cw_log_group + self.resource_name = resource_name + self._max_retries = max_retries + self._poll_interval = poll_interval + self.latest_event_time = 0 + self.had_data = False + + def tail(self, start_time: Optional[datetime] = None, filter_pattern: Optional[str] = None): + if start_time: + self.latest_event_time = to_timestamp(start_time) + + counter = self._max_retries + while counter > 0: + LOG.debug("Tailing logs from %s starting at %s", self.cw_log_group, str(self.latest_event_time)) + + counter -= 1 + self.load_time_period(to_datetime(self.latest_event_time), filter_pattern=filter_pattern) + + # This poll fetched logs. Reset the retry counter and set the timestamp for next poll + if self.had_data: + counter = self._max_retries + self.latest_event_time += 1 # one extra millisecond to fetch next log event + self.had_data = False + + # We already fetched logs once. Sleep for some time before querying again. + # This also helps us scoot under the TPS limit for CloudWatch API call. + time.sleep(self._poll_interval) + + def load_time_period( + self, + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None, + filter_pattern: Optional[str] = None, + ): + kwargs = {"logGroupName": self.cw_log_group, "interleaved": True} + + if start_time: + kwargs["startTime"] = to_timestamp(start_time) + + if end_time: + kwargs["endTime"] = to_timestamp(end_time) + + if filter_pattern: + kwargs["filterPattern"] = filter_pattern + + while True: + LOG.debug("Fetching logs from CloudWatch with parameters %s", kwargs) + result = self.logs_client.filter_log_events(**kwargs) + + # Several events will be returned. 
Yield one at a time + for event in result.get("events", []): + self.had_data = True + cw_event = CWLogEvent(self.cw_log_group, event, self.resource_name) + + if cw_event.timestamp > self.latest_event_time: + self.latest_event_time = cw_event.timestamp + + self.consumer.consume(cw_event) + + # Keep iterating until there are no more logs left to query. + next_token = result.get("nextToken", None) + kwargs["nextToken"] = next_token + if not next_token: + break diff --git a/samcli/lib/observability/observability_info_puller.py b/samcli/lib/observability/observability_info_puller.py new file mode 100644 index 0000000000..afd6efdd7f --- /dev/null +++ b/samcli/lib/observability/observability_info_puller.py @@ -0,0 +1,142 @@ +""" +Interfaces and generic implementations for observability events (like CW logs) +""" +import logging +from abc import ABC, abstractmethod +from datetime import datetime +from typing import List, Optional, Generic, TypeVar, Any + +LOG = logging.getLogger(__name__) + +# Generic type for the internal observability event +InternalEventType = TypeVar("InternalEventType") + + +class ObservabilityEvent(Generic[InternalEventType]): + """ + Generic class that represents observability event + This keeps some common fields for filtering or sorting later on + """ + + def __init__(self, event: InternalEventType, timestamp: int, resource_name: Optional[str] = None): + """ + Parameters + ---------- + event : EventType + Actual event object. This can be any type with generic definition (dict, str etc.) + timestamp : int + Timestamp of the event + resource_name : Optional[str] + Resource name related to this event. 
This is optional since not all events are connected to a single resource + """ + self.event = event + self.timestamp = timestamp + self.resource_name = resource_name + + +# Generic type for identifying different ObservabilityEvent +ObservabilityEventType = TypeVar("ObservabilityEventType", bound=ObservabilityEvent) + + +class ObservabilityPuller(ABC): + """ + Interface definition for pulling observability information. + """ + + @abstractmethod + def tail(self, start_time: Optional[datetime] = None, filter_pattern: Optional[str] = None): + """ + Parameters + ---------- + start_time : Optional[datetime] + Optional parameter to tail information from earlier time + filter_pattern : Optional[str] + Optional parameter to filter events with given string + """ + + @abstractmethod + def load_time_period( + self, + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None, + filter_pattern: Optional[str] = None, + ): + """ + Parameters + ---------- + start_time : Optional[datetime] + Optional parameter to load events from certain date time + end_time : Optional[datetime] + Optional parameter to load events until certain date time + filter_pattern : Optional[str] + Optional parameter to filter events with given string + """ + + +# pylint: disable=fixme +# fixme add ABC parent class back once we bump the pylint to a version 2.8.2 or higher +class ObservabilityEventMapper(Generic[ObservabilityEventType]): + """ + Interface definition to map/change any event to another object + This could be used by highlighting certain parts or formatting events before logging into console + """ + + @abstractmethod + def map(self, event: ObservabilityEventType) -> Any: + """ + Parameters + ---------- + event : ObservabilityEventType + Event object that will be mapped/converted to another event or any object + + Returns + ------- + Any + Return converted type + """ + + +class ObservabilityEventConsumer(ABC): + """ + Consumer interface, which will consume any event. 
+ An example is to output event into console. + """ + + @abstractmethod + def consume(self, event: ObservabilityEvent): + """ + Parameters + ---------- + event : ObservabilityEvent + Event that will be consumed + """ + + +class ObservabilityEventConsumerDecorator(ObservabilityEventConsumer): + """ + A decorator implementation for consumer, which can have mappers and decorated consumer within. + Rather than the normal implementation, this will process the events through mappers which have been + provided, and then pass them to the actual consumer + """ + + def __init__(self, mappers: List[ObservabilityEventMapper], consumer: ObservabilityEventConsumer): + """ + Parameters + ---------- + mappers : List[ObservabilityEventMapper] + List of event mappers which will be used to process events before passing to consumer + consumer : ObservabilityEventConsumer + Actual consumer which will handle the events after they are processed by mappers + """ + self._mappers = mappers + self._consumer = consumer + + def consume(self, event: ObservabilityEvent): + """ + See Also ObservabilityEventConsumerDecorator and ObservabilityEventConsumer + """ + for mapper in self._mappers: + LOG.debug("Calling mapper (%s) for event (%s)", mapper, event) + event = mapper.map(event) + LOG.debug("Calling consumer (%s) for event (%s)", self._consumer, event) + self._consumer.consume(event) diff --git a/tests/unit/commands/logs/test_command.py b/tests/unit/commands/logs/test_command.py index b895428f19..3a48600ae0 100644 --- a/tests/unit/commands/logs/test_command.py +++ b/tests/unit/commands/logs/test_command.py @@ -1,5 +1,5 @@ from unittest import TestCase -from unittest.mock import Mock, patch, call +from unittest.mock import Mock, patch from samcli.commands.logs.command import do_cli @@ -13,67 +13,46 @@ def setUp(self): self.start_time = "start" self.end_time = "end" - @patch("samcli.commands.logs.command.click") @patch("samcli.commands.logs.logs_context.LogsCommandContext") - def 
test_without_tail(self, LogsCommandContextMock, click_mock): + def test_without_tail(self, logs_command_context_mock): tailing = False - events_iterable = [1, 2, 3] - formatted_events = [4, 5, 6] context_mock = Mock() - LogsCommandContextMock.return_value.__enter__.return_value = context_mock - - context_mock.fetcher.fetch.return_value = events_iterable - context_mock.formatter.do_format.return_value = formatted_events + logs_command_context_mock.return_value.__enter__.return_value = context_mock do_cli(self.function_name, self.stack_name, self.filter_pattern, tailing, self.start_time, self.end_time) - LogsCommandContextMock.assert_called_with( + logs_command_context_mock.assert_called_with( self.function_name, stack_name=self.stack_name, filter_pattern=self.filter_pattern, start_time=self.start_time, end_time=self.end_time, - output_file=None, ) - context_mock.fetcher.fetch.assert_called_with( - context_mock.log_group_name, + context_mock.fetcher.load_time_period.assert_called_with( filter_pattern=context_mock.filter_pattern, - start=context_mock.start_time, - end=context_mock.end_time, + start_time=context_mock.start_time, + end_time=context_mock.end_time, ) - context_mock.formatter.do_format.assert_called_with(events_iterable) - click_mock.echo.assert_has_calls([call(v, nl=False) for v in formatted_events]) - - @patch("samcli.commands.logs.command.click") @patch("samcli.commands.logs.logs_context.LogsCommandContext") - def test_with_tailing(self, LogsCommandContextMock, click_mock): + def test_with_tailing(self, logs_command_context_mock): tailing = True - events_iterable = [1, 2, 3] - formatted_events = [4, 5, 6] context_mock = Mock() - LogsCommandContextMock.return_value.__enter__.return_value = context_mock - - context_mock.fetcher.tail.return_value = events_iterable - context_mock.formatter.do_format.return_value = formatted_events + logs_command_context_mock.return_value.__enter__.return_value = context_mock do_cli(self.function_name, self.stack_name, 
self.filter_pattern, tailing, self.start_time, self.end_time) - LogsCommandContextMock.assert_called_with( + logs_command_context_mock.assert_called_with( self.function_name, stack_name=self.stack_name, filter_pattern=self.filter_pattern, start_time=self.start_time, end_time=self.end_time, - output_file=None, ) context_mock.fetcher.tail.assert_called_with( - context_mock.log_group_name, filter_pattern=context_mock.filter_pattern, start=context_mock.start_time + filter_pattern=context_mock.filter_pattern, start_time=context_mock.start_time ) - - context_mock.formatter.do_format.assert_called_with(events_iterable) - click_mock.echo.assert_has_calls([call(v, nl=False) for v in formatted_events]) diff --git a/tests/unit/commands/logs/test_logs_context.py b/tests/unit/commands/logs/test_logs_context.py index fe37d4e1c7..abcd792b27 100644 --- a/tests/unit/commands/logs/test_logs_context.py +++ b/tests/unit/commands/logs/test_logs_context.py @@ -1,11 +1,11 @@ -import botocore.session -from botocore.stub import Stubber - from unittest import TestCase from unittest.mock import Mock, patch, ANY -from samcli.commands.logs.logs_context import LogsCommandContext +import botocore.session +from botocore.stub import Stubber + from samcli.commands.exceptions import UserException +from samcli.commands.logs.logs_context import LogsCommandContext class TestLogsCommandContext(TestCase): @@ -30,13 +30,6 @@ def test_basic_properties(self): self.assertEqual(self.context.filter_pattern, self.filter_pattern) self.assertIsNone(self.context.output_file_handle) # before setting context handle will be null - @patch("samcli.commands.logs.logs_context.LogsFetcher") - def test_fetcher_property(self, LogsFetcherMock): - LogsFetcherMock.return_value = Mock() - - self.assertEqual(self.context.fetcher, LogsFetcherMock.return_value) - LogsFetcherMock.assert_called_with(self.context._logs_client) - @patch("samcli.commands.logs.logs_context.Colored") def test_colored_property(self, ColoredMock): 
ColoredMock.return_value = Mock() @@ -61,15 +54,6 @@ def test_colored_property_without_output_file(self, ColoredMock): self.assertEqual(ctx.colored, ColoredMock.return_value) ColoredMock.assert_called_with(colorize=True) # Must enable colors - @patch("samcli.commands.logs.logs_context.LogsFormatter") - @patch("samcli.commands.logs.logs_context.Colored") - def test_formatter_property(self, ColoredMock, LogsFormatterMock): - LogsFormatterMock.return_value = Mock() - ColoredMock.return_value = Mock() - - self.assertEqual(self.context.formatter, LogsFormatterMock.return_value) - LogsFormatterMock.assert_called_with(ColoredMock.return_value, ANY) - @patch("samcli.commands.logs.logs_context.LogGroupProvider") @patch.object(LogsCommandContext, "_get_resource_id_from_stack") def test_log_group_name_property_with_stack_name(self, get_resource_id_mock, LogGroupProviderMock): diff --git a/tests/unit/lib/logs/test_fetcher.py b/tests/unit/lib/logs/test_fetcher.py deleted file mode 100644 index c0b634c008..0000000000 --- a/tests/unit/lib/logs/test_fetcher.py +++ /dev/null @@ -1,255 +0,0 @@ -import copy -import datetime -import botocore.session - -from unittest import TestCase -from unittest.mock import Mock, patch, call, ANY -from botocore.stub import Stubber - -from samcli.lib.logs.fetcher import LogsFetcher -from samcli.lib.logs.event import LogEvent -from samcli.lib.utils.time import to_timestamp, to_datetime - - -class TestLogsFetcher_fetch(TestCase): - def setUp(self): - - real_client = botocore.session.get_session().create_client("logs", region_name="us-east-1") - self.client_stubber = Stubber(real_client) - self.fetcher = LogsFetcher(real_client) - - self.log_group_name = "name" - self.stream_name = "stream name" - self.timestamp = to_timestamp(datetime.datetime.utcnow()) - - self.mock_api_response = { - "events": [ - { - "eventId": "id1", - "ingestionTime": 0, - "logStreamName": self.stream_name, - "message": "message 1", - "timestamp": self.timestamp, - }, - { - 
"eventId": "id2", - "ingestionTime": 0, - "logStreamName": self.stream_name, - "message": "message 2", - "timestamp": self.timestamp, - }, - ] - } - - self.expected_events = [ - LogEvent( - self.log_group_name, - { - "eventId": "id1", - "ingestionTime": 0, - "logStreamName": self.stream_name, - "message": "message 1", - "timestamp": self.timestamp, - }, - ), - LogEvent( - self.log_group_name, - { - "eventId": "id2", - "ingestionTime": 0, - "logStreamName": self.stream_name, - "message": "message 2", - "timestamp": self.timestamp, - }, - ), - ] - - def test_must_fetch_logs_for_log_group(self): - expected_params = {"logGroupName": self.log_group_name, "interleaved": True} - - # Configure the stubber to return the configured response. The stubber also verifies - # that input params were provided as expected - self.client_stubber.add_response("filter_log_events", self.mock_api_response, expected_params) - - with self.client_stubber: - events_iterable = self.fetcher.fetch(self.log_group_name) - - actual_result = list(events_iterable) - self.assertEqual(self.expected_events, actual_result) - - def test_must_fetch_logs_with_all_params(self): - pattern = "foobar" - start = datetime.datetime.utcnow() - end = datetime.datetime.utcnow() - - expected_params = { - "logGroupName": self.log_group_name, - "interleaved": True, - "startTime": to_timestamp(start), - "endTime": to_timestamp(end), - "filterPattern": pattern, - } - - self.client_stubber.add_response("filter_log_events", self.mock_api_response, expected_params) - - with self.client_stubber: - events_iterable = self.fetcher.fetch(self.log_group_name, start=start, end=end, filter_pattern=pattern) - - actual_result = list(events_iterable) - self.assertEqual(self.expected_events, actual_result) - - def test_must_paginate_using_next_token(self): - """Make three API calls, first two returns a nextToken and last does not.""" - token = "token" - expected_params = {"logGroupName": self.log_group_name, "interleaved": True} - 
expected_params_with_token = {"logGroupName": self.log_group_name, "interleaved": True, "nextToken": token} - - mock_response_with_token = copy.deepcopy(self.mock_api_response) - mock_response_with_token["nextToken"] = token - - # Call 1 returns a token. Also when first call is made, token is **not** passed as API params - self.client_stubber.add_response("filter_log_events", mock_response_with_token, expected_params) - - # Call 2 returns a token - self.client_stubber.add_response("filter_log_events", mock_response_with_token, expected_params_with_token) - - # Call 3 DOES NOT return a token. This will terminate the loop. - self.client_stubber.add_response("filter_log_events", self.mock_api_response, expected_params_with_token) - - # Same data was returned in each API call - expected_events_result = self.expected_events + self.expected_events + self.expected_events - - with self.client_stubber: - events_iterable = self.fetcher.fetch(self.log_group_name) - - actual_result = list(events_iterable) - self.assertEqual(expected_events_result, actual_result) - - -class TestLogsFetcher_tail(TestCase): - def setUp(self): - - self.fetcher = LogsFetcher(Mock()) - - self.log_group_name = "name" - self.filter_pattern = "pattern" - - self.start_time = to_datetime(10) - self.max_retries = 3 - self.poll_interval = 1 - - self.mock_events1 = [ - LogEvent(self.log_group_name, {"timestamp": 11}), - LogEvent(self.log_group_name, {"timestamp": 12}), - ] - self.mock_events2 = [ - LogEvent(self.log_group_name, {"timestamp": 13}), - LogEvent(self.log_group_name, {"timestamp": 14}), - ] - self.mock_events_empty = [] - - @patch("samcli.lib.logs.fetcher.time") - def test_must_tail_logs_with_single_data_fetch(self, time_mock): - - self.fetcher.fetch = Mock() - - self.fetcher.fetch.side_effect = [ - self.mock_events1, - # Return empty data for `max_retries` number of polls - self.mock_events_empty, - self.mock_events_empty, - self.mock_events_empty, - ] - - expected_fetch_calls = [ - # First 
fetch returns data - call(ANY, start=self.start_time, filter_pattern=self.filter_pattern), - # Three empty fetches - call(ANY, start=to_datetime(13), filter_pattern=self.filter_pattern), - call(ANY, start=to_datetime(13), filter_pattern=self.filter_pattern), - call(ANY, start=to_datetime(13), filter_pattern=self.filter_pattern), - ] - - # One per poll - expected_sleep_calls = [call(self.poll_interval) for i in expected_fetch_calls] - - result_itr = self.fetcher.tail( - self.log_group_name, - start=self.start_time, - filter_pattern=self.filter_pattern, - max_retries=self.max_retries, - poll_interval=self.poll_interval, - ) - - self.assertEqual(self.mock_events1, list(result_itr)) - self.assertEqual(expected_fetch_calls, self.fetcher.fetch.call_args_list) - self.assertEqual(expected_sleep_calls, time_mock.sleep.call_args_list) - - @patch("samcli.lib.logs.fetcher.time") - def test_must_tail_logs_with_multiple_data_fetches(self, time_mock): - - self.fetcher.fetch = Mock() - - self.fetcher.fetch.side_effect = [ - self.mock_events1, - # Just one empty fetch - self.mock_events_empty, - # This fetch returns data - self.mock_events2, - # Return empty data for `max_retries` number of polls - self.mock_events_empty, - self.mock_events_empty, - self.mock_events_empty, - ] - - expected_fetch_calls = [ - # First fetch returns data - call(ANY, start=self.start_time, filter_pattern=self.filter_pattern), - # This fetch was empty - call(ANY, start=to_datetime(13), filter_pattern=self.filter_pattern), - # This fetch returned data - call(ANY, start=to_datetime(13), filter_pattern=self.filter_pattern), - # Three empty fetches - call(ANY, start=to_datetime(15), filter_pattern=self.filter_pattern), - call(ANY, start=to_datetime(15), filter_pattern=self.filter_pattern), - call(ANY, start=to_datetime(15), filter_pattern=self.filter_pattern), - ] - - # One per poll - expected_sleep_calls = [call(self.poll_interval) for i in expected_fetch_calls] - - result_itr = self.fetcher.tail( - 
self.log_group_name, - start=self.start_time, - filter_pattern=self.filter_pattern, - max_retries=self.max_retries, - poll_interval=self.poll_interval, - ) - - self.assertEqual(self.mock_events1 + self.mock_events2, list(result_itr)) - self.assertEqual(expected_fetch_calls, self.fetcher.fetch.call_args_list) - self.assertEqual(expected_sleep_calls, time_mock.sleep.call_args_list) - - @patch("samcli.lib.logs.fetcher.time") - def test_without_start_time(self, time_mock): - - self.fetcher.fetch = Mock() - - self.fetcher.fetch.return_value = self.mock_events_empty - - expected_fetch_calls = [ - # Three empty fetches, all with default start time - call(ANY, start=to_datetime(0), filter_pattern=ANY), - call(ANY, start=to_datetime(0), filter_pattern=ANY), - call(ANY, start=to_datetime(0), filter_pattern=ANY), - ] - - result_itr = self.fetcher.tail( - self.log_group_name, - filter_pattern=self.filter_pattern, - max_retries=self.max_retries, - poll_interval=self.poll_interval, - ) - - self.assertEqual([], list(result_itr)) - self.assertEqual(expected_fetch_calls, self.fetcher.fetch.call_args_list) diff --git a/tests/unit/lib/logs/test_formatter.py b/tests/unit/lib/logs/test_formatter.py deleted file mode 100644 index b30fd49c71..0000000000 --- a/tests/unit/lib/logs/test_formatter.py +++ /dev/null @@ -1,164 +0,0 @@ -import json - -from unittest import TestCase -from unittest.mock import Mock, patch, call -from parameterized import parameterized - -from samcli.lib.logs.formatter import LogsFormatter, LambdaLogMsgFormatters, KeywordHighlighter, JSONMsgFormatter -from samcli.lib.logs.event import LogEvent - - -class TestLogsFormatter_pretty_print_event(TestCase): - def setUp(self): - self.colored_mock = Mock() - self.group_name = "group name" - self.stream_name = "stream name" - self.message = "message" - self.event_dict = {"timestamp": 1, "message": self.message, "logStreamName": self.stream_name} - - def test_must_serialize_event(self): - colored_timestamp = "colored 
timestamp" - colored_stream_name = "colored stream name" - self.colored_mock.yellow.return_value = colored_timestamp - self.colored_mock.cyan.return_value = colored_stream_name - - event = LogEvent(self.group_name, self.event_dict) - - expected = " ".join([colored_stream_name, colored_timestamp, self.message]) - result = LogsFormatter._pretty_print_event(event, self.colored_mock) - - self.assertEqual(expected, result) - self.colored_mock.yellow.has_calls() - self.colored_mock.cyan.assert_called_with(self.stream_name) - - -def _passthru_formatter(event, colored): - return event - - -class TestLogsFormatter_do_format(TestCase): - def setUp(self): - self.colored_mock = Mock() - - # Set formatter chain method to return the input unaltered. - self.chain_method1 = Mock(wraps=_passthru_formatter) - self.chain_method2 = Mock(wraps=_passthru_formatter) - self.chain_method3 = Mock(wraps=_passthru_formatter) - - self.formatter_chain = [self.chain_method1, self.chain_method2, self.chain_method3] - - @patch.object(LogsFormatter, "_pretty_print_event", wraps=_passthru_formatter) - def test_must_map_formatters_sequentially(self, pretty_print_mock): - - events_iterable = [1, 2, 3] - expected_result = [1, 2, 3] - expected_call_order = [ - call(1, colored=self.colored_mock), - call(2, colored=self.colored_mock), - call(3, colored=self.colored_mock), - ] - - formatter = LogsFormatter(self.colored_mock, self.formatter_chain) - - result_iterable = formatter.do_format(events_iterable) - self.assertEqual(list(result_iterable), expected_result) - - self.chain_method1.assert_has_calls(expected_call_order) - self.chain_method2.assert_has_calls(expected_call_order) - self.chain_method3.assert_has_calls(expected_call_order) - pretty_print_mock.assert_has_calls(expected_call_order) # Pretty Printer must always be called - - @patch.object(LogsFormatter, "_pretty_print_event", wraps=_passthru_formatter) - def test_must_work_without_formatter_chain(self, pretty_print_mock): - - events_iterable = 
[1, 2, 3] - expected_result = [1, 2, 3] - expected_call_order = [ - call(1, colored=self.colored_mock), - call(2, colored=self.colored_mock), - call(3, colored=self.colored_mock), - ] - - # No formatter chain. - formatter = LogsFormatter(self.colored_mock) - - result_iterable = formatter.do_format(events_iterable) - self.assertEqual(list(result_iterable), expected_result) - - # Pretty Print is always called, even if there are no other formatters in the chain. - pretty_print_mock.assert_has_calls(expected_call_order) - self.chain_method1.assert_not_called() - self.chain_method2.assert_not_called() - self.chain_method3.assert_not_called() - - -class TestLambdaLogMsgFormatters_colorize_crashes(TestCase): - @parameterized.expand( - [ - "Task timed out", - "Something happened. Task timed out. Something else happend", - "Process exited before completing request", - ] - ) - def test_must_color_crash_messages(self, input_msg): - color_result = "colored messaage" - colored = Mock() - colored.red.return_value = color_result - event = LogEvent("group_name", {"message": input_msg}) - - result = LambdaLogMsgFormatters.colorize_errors(event, colored) - self.assertEqual(result.message, color_result) - colored.red.assert_called_with(input_msg) - - def test_must_ignore_other_messages(self): - colored = Mock() - event = LogEvent("group_name", {"message": "some msg"}) - - result = LambdaLogMsgFormatters.colorize_errors(event, colored) - self.assertEqual(result.message, "some msg") - colored.red.assert_not_called() - - -class TestKeywordHighlight_highlight_keyword(TestCase): - def test_must_highlight_all_keywords(self): - input_msg = "this keyword some keyword other keyword" - keyword = "keyword" - color_result = "colored" - expected_msg = "this colored some colored other colored" - - colored = Mock() - colored.underline.return_value = color_result - event = LogEvent("group_name", {"message": input_msg}) - - result = KeywordHighlighter(keyword).highlight_keywords(event, colored) - 
self.assertEqual(result.message, expected_msg) - colored.underline.assert_called_with(keyword) - - def test_must_ignore_if_keyword_is_absent(self): - colored = Mock() - input_msg = "this keyword some keyword other keyword" - event = LogEvent("group_name", {"message": input_msg}) - - result = KeywordHighlighter().highlight_keywords(event, colored) - self.assertEqual(result.message, input_msg) - colored.underline.assert_not_called() - - -class TestJSONMsgFormatter_format_json(TestCase): - def test_must_pretty_print_json(self): - data = {"a": "b"} - input_msg = '{"a": "b"}' - expected_msg = json.dumps(data, indent=2) - - event = LogEvent("group_name", {"message": input_msg}) - - result = JSONMsgFormatter.format_json(event, None) - self.assertEqual(result.message, expected_msg) - - @parameterized.expand(["this is not json", '{"not a valid json"}']) - def test_ignore_non_json(self, input_msg): - - event = LogEvent("group_name", {"message": input_msg}) - - result = JSONMsgFormatter.format_json(event, None) - self.assertEqual(result.message, input_msg) diff --git a/tests/unit/lib/observability/__init__.py b/tests/unit/lib/observability/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/unit/lib/observability/cw_logs/__init__.py b/tests/unit/lib/observability/cw_logs/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/unit/lib/observability/cw_logs/test_cw_log_consumers.py b/tests/unit/lib/observability/cw_logs/test_cw_log_consumers.py new file mode 100644 index 0000000000..d5d01c3704 --- /dev/null +++ b/tests/unit/lib/observability/cw_logs/test_cw_log_consumers.py @@ -0,0 +1,15 @@ +from unittest import TestCase +from unittest.mock import patch, Mock, call + +from samcli.lib.observability.cw_logs.cw_log_consumers import CWTerminalEventConsumer + + +class TestCWTerminalEventConsumer(TestCase): + def setUp(self): + self.consumer = CWTerminalEventConsumer() + + 
@patch("samcli.lib.observability.cw_logs.cw_log_consumers.click") + def test_consume_with_event(self, patched_click): + event = Mock() + self.consumer.consume(event) + patched_click.echo.assert_called_with(event, nl=False) diff --git a/tests/unit/lib/logs/test_event.py b/tests/unit/lib/observability/cw_logs/test_cw_log_event.py similarity index 51% rename from tests/unit/lib/logs/test_event.py rename to tests/unit/lib/observability/cw_logs/test_cw_log_event.py index c093edf0e2..62968f0d71 100644 --- a/tests/unit/lib/logs/test_event.py +++ b/tests/unit/lib/observability/cw_logs/test_cw_log_event.py @@ -1,9 +1,9 @@ from unittest import TestCase -from samcli.lib.logs.event import LogEvent +from samcli.lib.observability.cw_logs.cw_log_event import CWLogEvent -class TestLogEvent(TestCase): +class TestCWLogEvent(TestCase): def setUp(self): self.group_name = "log group name" self.stream_name = "stream name" @@ -12,44 +12,56 @@ def setUp(self): self.timestamp_str = "2018-07-06T13:09:54" def test_must_extract_fields_from_event(self): - event = LogEvent( + event = CWLogEvent( self.group_name, {"timestamp": self.timestamp, "logStreamName": self.stream_name, "message": self.message} ) - self.assertEqual(event.log_group_name, self.group_name) + self.assertEqual(event.cw_log_group, self.group_name) self.assertEqual(event.log_stream_name, self.stream_name) self.assertEqual(event.message, self.message) - self.assertEqual(self.timestamp_str, event.timestamp) + self.assertEqual(self.timestamp, event.timestamp) def test_must_ignore_if_some_fields_are_empty(self): - event = LogEvent(self.group_name, {"logStreamName": "stream name"}) + event = CWLogEvent(self.group_name, {"logStreamName": "stream name"}) - self.assertEqual(event.log_group_name, self.group_name) + self.assertEqual(event.cw_log_group, self.group_name) self.assertEqual(event.log_stream_name, self.stream_name) self.assertEqual(event.message, "") - self.assertIsNone(event.timestamp) + self.assertEqual(event.timestamp, 0) 
def test_must_ignore_if_event_is_empty(self): - event = LogEvent(self.group_name, {}) + event = CWLogEvent(self.group_name, {}) - self.assertEqual(event.log_group_name, self.group_name) - self.assertIsNone(event.log_stream_name) - self.assertIsNone(event.message) - self.assertIsNone(event.timestamp) + self.assertEqual(event.cw_log_group, self.group_name) + self.assertEqual(event.log_stream_name, "") + self.assertEqual(event.message, "") + self.assertEqual(event.timestamp, 0) def test_check_for_equality(self): - event = LogEvent( + event = CWLogEvent( self.group_name, {"timestamp": self.timestamp, "logStreamName": self.stream_name, "message": self.message} ) - other = LogEvent( + other = CWLogEvent( self.group_name, {"timestamp": self.timestamp, "logStreamName": self.stream_name, "message": self.message} ) self.assertEqual(event, other) + def test_check_for_inequality(self): + event = CWLogEvent( + self.group_name, + {"timestamp": self.timestamp + 1, "logStreamName": self.stream_name, "message": self.message}, + ) + + other = CWLogEvent( + self.group_name, {"timestamp": self.timestamp, "logStreamName": self.stream_name, "message": self.message} + ) + + self.assertNotEqual(event, other) + def test_check_for_equality_with_other_data_types(self): - event = LogEvent(self.group_name, {}) + event = CWLogEvent(self.group_name, {}) other = "this is not an event" self.assertNotEqual(event, other) diff --git a/tests/unit/lib/observability/cw_logs/test_cw_log_formatters.py b/tests/unit/lib/observability/cw_logs/test_cw_log_formatters.py new file mode 100644 index 0000000000..4a720f8075 --- /dev/null +++ b/tests/unit/lib/observability/cw_logs/test_cw_log_formatters.py @@ -0,0 +1,120 @@ +import json +from unittest import TestCase +from unittest.mock import Mock + +from parameterized import parameterized + +from samcli.lib.observability.cw_logs.cw_log_event import CWLogEvent +from samcli.lib.observability.cw_logs.cw_log_formatters import ( + CWPrettyPrintFormatter, + 
CWColorizeErrorsFormatter, + CWKeywordHighlighterFormatter, + CWJsonFormatter, +) + + +class TestCWPrettyPrintFormatter(TestCase): + def setUp(self): + self.colored = Mock() + self.pretty_print_formatter = CWPrettyPrintFormatter(self.colored) + self.group_name = "group name" + self.stream_name = "stream name" + self.message = "message" + self.event_dict = {"timestamp": 1, "message": self.message, "logStreamName": self.stream_name} + + def test_must_serialize_event(self): + colored_timestamp = "colored timestamp" + colored_stream_name = "colored stream name" + self.colored.yellow.return_value = colored_timestamp + self.colored.cyan.return_value = colored_stream_name + + event = CWLogEvent(self.group_name, self.event_dict) + + expected = " ".join([colored_stream_name, colored_timestamp, self.message]) + result = self.pretty_print_formatter.map(event) + + self.assertEqual(expected, result) + self.colored.yellow.has_calls() + self.colored.cyan.assert_called_with(self.stream_name) + + +class TestCWColorizeErrorsFormatter(TestCase): + def setUp(self): + self.colored = Mock() + self.formatter = CWColorizeErrorsFormatter(self.colored) + + @parameterized.expand( + [ + "Task timed out", + "Something happened. Task timed out. 
Something else happend", + "Process exited before completing request", + ] + ) + def test_must_color_crash_messages(self, input_msg): + color_result = "colored messaage" + self.colored.red.return_value = color_result + event = CWLogEvent("group_name", {"message": input_msg}) + + result = self.formatter.map(event) + self.assertEqual(result.message, color_result) + self.colored.red.assert_called_with(input_msg) + + def test_must_ignore_other_messages(self): + event = CWLogEvent("group_name", {"message": "some msg"}) + + result = self.formatter.map(event) + self.assertEqual(result.message, "some msg") + self.colored.red.assert_not_called() + + +class CWCWKeywordHighlighterFormatter(TestCase): + def setUp(self): + self.colored = Mock() + + def test_must_highlight_all_keywords(self): + input_msg = "this keyword some keyword other keyword" + keyword = "keyword" + color_result = "colored" + expected_msg = "this colored some colored other colored" + + formatter = CWKeywordHighlighterFormatter(self.colored, keyword) + + self.colored.underline.return_value = color_result + event = CWLogEvent("group_name", {"message": input_msg}) + + result = formatter.map(event) + self.assertEqual(result.message, expected_msg) + self.colored.underline.assert_called_with(keyword) + + def test_must_ignore_if_keyword_is_absent(self): + input_msg = "this keyword some keyword other keyword" + event = CWLogEvent("group_name", {"message": input_msg}) + + formatter = CWKeywordHighlighterFormatter(self.colored) + + result = formatter.map(event) + self.assertEqual(result.message, input_msg) + self.colored.underline.assert_not_called() + + +class TestCWJsonFormatter(TestCase): + def setUp(self): + self.formatter = CWJsonFormatter() + + def test_must_pretty_print_json(self): + data = {"a": "b"} + input_msg = '{"a": "b"}' + expected_msg = json.dumps(data, indent=2) + + event = CWLogEvent("group_name", {"message": input_msg}) + + result = self.formatter.map(event) + self.assertEqual(result.message, 
expected_msg) + + @parameterized.expand(["this is not json", '{"not a valid json"}']) + def test_ignore_non_json(self, input_msg): + + event = CWLogEvent("group_name", {"message": input_msg}) + + result = self.formatter.map(event) + self.assertEqual(result.message, input_msg) diff --git a/tests/unit/lib/logs/test_provider.py b/tests/unit/lib/observability/cw_logs/test_cw_log_group_provider.py similarity index 78% rename from tests/unit/lib/logs/test_provider.py rename to tests/unit/lib/observability/cw_logs/test_cw_log_group_provider.py index 59da01928c..295ad6d898 100644 --- a/tests/unit/lib/logs/test_provider.py +++ b/tests/unit/lib/observability/cw_logs/test_cw_log_group_provider.py @@ -1,6 +1,6 @@ from unittest import TestCase -from samcli.lib.logs.provider import LogGroupProvider +from samcli.lib.observability.cw_logs.cw_log_group_provider import LogGroupProvider class TestLogGroupProvider_for_lambda_function(TestCase): diff --git a/tests/unit/lib/observability/cw_logs/test_cw_log_puller.py b/tests/unit/lib/observability/cw_logs/test_cw_log_puller.py new file mode 100644 index 0000000000..98f4e6d3de --- /dev/null +++ b/tests/unit/lib/observability/cw_logs/test_cw_log_puller.py @@ -0,0 +1,322 @@ +import copy +from datetime import datetime +from unittest import TestCase +from unittest.mock import Mock, call, patch, ANY + +import botocore.session +from botocore.stub import Stubber + +from samcli.lib.observability.cw_logs.cw_log_event import CWLogEvent +from samcli.lib.observability.cw_logs.cw_log_puller import CWLogPuller +from samcli.lib.utils.time import to_timestamp, to_datetime + + +class TestCWLogPuller_load_time_period(TestCase): + def setUp(self): + self.log_group_name = "name" + self.stream_name = "stream name" + self.timestamp = to_timestamp(datetime.utcnow()) + + real_client = botocore.session.get_session().create_client("logs", region_name="us-east-1") + self.client_stubber = Stubber(real_client) + self.consumer = Mock() + self.fetcher = 
CWLogPuller(real_client, self.consumer, self.log_group_name) + + self.mock_api_response = { + "events": [ + { + "eventId": "id1", + "ingestionTime": 0, + "logStreamName": self.stream_name, + "message": "message 1", + "timestamp": self.timestamp, + }, + { + "eventId": "id2", + "ingestionTime": 0, + "logStreamName": self.stream_name, + "message": "message 2", + "timestamp": self.timestamp, + }, + ] + } + + self.expected_events = [ + CWLogEvent( + self.log_group_name, + { + "eventId": "id1", + "ingestionTime": 0, + "logStreamName": self.stream_name, + "message": "message 1", + "timestamp": self.timestamp, + }, + ), + CWLogEvent( + self.log_group_name, + { + "eventId": "id2", + "ingestionTime": 0, + "logStreamName": self.stream_name, + "message": "message 2", + "timestamp": self.timestamp, + }, + ), + ] + + def test_must_fetch_logs_for_log_group(self): + expected_params = {"logGroupName": self.log_group_name, "interleaved": True} + + # Configure the stubber to return the configured response. 
The stubber also verifies + # that input params were provided as expected + self.client_stubber.add_response("filter_log_events", self.mock_api_response, expected_params) + + with self.client_stubber: + self.fetcher.load_time_period() + + call_args = [args[0] for (args, _) in self.consumer.consume.call_args_list] + for event in self.expected_events: + self.assertIn(event, call_args) + + def test_must_fetch_logs_with_all_params(self): + pattern = "foobar" + start = datetime.utcnow() + end = datetime.utcnow() + + expected_params = { + "logGroupName": self.log_group_name, + "interleaved": True, + "startTime": to_timestamp(start), + "endTime": to_timestamp(end), + "filterPattern": pattern, + } + + self.client_stubber.add_response("filter_log_events", self.mock_api_response, expected_params) + + with self.client_stubber: + self.fetcher.load_time_period(start_time=start, end_time=end, filter_pattern=pattern) + + call_args = [args[0] for (args, _) in self.consumer.consume.call_args_list] + for event in self.expected_events: + self.assertIn(event, call_args) + + def test_must_paginate_using_next_token(self): + """Make three API calls, first two returns a nextToken and last does not.""" + token = "token" + expected_params = {"logGroupName": self.log_group_name, "interleaved": True} + expected_params_with_token = {"logGroupName": self.log_group_name, "interleaved": True, "nextToken": token} + + mock_response_with_token = copy.deepcopy(self.mock_api_response) + mock_response_with_token["nextToken"] = token + + # Call 1 returns a token. Also when first call is made, token is **not** passed as API params + self.client_stubber.add_response("filter_log_events", mock_response_with_token, expected_params) + + # Call 2 returns a token + self.client_stubber.add_response("filter_log_events", mock_response_with_token, expected_params_with_token) + + # Call 3 DOES NOT return a token. This will terminate the loop. 
+ self.client_stubber.add_response("filter_log_events", self.mock_api_response, expected_params_with_token) + + # Same data was returned in each API call + expected_events_result = self.expected_events + self.expected_events + self.expected_events + + with self.client_stubber: + self.fetcher.load_time_period() + + call_args = [args[0] for (args, _) in self.consumer.consume.call_args_list] + for event in expected_events_result: + self.assertIn(event, call_args) + + +class TestCWLogPuller_tail(TestCase): + def setUp(self): + self.log_group_name = "name" + self.filter_pattern = "pattern" + self.start_time = to_datetime(10) + self.max_retries = 3 + self.poll_interval = 1 + + real_client = botocore.session.get_session().create_client("logs", region_name="us-east-1") + self.client_stubber = Stubber(real_client) + self.consumer = Mock() + self.fetcher = CWLogPuller( + real_client, + self.consumer, + self.log_group_name, + max_retries=self.max_retries, + poll_interval=self.poll_interval, + ) + + self.mock_api_empty_response = {"events": []} + self.mock_api_response_1 = { + "events": [ + { + "timestamp": 11, + }, + { + "timestamp": 12, + }, + ] + } + self.mock_api_response_2 = { + "events": [ + { + "timestamp": 13, + }, + { + "timestamp": 14, + }, + ] + } + + self.mock_events1 = [ + CWLogEvent(self.log_group_name, {"timestamp": 11}), + CWLogEvent(self.log_group_name, {"timestamp": 12}), + ] + self.mock_events2 = [ + CWLogEvent(self.log_group_name, {"timestamp": 13}), + CWLogEvent(self.log_group_name, {"timestamp": 14}), + ] + self.mock_events_empty = [] + + @patch("samcli.lib.observability.cw_logs.cw_log_puller.time") + def test_must_tail_logs_with_single_data_fetch(self, time_mock): + expected_params = { + "logGroupName": self.log_group_name, + "interleaved": True, + "startTime": 10, + "filterPattern": self.filter_pattern, + } + expected_params_second_try = { + "logGroupName": self.log_group_name, + "interleaved": True, + "startTime": 13, + "filterPattern": 
self.filter_pattern, + } + + # first successful return + self.client_stubber.add_response("filter_log_events", self.mock_api_response_1, expected_params) + # 3 empty returns as the number of max retries + self.client_stubber.add_response("filter_log_events", self.mock_api_empty_response, expected_params_second_try) + self.client_stubber.add_response("filter_log_events", self.mock_api_empty_response, expected_params_second_try) + self.client_stubber.add_response("filter_log_events", self.mock_api_empty_response, expected_params_second_try) + + with patch.object( + self.fetcher, "load_time_period", wraps=self.fetcher.load_time_period + ) as patched_load_time_period: + with self.client_stubber: + self.fetcher.tail( + start_time=self.start_time, + filter_pattern=self.filter_pattern, + ) + + expected_load_time_period_calls = [ + # First fetch returns data + call(self.start_time, filter_pattern=self.filter_pattern), + # Three empty fetches + call(to_datetime(13), filter_pattern=self.filter_pattern), + call(to_datetime(13), filter_pattern=self.filter_pattern), + call(to_datetime(13), filter_pattern=self.filter_pattern), + ] + + # One per poll + expected_sleep_calls = [call(self.poll_interval) for _ in expected_load_time_period_calls] + + consumer_call_args = [args[0] for (args, _) in self.consumer.consume.call_args_list] + + self.assertEqual(self.mock_events1, consumer_call_args) + self.assertEqual(expected_sleep_calls, time_mock.sleep.call_args_list) + self.assertEqual(expected_load_time_period_calls, patched_load_time_period.call_args_list) + + @patch("samcli.lib.observability.cw_logs.cw_log_puller.time") + def test_must_tail_logs_with_multiple_data_fetches(self, time_mock): + expected_params = { + "logGroupName": self.log_group_name, + "interleaved": True, + "startTime": 10, + "filterPattern": self.filter_pattern, + } + expected_params_second_try = { + "logGroupName": self.log_group_name, + "interleaved": True, + "startTime": 13, + "filterPattern": self.filter_pattern, 
+ } + expected_params_third_try = { + "logGroupName": self.log_group_name, + "interleaved": True, + "startTime": 15, + "filterPattern": self.filter_pattern, + } + + self.client_stubber.add_response("filter_log_events", self.mock_api_response_1, expected_params) + self.client_stubber.add_response("filter_log_events", self.mock_api_empty_response, expected_params_second_try) + self.client_stubber.add_response("filter_log_events", self.mock_api_response_2, expected_params_second_try) + self.client_stubber.add_response("filter_log_events", self.mock_api_empty_response, expected_params_third_try) + self.client_stubber.add_response("filter_log_events", self.mock_api_empty_response, expected_params_third_try) + self.client_stubber.add_response("filter_log_events", self.mock_api_empty_response, expected_params_third_try) + + expected_load_time_period_calls = [ + # First fetch returns data + call(self.start_time, filter_pattern=self.filter_pattern), + # This fetch was empty + call(to_datetime(13), filter_pattern=self.filter_pattern), + # This fetch returned data + call(to_datetime(13), filter_pattern=self.filter_pattern), + # Three empty fetches + call(to_datetime(15), filter_pattern=self.filter_pattern), + call(to_datetime(15), filter_pattern=self.filter_pattern), + call(to_datetime(15), filter_pattern=self.filter_pattern), + ] + + # One per poll + expected_sleep_calls = [call(self.poll_interval) for _ in expected_load_time_period_calls] + + with patch.object( + self.fetcher, "load_time_period", wraps=self.fetcher.load_time_period + ) as patched_load_time_period: + with self.client_stubber: + self.fetcher.tail(start_time=self.start_time, filter_pattern=self.filter_pattern) + + expected_consumer_call_args = [args[0] for (args, _) in self.consumer.consume.call_args_list] + + self.assertEqual(self.mock_events1 + self.mock_events2, expected_consumer_call_args) + self.assertEqual(expected_load_time_period_calls, patched_load_time_period.call_args_list) + 
self.assertEqual(expected_sleep_calls, time_mock.sleep.call_args_list) + + @patch("samcli.lib.observability.cw_logs.cw_log_puller.time") + def test_without_start_time(self, time_mock): + expected_params = { + "logGroupName": self.log_group_name, + "interleaved": True, + "startTime": 0, + "filterPattern": self.filter_pattern, + } + self.client_stubber.add_response("filter_log_events", self.mock_api_empty_response, expected_params) + self.client_stubber.add_response("filter_log_events", self.mock_api_empty_response, expected_params) + self.client_stubber.add_response("filter_log_events", self.mock_api_empty_response, expected_params) + + expected_load_time_period_calls = [ + # Three empty fetches, all with default start time + call(to_datetime(0), filter_pattern=ANY), + call(to_datetime(0), filter_pattern=ANY), + call(to_datetime(0), filter_pattern=ANY), + ] + + # One per poll + expected_sleep_calls = [call(self.poll_interval) for _ in expected_load_time_period_calls] + + with patch.object( + self.fetcher, "load_time_period", wraps=self.fetcher.load_time_period + ) as patched_load_time_period: + with self.client_stubber: + self.fetcher.tail( + filter_pattern=self.filter_pattern, + ) + + expected_consumer_call_args = [args[0] for (args, _) in self.consumer.consume.call_args_list] + + self.assertEqual([], expected_consumer_call_args) + self.assertEqual(expected_load_time_period_calls, patched_load_time_period.call_args_list) + self.assertEqual(expected_sleep_calls, time_mock.sleep.call_args_list) diff --git a/tests/unit/lib/observability/test_observability_info_puller.py b/tests/unit/lib/observability/test_observability_info_puller.py new file mode 100644 index 0000000000..3fbbb9fe34 --- /dev/null +++ b/tests/unit/lib/observability/test_observability_info_puller.py @@ -0,0 +1,50 @@ +from unittest import TestCase +from unittest.mock import Mock + +from parameterized import parameterized, param + +from samcli.lib.observability.observability_info_puller import 
ObservabilityEventConsumerDecorator + + +class TestObservabilityEventConsumerDecorator(TestCase): + def test_decorator(self): + actual_consumer = Mock() + event = Mock() + + consumer_decorator = ObservabilityEventConsumerDecorator([], actual_consumer) + consumer_decorator.consume(event) + + actual_consumer.consume.assert_called_with(event) + + def test_decorator_with_mapper(self): + actual_consumer = Mock() + event = Mock() + mapped_event = Mock() + mapper = Mock() + mapper.map.return_value = mapped_event + + consumer_decorator = ObservabilityEventConsumerDecorator([mapper], actual_consumer) + consumer_decorator.consume(event) + + mapper.map.assert_called_with(event) + actual_consumer.consume.assert_called_with(mapped_event) + + @parameterized.expand( + [ + param([Mock()]), + param([Mock(), Mock()]), + param([Mock(), Mock(), Mock()]), + ] + ) + def test_decorator_with_mappers(self, mappers): + actual_consumer = Mock() + event = Mock() + for mapper in mappers: + mapper.map.return_value = event + + consumer_decorator = ObservabilityEventConsumerDecorator(mappers, actual_consumer) + consumer_decorator.consume(event) + + actual_consumer.consume.assert_called_with(event) + for mapper in mappers: + mapper.map.assert_called_with(event) From c25adfe94aa788f63d920a0a49e90feb3d6a1678 Mon Sep 17 00:00:00 2001 From: Mehmet Nuri Deveci <5735811+mndeveci@users.noreply.github.com> Date: Tue, 11 May 2021 13:42:35 -0700 Subject: [PATCH 2/5] re-organize due to click usage --- .../logs/console_consumers.py} | 4 ++-- samcli/commands/logs/logs_context.py | 4 ++-- .../logs/test_console_consumers.py} | 8 ++++---- 3 files changed, 8 insertions(+), 8 deletions(-) rename samcli/{lib/observability/cw_logs/cw_log_consumers.py => commands/logs/console_consumers.py} (74%) rename tests/unit/{lib/observability/cw_logs/test_cw_log_consumers.py => commands/logs/test_console_consumers.py} (52%) diff --git a/samcli/lib/observability/cw_logs/cw_log_consumers.py 
b/samcli/commands/logs/console_consumers.py similarity index 74% rename from samcli/lib/observability/cw_logs/cw_log_consumers.py rename to samcli/commands/logs/console_consumers.py index 5e41b4275b..ead06c7baf 100644 --- a/samcli/lib/observability/cw_logs/cw_log_consumers.py +++ b/samcli/commands/logs/console_consumers.py @@ -1,5 +1,5 @@ """ -Consumers for CloudWatch log events +Consumers that will print out events to console """ from typing import Any @@ -8,7 +8,7 @@ from samcli.lib.observability.observability_info_puller import ObservabilityEventConsumer -class CWTerminalEventConsumer(ObservabilityEventConsumer): +class CWConsoleEventConsumer(ObservabilityEventConsumer): """ Consumer implementation that will consume given event as outputting into console """ diff --git a/samcli/commands/logs/logs_context.py b/samcli/commands/logs/logs_context.py index e9d9213bc5..5504895a70 100644 --- a/samcli/commands/logs/logs_context.py +++ b/samcli/commands/logs/logs_context.py @@ -8,7 +8,7 @@ import botocore from samcli.commands.exceptions import UserException -from samcli.lib.observability.cw_logs.cw_log_consumers import CWTerminalEventConsumer +from samcli.commands.logs.console_consumers import CWConsoleEventConsumer from samcli.lib.observability.cw_logs.cw_log_formatters import ( CWColorizeErrorsFormatter, CWJsonFormatter, @@ -114,7 +114,7 @@ def fetcher(self): CWKeywordHighlighterFormatter(self.colored, self._filter_pattern), CWPrettyPrintFormatter(self.colored), ], - consumer=CWTerminalEventConsumer(), + consumer=CWConsoleEventConsumer(), ), cw_log_group=self.log_group_name, resource_name=self._function_name, diff --git a/tests/unit/lib/observability/cw_logs/test_cw_log_consumers.py b/tests/unit/commands/logs/test_console_consumers.py similarity index 52% rename from tests/unit/lib/observability/cw_logs/test_cw_log_consumers.py rename to tests/unit/commands/logs/test_console_consumers.py index d5d01c3704..9a09d2e4ec 100644 --- 
a/tests/unit/lib/observability/cw_logs/test_cw_log_consumers.py +++ b/tests/unit/commands/logs/test_console_consumers.py @@ -1,14 +1,14 @@ from unittest import TestCase from unittest.mock import patch, Mock, call -from samcli.lib.observability.cw_logs.cw_log_consumers import CWTerminalEventConsumer +from samcli.commands.logs.console_consumers import CWConsoleEventConsumer -class TestCWTerminalEventConsumer(TestCase): +class TestCWConsoleEventConsumer(TestCase): def setUp(self): - self.consumer = CWTerminalEventConsumer() + self.consumer = CWConsoleEventConsumer() - @patch("samcli.lib.observability.cw_logs.cw_log_consumers.click") + @patch("samcli.commands.logs.console_consumers.click") def test_consume_with_event(self, patched_click): event = Mock() self.consumer.consume(event) From 077fd68d05a767e12dd7c2c13d7f3b9255c9158d Mon Sep 17 00:00:00 2001 From: Mehmet Nuri Deveci <5735811+mndeveci@users.noreply.github.com> Date: Thu, 13 May 2021 10:31:46 -0700 Subject: [PATCH 3/5] address comments --- samcli/commands/logs/console_consumers.py | 8 ++++---- .../observability/cw_logs/cw_log_formatters.py | 16 +++++++++++----- .../observability/observability_info_puller.py | 4 ++-- .../unit/commands/logs/test_console_consumers.py | 4 ++-- .../cw_logs/test_cw_log_formatters.py | 2 +- 5 files changed, 20 insertions(+), 14 deletions(-) diff --git a/samcli/commands/logs/console_consumers.py b/samcli/commands/logs/console_consumers.py index ead06c7baf..42c76b762f 100644 --- a/samcli/commands/logs/console_consumers.py +++ b/samcli/commands/logs/console_consumers.py @@ -1,17 +1,17 @@ """ Consumers that will print out events to console """ -from typing import Any import click +from samcli.lib.observability.cw_logs.cw_log_event import CWLogEvent from samcli.lib.observability.observability_info_puller import ObservabilityEventConsumer -class CWConsoleEventConsumer(ObservabilityEventConsumer): +class CWConsoleEventConsumer(ObservabilityEventConsumer[CWLogEvent]): """ Consumer 
implementation that will consume given event as outputting into console """ - def consume(self, event: Any): - click.echo(event, nl=False) + def consume(self, event: CWLogEvent): + click.echo(event.message, nl=False) diff --git a/samcli/lib/observability/cw_logs/cw_log_formatters.py b/samcli/lib/observability/cw_logs/cw_log_formatters.py index 3c4acaa820..a9e56a6101 100644 --- a/samcli/lib/observability/cw_logs/cw_log_formatters.py +++ b/samcli/lib/observability/cw_logs/cw_log_formatters.py @@ -2,12 +2,16 @@ Contains all mappers (formatters) for CloudWatch logs """ import json +import logging +from json import JSONDecodeError from samcli.lib.observability.cw_logs.cw_log_event import CWLogEvent from samcli.lib.observability.observability_info_puller import ObservabilityEventMapper from samcli.lib.utils.colors import Colored from samcli.lib.utils.time import timestamp_to_iso +LOG = logging.getLogger(__name__) + class CWKeywordHighlighterFormatter(ObservabilityEventMapper[CWLogEvent]): """ @@ -60,13 +64,15 @@ class CWJsonFormatter(ObservabilityEventMapper[CWLogEvent]): """ # pylint: disable=R0201 + # Pylint recommends converting this method to a static one but we want it to stay as it is + # since formatters/mappers are combined in an array of ObservabilityEventMapper class def map(self, event: CWLogEvent) -> CWLogEvent: try: if event.message.startswith("{"): msg_dict = json.loads(event.message) event.message = json.dumps(msg_dict, indent=2) - except Exception: - pass + except JSONDecodeError as err: + LOG.debug("Can't decode string (%s) as JSON. 
Error (%s)", event.message, err) return event @@ -80,8 +86,8 @@ class CWPrettyPrintFormatter(ObservabilityEventMapper[CWLogEvent]): def __init__(self, colored: Colored): self._colored = colored - # pylint: disable=R0201 - def map(self, event: CWLogEvent) -> str: + def map(self, event: CWLogEvent) -> CWLogEvent: timestamp = self._colored.yellow(timestamp_to_iso(int(event.timestamp))) log_stream_name = self._colored.cyan(event.log_stream_name) - return f"{log_stream_name} {timestamp} {event.message}" + event.message = f"{log_stream_name} {timestamp} {event.message}" + return event diff --git a/samcli/lib/observability/observability_info_puller.py b/samcli/lib/observability/observability_info_puller.py index afd6efdd7f..a9ecfd0897 100644 --- a/samcli/lib/observability/observability_info_puller.py +++ b/samcli/lib/observability/observability_info_puller.py @@ -96,14 +96,14 @@ def map(self, event: ObservabilityEventType) -> Any: """ -class ObservabilityEventConsumer(ABC): +class ObservabilityEventConsumer(Generic[ObservabilityEventType]): """ Consumer interface, which will consume any event. An example is to output event into console. 
""" @abstractmethod - def consume(self, event: ObservabilityEvent): + def consume(self, event: ObservabilityEventType): """ Parameters ---------- diff --git a/tests/unit/commands/logs/test_console_consumers.py b/tests/unit/commands/logs/test_console_consumers.py index 9a09d2e4ec..ab824ca769 100644 --- a/tests/unit/commands/logs/test_console_consumers.py +++ b/tests/unit/commands/logs/test_console_consumers.py @@ -1,5 +1,5 @@ from unittest import TestCase -from unittest.mock import patch, Mock, call +from unittest.mock import patch, Mock from samcli.commands.logs.console_consumers import CWConsoleEventConsumer @@ -12,4 +12,4 @@ def setUp(self): def test_consume_with_event(self, patched_click): event = Mock() self.consumer.consume(event) - patched_click.echo.assert_called_with(event, nl=False) + patched_click.echo.assert_called_with(event.message, nl=False) diff --git a/tests/unit/lib/observability/cw_logs/test_cw_log_formatters.py b/tests/unit/lib/observability/cw_logs/test_cw_log_formatters.py index 4a720f8075..f864ff1fe7 100644 --- a/tests/unit/lib/observability/cw_logs/test_cw_log_formatters.py +++ b/tests/unit/lib/observability/cw_logs/test_cw_log_formatters.py @@ -33,7 +33,7 @@ def test_must_serialize_event(self): expected = " ".join([colored_stream_name, colored_timestamp, self.message]) result = self.pretty_print_formatter.map(event) - self.assertEqual(expected, result) + self.assertEqual(expected, result.message) self.colored.yellow.has_calls() self.colored.cyan.assert_called_with(self.stream_name) From e7d8d20cbc4304d5205a199e636c662815641ada Mon Sep 17 00:00:00 2001 From: Mehmet Nuri Deveci <5735811+mndeveci@users.noreply.github.com> Date: Thu, 13 May 2021 12:35:39 -0700 Subject: [PATCH 4/5] adding pylint disable for console consumer --- samcli/commands/logs/console_consumers.py | 1 + 1 file changed, 1 insertion(+) diff --git a/samcli/commands/logs/console_consumers.py b/samcli/commands/logs/console_consumers.py index 42c76b762f..2f77e34ab0 100644 --- 
a/samcli/commands/logs/console_consumers.py +++ b/samcli/commands/logs/console_consumers.py @@ -13,5 +13,6 @@ class CWConsoleEventConsumer(ObservabilityEventConsumer[CWLogEvent]): Consumer implementation that will consume given event as outputting into console """ + # pylint: disable=R0201 def consume(self, event: CWLogEvent): click.echo(event.message, nl=False) From d3dd744c9d8c0b8da5d7a8c4da512e12793d0159 Mon Sep 17 00:00:00 2001 From: Mehmet Nuri Deveci <5735811+mndeveci@users.noreply.github.com> Date: Thu, 13 May 2021 14:24:28 -0700 Subject: [PATCH 5/5] make pylint happy with python 3.6 --- samcli/lib/observability/cw_logs/cw_log_formatters.py | 1 + samcli/lib/observability/observability_info_puller.py | 1 + 2 files changed, 2 insertions(+) diff --git a/samcli/lib/observability/cw_logs/cw_log_formatters.py b/samcli/lib/observability/cw_logs/cw_log_formatters.py index a9e56a6101..f0d35a18a6 100644 --- a/samcli/lib/observability/cw_logs/cw_log_formatters.py +++ b/samcli/lib/observability/cw_logs/cw_log_formatters.py @@ -43,6 +43,7 @@ class CWColorizeErrorsFormatter(ObservabilityEventMapper[CWLogEvent]): Mapper implementation which will colorize some pre-defined error messages """ + # couple of pre-defined error messages for lambda functions which will be colorized when getting the logs NODEJS_CRASH_MESSAGE = "Process exited before completing request" TIMEOUT_MSG = "Task timed out" diff --git a/samcli/lib/observability/observability_info_puller.py b/samcli/lib/observability/observability_info_puller.py index a9ecfd0897..b6d6f2b906 100644 --- a/samcli/lib/observability/observability_info_puller.py +++ b/samcli/lib/observability/observability_info_puller.py @@ -128,6 +128,7 @@ def __init__(self, mappers: List[ObservabilityEventMapper], consumer: Observabil consumer : ObservabilityEventConsumer Actual consumer which will handle the events after they are processed by mappers """ + super().__init__() self._mappers = mappers self._consumer = consumer