Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error using Processor as context manager in async method: "popped unexpected object" #368

Closed
aiguofer opened this issue Jan 9, 2024 · 6 comments

Comments

@aiguofer
Copy link

aiguofer commented Jan 9, 2024

We have a FastAPI/Strawberry app that we ensure proper tracing using a custom Strawberry schema (there's a set of values we inject into every log using a Processor). We recently did some work to do some async work in the permissions handler, and now we're running into some errors:

  File "/usr/local/lib/python3.9/site-packages/strawberry/http/async_base_view.py", line 115, in execute_operation
    return await self.schema.execute(
  File "/usr/src/mfs/telemetry/schema.py", line 31, in execute
    return await super().execute(*args, **kwargs)
  File "/usr/local/lib/python3.9/contextlib.py", line 126, in __exit__
    next(self.gen)
  File "/usr/src/mfs/telemetry/context.py", line 49, in with_telemetry
    yield
  File "/usr/local/lib/python3.9/site-packages/logbook/_fallback.py", line 106, in __exit__
    self.pop_thread()
  File "/usr/local/lib/python3.9/site-packages/logbook/base.py", line 231, in pop_thread
    assert popped is self, 'popped unexpected object'
AssertionError: popped unexpected object

Our telemetry/schema.py file looks like:

import traceback
from typing import Any, Callable, List, Optional

import strawberry
from graphql import GraphQLError, GraphQLResolveInfo
from strawberry.extensions.tracing import DatadogTracingExtensionSync
from strawberry.types import ExecutionResult

from mfs.logger import GLOBAL_LOGGER as logger


class TrackingSchema(strawberry.Schema):
    """Extend a strawberry schema to ensure telemetry is properly configured."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:  # noqa: D
        kwargs["extensions"] = [DatadogTracingExtension, *kwargs.get("extensions", [])]
        super().__init__(*args, **kwargs)

    def execute_sync(self, *args: Any, **kwargs: Any) -> ExecutionResult:  # noqa: D
        context = kwargs.get("context_value")
        if context:
            with context.telemetry.with_telemetry():
                return super().execute_sync(*args, **kwargs)
        else:
            return super().execute_sync(*args, **kwargs)

    async def execute(self, *args: Any, **kwargs: Any) -> ExecutionResult:  # noqa: D
        context = kwargs.get("context_value")
        if context:
            with context.telemetry.with_telemetry():
                return await super().execute(*args, **kwargs)
        else:
            return await super().execute(*args, **kwargs)

    def process_errors(
        self,
        errors: List[GraphQLError],
        execution_context: Optional[strawberry.types.ExecutionContext] = None,
    ) -> None:
        """Add errors to the root span customize error log with our app logger.

        It seems this function is called outside of the span context, so we must
        pass it in explicitly to ensure it gets associated with the trace.

        See: https://strawberry.rocks/docs/types/schema#handling-execution-errors
        """
        if execution_context:
            telemetry = execution_context.context.telemetry
            context = telemetry.root_span.context if telemetry.root_span is not None else None
            with telemetry.with_telemetry(context):
                for error in errors:
                    e = error.original_error
                    stacktrace = traceback.format_exception(type(e), e, e.__traceback__) if e else None
                    telemetry.set("graphql.error", error)

                    logger.warn(
                        "Graphql error",
                        extra={
                            "original": error.original_error,
                            "stacktrace": stacktrace,
                        },
                    )


class DatadogTracingExtension(DatadogTracingExtensionSync):
    """Extend DataDogTracingExtension to add the name of the query or mutation to the root span.

    See: https://strawberry.rocks/docs/extensions/datadog#extending-the-extension
    """

    def resolve(  # noqa: D
        self, _next: Callable, root: Any, info: GraphQLResolveInfo, *args: str, **kwargs: Any
    ) -> Any:
        telemetry = info.context.telemetry
        # TODO: is this logic correct? what if there's multiple queries or mutations in the same request?
        if info.parent_type.name in ["Mutation", "Query"]:
            field_path = f"{info.parent_type}.{info.field_name}"
            telemetry.set("graphql.field_name", info.field_name)
            telemetry.set("graphql.parent_type", info.parent_type.name)
            telemetry.set("graphql.field_path", field_path)
            telemetry.set("graphql.path", ".".join(map(str, info.path.as_list())))
        return super().resolve(_next, root, info, *args, **kwargs)

And our context class looks like:

class TelemetryContext:
    """This class holds all telemetry context for a request and injects it to span tags and logs.

    When a key value pair are added to the context, they get added to both log extra and
    as tags on the root level span.

    The purpose of this class is to ensure we have as much info as necessary associated
    with each request to enable easier debugging and datadog reporting.
    """

    def __init__(self) -> None:  # noqa: D
        self._context_map: Dict[str, Any] = {}
        self.root_span = tracer.current_root_span()
        self._processor = logbook.Processor(self._inject_logging_context)
        self.add("request_id", uuid4())
        self.add("domain", "semantic_layer")

    def add(self, key: str, value: Any, span: Optional[Span] = None) -> None:
        """Add a key value pair to the root span and logging context.

        If the optional span is passed it, they'll be added to that span instead.
        """
        span = span or self.root_span
        if span:
            span.set_tag(key, value)
        self._context_map[key] = value

    @contextmanager
    def with_telemetry(self, trace_context: Optional[Context] = None) -> Iterator:
        """Context manager to ensure the telemetry context is added to all logs.

        If the optional trace_context is passed, we first activate it to ensure
        logs get correctly associted with the right trace. This is useful for jobs
        running on a separate thread or for functions that get called outside the
        span context like Schema.process_errors.
        """
        if trace_context:
            tracer.context_provider.activate(trace_context)
        with self._processor:
            yield

    def _inject_logging_context(self, record: logbook.LogRecord) -> None:
        record.extra.update(self._context_map)

My guess is that there's some issues going on with the async execute and the context manager from the Processor. I'm not super familiar with async coroutines in Python, but should Processor also implement __aenter__ and __aexit__ so we can have an async with for the async execute?

@RazerM
Copy link
Collaborator

RazerM commented Jan 14, 2024

Could you try to make a minimal, reproducible example?

Some thoughts:

  1. adding some logging to every pop and push method of ContextStackManager may help show what's going on
  2. if popped isn't self, what is it

@aiguofer
Copy link
Author

aiguofer commented Jan 15, 2024

@RazerM Made a minimal reproducible example: https://github.com/aiguofer/async_logbook_error_example/

I'll play around with some logging to see what happens!

@aiguofer
Copy link
Author

Hmm ok adding some logging and comparing results when using async has_permission and:

class IsAuthorized(BasePermission):
    def has_permission(self, source, info, **kwargs):
        sleep(1)
        return True

I see that the non-async version handles all requests independently with no concurrency in the worker, while the async version handles all the permission checks concurrently, so everything gets pushed first before anything gets popped.

Async:

INFO:     Uvicorn running on http://0.0.0.0:8001 (Press CTRL+C to quit)
Push thread (0, <logbook.base.Processor object at 0x10663d0d0>)
Push thread (1, <logbook.base.Processor object at 0x1066654f0>)
Push thread (2, <logbook.base.Processor object at 0x10667a370>)
Push thread (3, <logbook.base.Processor object at 0x10667e7f0>)
Push thread (4, <logbook.base.Processor object at 0x10667a9d0>)
Push thread (5, <logbook.base.Processor object at 0x1066869a0>)
Push thread (6, <logbook.base.Processor object at 0x10667a7f0>)
Push thread (7, <logbook.base.Processor object at 0x106698460>)
Push thread (8, <logbook.base.Processor object at 0x10667aa90>)
Push thread (9, <logbook.base.Processor object at 0x1066a4a60>)
Pop thread <logbook.base.Processor object at 0x1066a4a60>
INFO:     127.0.0.1:59786 - "POST /graphql HTTP/1.1" 500 Internal Server Error
ERROR:    Exception in ASGI application

Sync:

INFO:     Uvicorn running on http://0.0.0.0:8001 (Press CTRL+C to quit)
Push thread (0, <logbook.base.Processor object at 0x119a2a190>)
Pop thread <logbook.base.Processor object at 0x119a2a190>
INFO:     127.0.0.1:59822 - "POST /graphql HTTP/1.1" 200 OK
Push thread (1, <logbook.base.Processor object at 0x119a2a130>)
Pop thread <logbook.base.Processor object at 0x119a2a130>
INFO:     127.0.0.1:59823 - "POST /graphql HTTP/1.1" 200 OK
Push thread (2, <logbook.base.Processor object at 0x119a2a040>)
Pop thread <logbook.base.Processor object at 0x119a2a040>
...

I take it the logbook Processor just doesn't play well while async if each concurrent worker needs to have a different Processor?

@RazerM
Copy link
Collaborator

RazerM commented Jan 15, 2024

I think you need to do

with logbook.Processor(self._inject_logging_context).contextbound():
    ...

@aiguofer
Copy link
Author

ohhhh that did it!!! thanks so much!

It'd be great to have a breakdown of the different stacks (greenlet, async context, thread, and application) in https://logbook.readthedocs.io/en/stable/stacks.html

@RazerM
Copy link
Collaborator

RazerM commented Jan 17, 2024

@aiguofer I think we can make the default context manager "just work" and remove the myriad ways of pushing context: #370

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants