From 53698aad3e762f737480a22ed631501fc4edd0f2 Mon Sep 17 00:00:00 2001 From: Adrien Bennatan Date: Wed, 27 Apr 2022 17:45:43 +0200 Subject: [PATCH] feat: add retries and retries tests --- .../instrumentation/remoulade/__init__.py | 17 +++++-- .../tests/test_messages.py | 48 ++++++++++++++++--- 2 files changed, 54 insertions(+), 11 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-remoulade/src/opentelemetry/instrumentation/remoulade/__init__.py b/instrumentation/opentelemetry-instrumentation-remoulade/src/opentelemetry/instrumentation/remoulade/__init__.py index e93a96057c..a28e7f8254 100644 --- a/instrumentation/opentelemetry-instrumentation-remoulade/src/opentelemetry/instrumentation/remoulade/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-remoulade/src/opentelemetry/instrumentation/remoulade/__init__.py @@ -26,10 +26,15 @@ def __init__(self, _tracer): def before_process_message(self, _broker, message): trace_ctx = extract(message.options["trace_ctx"]) - operation_name = "remoulade/process" + retry_count = message.options.get("retries") + + operation_name = "remoulade/process" if retry_count is None else f"remoulade/process(retry-{retry_count})" span = self._tracer.start_span(operation_name, kind=trace.SpanKind.CONSUMER, context=trace_ctx) + if retry_count is not None: + span.set_attribute("retry_count", retry_count) + activation = trace.use_span(span, end_on_exit=True) activation.__enter__() @@ -44,8 +49,6 @@ def after_process_message(self, _broker, message, *, result=None, exception=None if span.is_recording(): span.set_attribute(_MESSAGE_TAG_KEY, _MESSAGE_RUN) - # utils.set_attributes_from_context(span, kwargs) - # utils.set_attributes_from_context(span, task.request) span.set_attribute(_MESSAGE_NAME_KEY, message.actor_name) pass @@ -53,15 +56,19 @@ def after_process_message(self, _broker, message, *, result=None, exception=None utils.detach_span(self._span_registry, message.message_id) def before_enqueue(self, _broker, message, delay): - operation_name = "remoulade/send" + retry_count = message.options.get("retries") + + operation_name = "remoulade/send" if retry_count is None else f"remoulade/send(retry-{retry_count})" span = self._tracer.start_span(operation_name, kind=trace.SpanKind.PRODUCER) + if retry_count is not None: + span.set_attribute("retry_count", retry_count) + if span.is_recording(): span.set_attribute(_MESSAGE_TAG_KEY, _MESSAGE_SEND) span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, message.message_id) span.set_attribute(_MESSAGE_NAME_KEY, message.actor_name) - # utils.set_attributes_from_context(span, kwargs) pass activation = trace.use_span(span, end_on_exit=True) diff --git a/instrumentation/opentelemetry-instrumentation-remoulade/tests/test_messages.py b/instrumentation/opentelemetry-instrumentation-remoulade/tests/test_messages.py index 05c739d9b5..21d8a36f5b 100644 --- a/instrumentation/opentelemetry-instrumentation-remoulade/tests/test_messages.py +++ b/instrumentation/opentelemetry-instrumentation-remoulade/tests/test_messages.py @@ -6,9 +6,9 @@ from opentelemetry.semconv.trace import SpanAttributes -@remoulade.actor -def actor_multiply(x, y): - return x * y +@remoulade.actor(max_retries=3) +def actor_div(x, y): + return x / y class TestRemouladeInstrumentation(TestBase): @@ -20,10 +20,10 @@ def setUp(self): remoulade.set_broker(broker) RemouladeInstrumentor().instrument() - broker.declare_actor(actor_multiply) + broker.declare_actor(actor_div) def test_message(self): - actor_multiply.send(1, 2) + actor_div.send(2, 3) spans = self.sorted_spans(self.memory_exporter.get_finished_spans()) self.assertEqual(len(spans), 2) @@ -52,4 +52,40 @@ def test_message(self): self.assertNotEqual(consumer.parent, producer.context) self.assertEqual(consumer.parent.span_id, producer.context.span_id) - self.assertEqual(consumer.context.trace_id, producer.context.trace_id) \ No newline at end of file + self.assertEqual(consumer.context.trace_id, producer.context.trace_id) + + def test_retries(self): + try: + actor_div.send(1, 0) + except ZeroDivisionError: + pass + + spans = self.sorted_spans(self.memory_exporter.get_finished_spans()) + self.assertEqual(len(spans), 8) + + consumer_spans = spans[::2] + producer_spans = spans[1::2] + + self.assertEqual(consumer_spans[0].name, "remoulade/process(retry-3)") + self.assertSpanHasAttributes( + consumer_spans[0], + { "retry_count": 3 } + ) + self.assertEqual(consumer_spans[1].name, "remoulade/process(retry-2)") + self.assertSpanHasAttributes( + consumer_spans[1], + {"retry_count": 2} + ) + self.assertEqual(consumer_spans[3].name, "remoulade/process") + + self.assertEqual(producer_spans[0].name, "remoulade/send(retry-3)") + self.assertSpanHasAttributes( + producer_spans[0], + {"retry_count": 3} + ) + self.assertEqual(producer_spans[1].name, "remoulade/send(retry-2)") + self.assertSpanHasAttributes( + producer_spans[1], + {"retry_count": 2} + ) + self.assertEqual(producer_spans[3].name, "remoulade/send")