diff --git a/docs/conf.py b/docs/conf.py index 1bf2eea..a0805d9 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -14,9 +14,7 @@ import inspect import shutil -__location__ = os.path.join( - os.getcwd(), os.path.dirname(inspect.getfile(inspect.currentframe())) -) +__location__ = os.path.join(os.getcwd(), os.path.dirname(inspect.getfile(inspect.currentframe()))) # If extensions (or modules to document with autodoc) are in another directory, # add these directories to sys.path here. If the directory is relative to the diff --git a/pylintrc b/pylintrc new file mode 100644 index 0000000..cb8c3cd --- /dev/null +++ b/pylintrc @@ -0,0 +1,4 @@ +[messages control] +disable= + fixme, + duplicate-code diff --git a/requirements.txt b/requirements.txt index 83a46aa..003a0eb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,3 +5,5 @@ eiffellib[rabbitmq]~=2.4 requests~=2.31 kubernetes~=26.1 pyyaml~=6.0 +opentelemetry-api~=1.21 +opentelemetry-semantic-conventions~=0.42b0 diff --git a/setup.cfg b/setup.cfg index 53b3fce..e6f86a6 100644 --- a/setup.cfg +++ b/setup.cfg @@ -32,6 +32,8 @@ install_requires = requests~=2.31 kubernetes~=26.1 pyyaml~=6.0 + opentelemetry-api~=1.21 + opentelemetry-semantic-conventions~=0.42b0 # Require a specific Python version, e.g. Python 2.7 or >= 3.4 python_requires = >=3.4 diff --git a/src/etos_lib/eiffel/__init__.py b/src/etos_lib/eiffel/__init__.py new file mode 100644 index 0000000..c9bd6d9 --- /dev/null +++ b/src/etos_lib/eiffel/__init__.py @@ -0,0 +1,16 @@ +# Copyright Axis Communications AB. +# +# For a full list of individual contributors, please see the commit history. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""ETOS library eiffel helpers.""" diff --git a/src/etos_lib/eiffel/common.py b/src/etos_lib/eiffel/common.py new file mode 100644 index 0000000..00bae6d --- /dev/null +++ b/src/etos_lib/eiffel/common.py @@ -0,0 +1,117 @@ +# Copyright Axis Communications AB. +# +# For a full list of individual contributors, please see the commit history. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Common functions for the eiffel helpers.""" +from collections.abc import MutableMapping, Sequence +from typing import Iterable, Optional + +from eiffellib.events.eiffel_base_event import EiffelBaseEvent +from opentelemetry.semconv.trace import MessagingOperationValues, SpanAttributes +from opentelemetry.trace.span import Span +from pika.channel import Channel +from pika.spec import BasicProperties + +from ..lib.config import Config + +PUBLISHER_TEMPLATE = "{SERVER_ADDRESS}:{SERVER_PORT},{RABBITMQ_VHOST},{RABBITMQ_EXCHANGE}" +CONSUMER_TEMPLATE = "{RABBITMQ_QUEUE_NAME}" +# pylint:disable=too-many-arguments + + +def add_span_attributes( + span: Span, + channel: Channel, + properties: BasicProperties, + routing_key: str, + operation: MessagingOperationValues, + destination_name: Optional[str] = None, +) -> None: + """Add rabbitmq properties to a span. + + Copied and modified from: + https://github.com/open-telemetry/opentelemetry-python-contrib/blob/main/instrumentation/opentelemetry-instrumentation-pika + """ + ssl = bool(Config().get("rabbitmq").get("ssl")) + + span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "rabbitmq") + span.set_attribute(SpanAttributes.MESSAGING_OPERATION, operation.value) + span.set_attribute(SpanAttributes.MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY, routing_key) + + if destination_name is not None: + span.set_attribute(SpanAttributes.MESSAGING_DESTINATION_NAME, destination_name) + span.set_attribute("messaging.destination_publish.name", properties.type) + span.set_attribute(SpanAttributes.MESSAGING_DESTINATION_TEMPLATE, CONSUMER_TEMPLATE) + else: + span.set_attribute(SpanAttributes.MESSAGING_DESTINATION_NAME, properties.type) + span.set_attribute(SpanAttributes.MESSAGING_DESTINATION_TEMPLATE, PUBLISHER_TEMPLATE) + + span.set_attribute(SpanAttributes.NETWORK_PROTOCOL_NAME, "amqps" if ssl else "amqp") + span.set_attribute(SpanAttributes.NETWORK_TYPE, "ipv4") + span.set_attribute(SpanAttributes.NETWORK_TRANSPORT, "tcp") + + span.set_attribute(SpanAttributes.SERVER_ADDRESS, channel.connection.params.host) + span.set_attribute(SpanAttributes.SERVER_PORT, channel.connection.params.port) + + +def add_span_eiffel_attributes(span: Span, event: EiffelBaseEvent) -> None: + """Add Eiffel properties to a span.""" + span.set_attribute(SpanAttributes.EVENT_NAME, event.meta.type) + span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, event.meta.event_id) + + +def _flatten(d: dict, parent_key: str = "", sep: str = ".") -> Iterable[str, str]: + """Flatten a dictionary to be compatible with opentelemetry.""" + for k, v in d.items(): + new_key = parent_key + sep + k if parent_key else k + if isinstance(v, MutableMapping): + yield from _flatten_dict(v, new_key, sep=sep).items() + elif isinstance(v, list): + for i, lv in enumerate(v): + if isinstance(lv, str): + yield new_key, v + break + if isinstance(lv, MutableMapping): + new_key = new_key + sep + str(i) + yield from _flatten_dict(lv, new_key, sep=sep).items() + else: + yield new_key, v + + +def _flatten_dict(d: MutableMapping, parent_key: str = "", sep: str = ".") -> dict: + """Call flatten on a dictionary.""" + return dict(_flatten(d, parent_key, sep)) + + +def _links_to_dict(links: Sequence) -> MutableMapping: + """Convert an Eiffel links structure to a dictionary.""" + dict_links = {} + for link in links: + key = link["type"].lower() + if key in dict_links: + if not isinstance(dict_links[key], list): + dict_links[key] = [dict_links[key]] + dict_links[key].append(link["target"]) + else: + dict_links[key] = link["target"] + return dict_links + + +def add_event(event: EiffelBaseEvent) -> dict: + """Add event data to a dictionary.""" + attributes = {} + event_json = event.json + event_json["links"] = _links_to_dict(event_json.pop("links")) + attributes.update(**_flatten_dict(event_json, parent_key="eiffel")) + return attributes diff --git a/src/etos_lib/eiffel/publisher.py b/src/etos_lib/eiffel/publisher.py new file mode 100644 index 0000000..13f826f --- /dev/null +++ b/src/etos_lib/eiffel/publisher.py @@ -0,0 +1,128 @@ +# Copyright Axis Communications AB. +# +# For a full list of individual contributors, please see the commit history. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Custom publishers for eiffellib.""" +import logging +import time +from copy import deepcopy +from threading import current_thread + +from eiffellib.events.eiffel_base_event import EiffelBaseEvent +from eiffellib.publishers.rabbitmq_publisher import RabbitMQPublisher +from opentelemetry import propagate, trace +from opentelemetry.semconv.trace import MessagingOperationValues +from opentelemetry.trace import SpanKind +from pika.spec import BasicProperties + +from .common import add_event, add_span_attributes, add_span_eiffel_attributes + +_LOG = logging.getLogger(__name__) + + +class TracingRabbitMQPublisher(RabbitMQPublisher): + """Custom RabbitMQ publisher that propagates otel trace information to headers.""" + + def __init__(self, *args, **kwargs): + """Get a tracer.""" + # Must import this here, otherwise there would be a cyclic import problem. + # pylint:disable=cyclic-import,import-outside-toplevel + from etos_lib import __version__ + + super().__init__(*args, **kwargs) + self.tracer = trace.get_tracer( + __name__, + __version__, + schema_url="https://opentelemetry.io/schemas/1.11.0", + ) + self.destination = f"{self.parameters.host},{self.parameters.virtual_host},{self.exchange}" + + def send_event(self, event: EiffelBaseEvent, block: bool = True) -> None: + """Validate and send an eiffel event to the rabbitmq server. + + This method will set the source on all events if there is a source + added to the :obj:`RabbitMQPublisher`. + If the routing key is set to None in the :obj:`RabbitMQPublisher` this + method will use the routing key from the event that is being sent. + The event domainId will also be added to `meta.source` if it is set to + anything other than the default value. If there is no domainId + set on the event, then the domainId from the source in the + :obj:`RabbitMQPublisher` will be used in the routing key, with a default + value taken from the :obj:`eiffellib.events.eiffel_base_event.EiffelBaseEvent`. + + :param event: Event to send. + :type event: :obj:`eiffellib.events.eiffel_base_event.EiffelBaseEvent` + :param block: Set to True in order to block for channel to become ready. + Default: True + :type block: bool + """ + if block: + self.wait_start() + while self._channel is None or not self._channel.is_open: + time.sleep(0.1) + + properties = BasicProperties( + content_type="application/json", delivery_mode=2, headers={}, type=self.destination + ) + + source = deepcopy(self.source) + if self.routing_key is None and event.domain_id != EiffelBaseEvent.domain_id: + source = source or {} + source["domainId"] = event.domain_id + elif self.routing_key is None and source is not None: + # EiffelBaseEvent.domain_id will be the default value. + # By using that value instead of setting the default in this + # method there will only be one place to set the default (the events). + event.domain_id = source.get("domainId", EiffelBaseEvent.domain_id) + if source is not None: + event.meta.add("source", source) + event.validate() + routing_key = self.routing_key or event.routing_key + + task_name = f"{self.exchange if self.exchange else routing_key} send" + span = self.tracer.start_span( + name=task_name, + kind=SpanKind.PRODUCER, + ) + if span.is_recording(): + add_span_attributes( + span, + self._channel, + properties, + routing_key, + MessagingOperationValues.PUBLISH, + ) + add_span_eiffel_attributes(span, event) + + _LOG.debug("[%s] Attempting to acquire 'send_event' lock", current_thread().name) + with self._lock, trace.use_span(span, end_on_exit=True) as _span: + _LOG.debug("[%s] 'send_event' Lock acquired", current_thread().name) + propagate.inject(properties.headers) + if properties.headers == {}: # Tracing is not enabled? + properties.headers = None + try: + self._channel.basic_publish( + self.exchange, + routing_key, + event.serialized, + properties, + ) + _span.add_event("Published event", attributes=add_event(event)) + except Exception as exception: # pylint:disable=broad-except + self._nacked_deliveries.append(event) + _span.record_exception(exception, escaped=True) + return + self._delivered += 1 + self._deliveries[self._delivered] = event + _LOG.debug("[%s] 'send_event' Lock released", current_thread().name) diff --git a/src/etos_lib/eiffel/subscriber.py b/src/etos_lib/eiffel/subscriber.py new file mode 100644 index 0000000..d23ab67 --- /dev/null +++ b/src/etos_lib/eiffel/subscriber.py @@ -0,0 +1,169 @@ +# Copyright Axis Communications AB. +# +# For a full list of individual contributors, please see the commit history. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Custom subscribers for eiffellib.""" +import functools +import json +import logging +import traceback +from typing import Optional + +import eiffellib.events +from eiffellib.events.eiffel_base_event import EiffelBaseEvent +from eiffellib.subscribers.rabbitmq_subscriber import RabbitMQSubscriber +from opentelemetry import context, propagate, trace +from opentelemetry.propagators.textmap import CarrierT, Getter +from opentelemetry.semconv.trace import MessagingOperationValues +from opentelemetry.trace import SpanKind +from pika.spec import Basic, BasicProperties + +from .common import add_span_attributes, add_span_eiffel_attributes + +_LOG = logging.getLogger(__name__) + + +class _Getter(Getter[CarrierT]): # type: ignore + """Getter for receiving a key from amqp headers.""" + + def get(self, carrier: CarrierT, key: str) -> Optional[list[str]]: + """Get a key from headers.""" + value = carrier.get(key, None) + if value is None: + return None + return [value] + + def keys(self, carrier: CarrierT) -> list[str]: + """Return an empy list of keys.""" + return [] + + +_GETTER = _Getter() + + +class TracingRabbitMQSubscriber(RabbitMQSubscriber): + """Custom RabbitMQ subscriber that gets otel trace information to headers.""" + + def __init__(self, *args, **kwargs): + """Get a trace.""" + # Must import this here, otherwise there would be a cyclic import problem. + # pylint:disable=cyclic-import,import-outside-toplevel + from etos_lib import __version__ + + super().__init__(*args, **kwargs) + self.tracer = trace.get_tracer( + __name__, + __version__, + schema_url="https://opentelemetry.io/schemas/1.11.0", + ) + + def _on_message( + self, _, method: Basic.Deliver, properties: BasicProperties, body: bytes + ) -> None: + """On message callback. Called on each message. Will block if no place in queue. + + For each message attempt to acquire the `threading.Semaphore`. The semaphore + size is `max_threads` + `max_queue`. This is to limit the amount of threads + in the queue, waiting to be processed. + For each message apply them async to a `ThreadPool` with size=`max_threads`. + + :param method: Pika basic deliver object. + :param properties: Pika basic properties object. + :param body: Message body. + """ + self._RabbitMQSubscriber__workers.acquire() # pylint:disable=no-member + delivery_tag = method.delivery_tag + error_callback = functools.partial(self.callback_error, delivery_tag) + result_callback = functools.partial(self.callback_results, delivery_tag) + self._RabbitMQSubscriber__thread_pool.apply_async( # pylint:disable=no-member + self._tracer_call, + args=(body, method, properties), + callback=result_callback, + error_callback=error_callback, + ) + + def _tracer_call( + self, body: bytes, method: Basic.Deliver, properties: BasicProperties + ) -> tuple[bool, bool]: + """Tracing callback for the custom subscriber that extracts a trace from amq headers.""" + if not properties: + properties = BasicProperties(headers={}) + if properties.headers is None: + properties.headers = {} + ctx = propagate.extract(properties.headers, getter=_GETTER) + if not ctx: + ctx = context.get_current() + token = context.attach(ctx) + + task_name = f"{method.exchange if method.exchange else self.routing_key} receive" + span = self.tracer.start_span( + name=task_name, + kind=SpanKind.CONSUMER, + ) + if span.is_recording(): + add_span_attributes( + span, + self._channel, + properties, + self.routing_key, + MessagingOperationValues.RECEIVE, + self.queue, + ) + try: + event = self._event(body) + except: # pylint:disable=bare-except + with trace.use_span(span, end_on_exit=True) as span: + raise + if span.is_recording(): + add_span_eiffel_attributes(span, event) + try: + with trace.use_span(span, end_on_exit=True): + response = self._event_call(event) + finally: + context.detach(token) + return response + + def _event_call(self, event: EiffelBaseEvent) -> tuple[bool, bool]: + """Call followers and subscribers of an event.""" + try: + ack = self._call_subscribers(event.meta.type, event) + self._call_followers(event) + except: # noqa, pylint:disable=bare-except + _LOG.error( + "Caught exception while processing subscriber " + "callbacks, some callbacks may not have been called: %s", + traceback.format_exc(), + ) + ack = False + return ack, True # Requeue only if ack is False. + + def _event(self, body: bytes) -> EiffelBaseEvent: + """Rebuild event.""" + # pylint:disable=broad-exception-raised + try: + json_data = json.loads(body.decode("utf-8")) + except (json.decoder.JSONDecodeError, UnicodeDecodeError) as err: + raise Exception( + f"Unable to deserialize message body ({err}), rejecting: {body!r}" + ) from err + try: + meta_type = json_data.get("meta", {}).get("type") + event = getattr(eiffellib.events, meta_type)(json_data.get("meta", {}).get("version")) + except (AttributeError, TypeError) as err: + raise Exception(f"Malformed message. Rejecting: {json_data!r}") from err + try: + event.rebuild(json_data) + except Exception as err: # pylint:disable=broad-except + raise Exception(f"Unable to deserialize message ({err}): {json_data!r}") from err + return event diff --git a/src/etos_lib/etos.py b/src/etos_lib/etos.py index b50d898..fdc052b 100644 --- a/src/etos_lib/etos.py +++ b/src/etos_lib/etos.py @@ -14,22 +14,22 @@ # See the License for the specific language governing permissions and # limitations under the License. """ETOS Library module.""" -from eiffellib.subscribers import RabbitMQSubscriber -from eiffellib.publishers import RabbitMQPublisher +from .eiffel.publisher import TracingRabbitMQPublisher as RabbitMQPublisher +from .eiffel.subscriber import TracingRabbitMQSubscriber as RabbitMQSubscriber +from .graphql.query_handler import GraphQLQueryHandler from .lib.config import Config -from .lib.events import Events -from .lib.monitor import Monitor -from .lib.utils import Utils -from .lib.http import Http -from .lib.debug import Debug -from .lib.feature_flags import FeatureFlags from .lib.database import Database +from .lib.debug import Debug +from .lib.events import Events from .lib.exceptions import ( PublisherConfigurationMissing, - SubscriberConfigurationMissing, PublisherNotStarted, + SubscriberConfigurationMissing, ) -from .graphql.query_handler import GraphQLQueryHandler +from .lib.feature_flags import FeatureFlags +from .lib.http import Http +from .lib.monitor import Monitor +from .lib.utils import Utils class ETOS: # pylint: disable=too-many-instance-attributes diff --git a/src/etos_lib/kubernetes/base.py b/src/etos_lib/kubernetes/base.py index d2aca61..87b44f3 100644 --- a/src/etos_lib/kubernetes/base.py +++ b/src/etos_lib/kubernetes/base.py @@ -27,9 +27,7 @@ class Kubernetes: __core = None __apps = None - def __init__( - self, namespace=os.getenv("ETOS_NAMESPACE"), context=None, in_cluster=True - ): + def __init__(self, namespace=os.getenv("ETOS_NAMESPACE"), context=None, in_cluster=True): """Initialize kubernetes library and load kubernetes configuration. :param namespace: Which namespace to operate in. diff --git a/src/etos_lib/kubernetes/jobs.py b/src/etos_lib/kubernetes/jobs.py index e2b92d3..633e830 100644 --- a/src/etos_lib/kubernetes/jobs.py +++ b/src/etos_lib/kubernetes/jobs.py @@ -44,9 +44,7 @@ def wait_for_job_started(self, job_name, timeout=300): """ timeout = time.time() + timeout while time.time() < timeout: - response = self.batch_v1.read_namespaced_job_status( - job_name, self.namespace - ) + response = self.batch_v1.read_namespaced_job_status(job_name, self.namespace) status = response.status if status.active is not None: print(f"Started at: {status.start_time}") @@ -68,9 +66,7 @@ def wait_for_job_finished(self, job_name, timeout=3600 * 10): timeout = time.time() + timeout result = False while time.time() < timeout: - response = self.batch_v1.read_namespaced_job_status( - job_name, self.namespace - ) + response = self.batch_v1.read_namespaced_job_status(job_name, self.namespace) status = response.status # pylint:disable=no-else-break if status.failed is not None: diff --git a/src/etos_lib/lib/debug.py b/src/etos_lib/lib/debug.py index b5a6286..2992973 100644 --- a/src/etos_lib/lib/debug.py +++ b/src/etos_lib/lib/debug.py @@ -25,12 +25,8 @@ class Debug: """Debug flags for ETOS.""" - __events_published = deque( - maxlen=int(os.getenv("ETOS_PUBLISHED_EVENT_HISTORY_SIZE", "100")) - ) - __events_received = deque( - maxlen=int(os.getenv("ETOS_RECEIVED_EVENT_HISTORY_SIZE", "100")) - ) + __events_published = deque(maxlen=int(os.getenv("ETOS_PUBLISHED_EVENT_HISTORY_SIZE", "100"))) + __events_received = deque(maxlen=int(os.getenv("ETOS_RECEIVED_EVENT_HISTORY_SIZE", "100"))) @property def default_log_path(self): diff --git a/src/etos_lib/lib/events.py b/src/etos_lib/lib/events.py index 6c63457..1856065 100644 --- a/src/etos_lib/lib/events.py +++ b/src/etos_lib/lib/events.py @@ -162,9 +162,7 @@ def send_environment_defined(self, name, links=None, **optional): and optional.get("host") is None and optional.get("uri") is None ): - raise ValueError( - "At least one of 'host', 'image' or 'uri' must be provided" - ) + raise ValueError("At least one of 'host', 'image' or 'uri' must be provided") links = links if links is not None else {} data = {"name": name} data.update(**optional) @@ -204,9 +202,7 @@ def send_test_suite_finished(self, test_suite, links=None, **optional): data = optional return self.send(EiffelTestSuiteFinishedEvent(), links, data) - def send_announcement_published( - self, heading, body, severity, links=None, **optional - ): + def send_announcement_published(self, heading, body, severity, links=None, **optional): """Publish an announcement event. https://github.com/eiffel-community/eiffel/blob/master/eiffel-vocabulary/EiffelAnnouncementPublishedEvent.md @@ -242,9 +238,7 @@ def send_test_execution_recipe_collection_created( :type optional: dict """ if optional.get("batches") is None and optional.get("batchesUri") is None: - raise ValueError( - "At least one of 'batches' or 'batchesUri' must be provided" - ) + raise ValueError("At least one of 'batches' or 'batchesUri' must be provided") links = links if links is not None else {} data = {"selectionStrategy": selection_strategy} data.update(**optional) @@ -343,9 +337,7 @@ def send_artifact_created_event(self, identity, links=None, **optional): data.update(**optional) return self.send(EiffelArtifactCreatedEvent(), links, data) - def send_artifact_published_event( - self, locations, artifact, links=None, **optional - ): + def send_artifact_published_event(self, locations, artifact, links=None, **optional): """Publish an artifact created event. https://github.com/eiffel-community/eiffel/blob/master/eiffel-vocabulary/EiffelArtifactPublishedEvent.md diff --git a/src/etos_lib/lib/utils.py b/src/etos_lib/lib/utils.py index b3b2e12..b73fb1b 100644 --- a/src/etos_lib/lib/utils.py +++ b/src/etos_lib/lib/utils.py @@ -234,9 +234,7 @@ def call( :rtype: tuple """ out = [] - for _, line in self.iterable_call( - cmd, shell, env, executable, output, wait_output - ): + for _, line in self.iterable_call(cmd, shell, env, executable, output, wait_output): if isinstance(line, str): out.append(line) else: diff --git a/src/etos_lib/logging/formatter.py b/src/etos_lib/logging/formatter.py index b776231..b5efcd9 100644 --- a/src/etos_lib/logging/formatter.py +++ b/src/etos_lib/logging/formatter.py @@ -98,6 +98,4 @@ def formatTime(self, record, datefmt=None): # Make Logstash's @timestamp parser happy by including a "T" # between the date and the time. Append 'Z' to make it clear # that the timestamp is UTC. - return datetime.datetime.utcfromtimestamp(record.created).strftime( - "%Y-%m-%dT%H:%M:%S.%fZ" - ) + return datetime.datetime.utcfromtimestamp(record.created).strftime("%Y-%m-%dT%H:%M:%S.%fZ") diff --git a/src/etos_lib/logging/log_publisher.py b/src/etos_lib/logging/log_publisher.py index d5edea0..b7089d3 100644 --- a/src/etos_lib/logging/log_publisher.py +++ b/src/etos_lib/logging/log_publisher.py @@ -33,9 +33,7 @@ def send_event(self, event, block=True, routing_key="#"): self.wait_start() while self._channel is None or not self._channel.is_open: time.sleep(0.1) - properties = pika.BasicProperties( - content_type="application/json", delivery_mode=2 - ) + properties = pika.BasicProperties(content_type="application/json", delivery_mode=2) if not isinstance(event, str): event = json.dumps(event) diff --git a/src/etos_lib/logging/logger.py b/src/etos_lib/logging/logger.py index dfb98bc..ff81988 100644 --- a/src/etos_lib/logging/logger.py +++ b/src/etos_lib/logging/logger.py @@ -136,9 +136,7 @@ def setup_rabbitmq_logging(log_filter): logging.getLogger("eiffellib.publishers.rabbitmq_publisher").propagate = False logging.getLogger("base_rabbitmq").propagate = False - rabbitmq = RabbitMQLogPublisher( - **Config().etos_rabbitmq_publisher_data(), routing_key=None - ) + rabbitmq = RabbitMQLogPublisher(**Config().etos_rabbitmq_publisher_data(), routing_key=None) if Debug().enable_sending_logs: rabbitmq.start() atexit.register(close_rabbit, rabbitmq) diff --git a/tox.ini b/tox.ini index c4e786a..57abff3 100644 --- a/tox.ini +++ b/tox.ini @@ -11,7 +11,7 @@ envlist = py3,black,pylint,pydocstyle deps = black commands = - black --check --diff . + black --check --diff -l 100 . [testenv:pylint] deps =