Skip to content

Commit

Permalink
[WIP] Fixing instrumentation for workflows (#16290)
Browse files Browse the repository at this point in the history
  • Loading branch information
logan-markewich authored Oct 3, 2024
1 parent c7b67e7 commit b45586b
Showing 1 changed file with 54 additions and 8 deletions.
62 changes: 54 additions & 8 deletions llama-index-core/llama_index/core/instrumentation/dispatcher.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio
from functools import partial
from contextlib import contextmanager
from contextvars import ContextVar, Token
from contextvars import Context, ContextVar, Token, copy_context
from typing import Any, Callable, Generator, List, Optional, Dict, Protocol
import inspect
import uuid
Expand Down Expand Up @@ -251,6 +253,10 @@ def wrapper(func: Callable, instance: Any, args: list, kwargs: dict) -> Any:
bound_args = inspect.signature(func).bind(*args, **kwargs)
id_ = f"{func.__qualname__}-{uuid.uuid4()}"
tags = active_instrument_tags.get()
result = None

# Copy the current context
context = copy_context()

token = active_span_id.set(id_)
parent_id = None if token.old_value is Token.MISSING else token.old_value
Expand All @@ -261,20 +267,60 @@ def wrapper(func: Callable, instance: Any, args: list, kwargs: dict) -> Any:
parent_id=parent_id,
tags=tags,
)

def handle_future_result(
future: asyncio.Future,
span_id: str,
bound_args: inspect.BoundArguments,
instance: Any,
context: Context,
) -> None:
try:
result = future.result()
self.span_exit(
id_=span_id,
bound_args=bound_args,
instance=instance,
result=result,
)
return result
except BaseException as e:
self.event(SpanDropEvent(span_id=span_id, err_str=str(e)))
self.span_drop(
id_=span_id, bound_args=bound_args, instance=instance, err=e
)
raise
finally:
context.run(active_span_id.reset, token)

try:
result = func(*args, **kwargs)
if isinstance(result, asyncio.Future):
# If the result is a Future, wrap it
new_future = asyncio.ensure_future(result)
new_future.add_done_callback(
partial(
handle_future_result,
span_id=id_,
bound_args=bound_args,
instance=instance,
context=context,
)
)
return new_future
else:
# For non-Future results, proceed as before
self.span_exit(
id_=id_, bound_args=bound_args, instance=instance, result=result
)
return result
except BaseException as e:
self.event(SpanDropEvent(span_id=id_, err_str=str(e)))
self.span_drop(id_=id_, bound_args=bound_args, instance=instance, err=e)
raise
else:
self.span_exit(
id_=id_, bound_args=bound_args, instance=instance, result=result
)
return result
finally:
# clean up
active_span_id.reset(token)
if not isinstance(result, asyncio.Future):
active_span_id.reset(token)

@wrapt.decorator
async def async_wrapper(
Expand Down

0 comments on commit b45586b

Please sign in to comment.