diff --git a/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py b/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py index 3cebdc59ca..9495f38896 100644 --- a/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py @@ -195,6 +195,7 @@ def _build_span_meta_data_for_pipeline(instance, sanitize_query): return command_stack, resource, span_name +# pylint: disable=R0915 def _instrument( tracer, request_hook: _RequestHookT = None, diff --git a/instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py b/instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py index a7e3ca7885..35cf3ac215 100644 --- a/instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py +++ b/instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py @@ -33,9 +33,9 @@ def __init__(self): self.mock = mock.Mock() async def __call__(self, *args, **kwargs): - f = asyncio.Future() - f.set_result("random") - return f + future = asyncio.Future() + future.set_result("random") + return future def __getattr__(self, item): return AsyncMock() diff --git a/tests/opentelemetry-docker-tests/tests/redis/test_redis_functional.py b/tests/opentelemetry-docker-tests/tests/redis/test_redis_functional.py index 675a37fa9f..bbd7b17e2c 100644 --- a/tests/opentelemetry-docker-tests/tests/redis/test_redis_functional.py +++ b/tests/opentelemetry-docker-tests/tests/redis/test_redis_functional.py @@ -13,6 +13,7 @@ # limitations under the License. import asyncio +from time import time_ns import redis import redis.asyncio @@ -326,6 +327,29 @@ def test_basics(self): ) self.assertEqual(span.attributes.get("db.redis.args_length"), 2) + def test_execute_command_traced_full_time(self): + """Command should be traced for coroutine execution time, not creation time.""" + coro_created_time = None + finish_time = None + + async def pipeline_simple(): + nonlocal coro_created_time + nonlocal finish_time + + # delay coroutine creation from coroutine execution + coro = self.redis_client.get("foo") + coro_created_time = time_ns() + await coro + finish_time = time_ns() + + async_call(pipeline_simple()) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertTrue(span.start_time > coro_created_time) + self.assertTrue(span.end_time < finish_time) + def test_pipeline_traced(self): async def pipeline_simple(): async with self.redis_client.pipeline( @@ -348,6 +372,35 @@ async def pipeline_simple(): ) self.assertEqual(span.attributes.get("db.redis.pipeline_length"), 3) + def test_pipeline_traced_full_time(self): + """Command should be traced for coroutine execution time, not creation time.""" + coro_created_time = None + finish_time = None + + async def pipeline_simple(): + async with self.redis_client.pipeline( + transaction=False + ) as pipeline: + nonlocal coro_created_time + nonlocal finish_time + pipeline.set("blah", 32) + pipeline.rpush("foo", "éé") + pipeline.hgetall("xxx") + + # delay coroutine creation from coroutine execution + coro = pipeline.execute() + coro_created_time = time_ns() + await coro + finish_time = time_ns() + + async_call(pipeline_simple()) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertTrue(span.start_time > coro_created_time) + self.assertTrue(span.end_time < finish_time) + def test_pipeline_immediate(self): async def pipeline_immediate(): async with self.redis_client.pipeline() as pipeline: @@ -367,6 +420,33 @@ async def pipeline_immediate(): span.attributes.get(SpanAttributes.DB_STATEMENT), "SET b 2" ) + def test_pipeline_immediate_traced_full_time(self): + """Command should be traced for coroutine execution time, not creation time.""" + coro_created_time = None + finish_time = None + + async def pipeline_simple(): + async with self.redis_client.pipeline( + transaction=False + ) as pipeline: + nonlocal coro_created_time + nonlocal finish_time + pipeline.set("a", 1) + + # delay coroutine creation from coroutine execution + coro = pipeline.immediate_execute_command("SET", "b", 2) + coro_created_time = time_ns() + await coro + finish_time = time_ns() + + async_call(pipeline_simple()) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertTrue(span.start_time > coro_created_time) + self.assertTrue(span.end_time < finish_time) + def test_parent(self): """Ensure OpenTelemetry works with redis.""" ot_tracer = trace.get_tracer("redis_svc") @@ -416,6 +496,29 @@ def test_basics(self): ) self.assertEqual(span.attributes.get("db.redis.args_length"), 2) + def test_execute_command_traced_full_time(self): + """Command should be traced for coroutine execution time, not creation time.""" + coro_created_time = None + finish_time = None + + async def pipeline_simple(): + nonlocal coro_created_time + nonlocal finish_time + + # delay coroutine creation from coroutine execution + coro = self.redis_client.get("foo") + coro_created_time = time_ns() + await coro + finish_time = time_ns() + + async_call(pipeline_simple()) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertTrue(span.start_time > coro_created_time) + self.assertTrue(span.end_time < finish_time) + def test_pipeline_traced(self): async def pipeline_simple(): async with self.redis_client.pipeline( @@ -438,6 +541,35 @@ async def pipeline_simple(): ) self.assertEqual(span.attributes.get("db.redis.pipeline_length"), 3) + def test_pipeline_traced_full_time(self): + """Command should be traced for coroutine execution time, not creation time.""" + coro_created_time = None + finish_time = None + + async def pipeline_simple(): + async with self.redis_client.pipeline( + transaction=False + ) as pipeline: + nonlocal coro_created_time + nonlocal finish_time + pipeline.set("blah", 32) + pipeline.rpush("foo", "éé") + pipeline.hgetall("xxx") + + # delay coroutine creation from coroutine execution + coro = pipeline.execute() + coro_created_time = time_ns() + await coro + finish_time = time_ns() + + async_call(pipeline_simple()) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertTrue(span.start_time > coro_created_time) + self.assertTrue(span.end_time < finish_time) + def test_parent(self): """Ensure OpenTelemetry works with redis.""" ot_tracer = trace.get_tracer("redis_svc")