Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make InMemoryConnector Thread-Safe for Multi-Threaded FastAPI Use Cases #1320

Open
romainrey opened this issue Feb 6, 2025 · 3 comments · May be fixed by #1329
Open

Make InMemoryConnector Thread-Safe for Multi-Threaded FastAPI Use Cases #1320

romainrey opened this issue Feb 6, 2025 · 3 comments · May be fixed by #1329

Comments

@romainrey
Copy link

Description

When using procrastinate.testing.InMemoryConnector with FastAPI and synchronous routes, an issue arises due to thread-safety violations in asyncio operations. The in-memory connector, while useful for testing, assumes that all operations occur on the same event loop. However, when used in a FastAPI sync route, which executes in a separate worker thread, the connector’s notification system (_notify) tries to interact with an asyncio event loop running in a different thread, leading to a RuntimeError.

Context: Using Procrastinate with FastAPI Sync Routes

  • FastAPI sync routes are executed in a thread pool by default.
  • procrastinate.App and its worker are typically running on the main event loop.
  • When a sync route defers a task (task.defer()), procrastinate processes it asynchronously.
  • The InMemoryConnector calls _notify, which interacts with an asyncio.Event from the wrong thread.
  • Since uvloop (default event loop for FastAPI) is not thread-safe, this leads to a non-thread-safe operation error.

Error Message

When deferring a task from a synchronous route, the following error occurs:
RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one

Full traceback:

File "/path/to/procrastinate/testing.py", line 195, in _notify
    await self.on_notification(
File "/path/to/procrastinate/worker.py", line 379, in _handle_notification
    self._new_job_event.set()
File "uvloop/loop.pyx", line 1279, in uvloop.loop.Loop.call_soon
File "uvloop/loop.pyx", line 715, in uvloop.loop.Loop._check_thread
RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one

Cause

  1. The sync route runs in a separate thread from the main event loop.
  2. When task.defer() is called, it eventually triggers _notify in InMemoryConnector.
  3. _notify calls an async function from a different thread than the main event loop.
  4. uvloop (default FastAPI event loop) does not allow thread-unsafe operations.
  5. The RuntimeError occurs when _notify tries to await an event notification from the wrong thread.

Proposed Fix

I propose a thread-safe version of InMemoryConnector that:

  • Saves the event loop and thread ID at startup.
  • Ensures all notifications run on the correct event loop.
  • Uses asyncio.run_coroutine_threadsafe when necessary to safely execute _notify.
import asyncio
import json
import threading
import procrastinate
from procrastinate import testing


class InMemoryConnectorThreadSafe(testing.InMemoryConnector):
    async def open_async(
        self, pool: procrastinate.connector.Pool | None = None
    ) -> None:
        """
        Save the current event loop and its thread id so that later notifications
        can be scheduled on this loop.
        """
        self._loop = asyncio.get_running_loop()
        self._loop_thread_id = threading.get_ident()
        self.states.append("open_async")

    async def _notify(
        self, queue_name: str, notification: procrastinate.jobs.Notification
    ) -> None:
        """
        Instead of directly awaiting on_notification, we check the current thread.
        If we’re not on the same thread as the one where the loop was saved,
        we schedule the notification on the correct loop.
        """
        if not self.on_notification:
            return

        destination_channels = {
            "procrastinate_any_queue_v1",
            f"procrastinate_queue_v1#{queue_name}",
        }
        for channel in set(self.notify_channels).intersection(destination_channels):
            coro = self.on_notification(
                channel=channel, payload=json.dumps(notification)
            )
            if threading.get_ident() == self._loop_thread_id:
                # Already on the right thread: just await.
                await coro
            else:
                # Not on the correct thread: schedule the coroutine on the saved loop.
                future = asyncio.run_coroutine_threadsafe(coro, self._loop)
                # Wrap the concurrent.futures.Future so we can await it.
                await asyncio.wrap_future(future)

Proposed Integration

This could be merged into InMemoryConnector to improve its usability with multi-threaded FastAPI applications.
Suggested changes:
• Modify InMemoryConnector to include the _notify fix.
• Ensure proper testing for multi-threaded environments.

Why This Fix?

✅ Prevents non-thread-safe calls to the event loop.
✅ Makes InMemoryConnector compatible with FastAPI sync routes.
✅ Keeps the fix lightweight, only modifying the _notify behavior.
✅ Does not affect the existing usage of InMemoryConnector in single-threaded tests.

Final Thoughts

Thanks to the Procrastinate team for the awesome work on the library! This fix would make InMemoryConnector much more robust for testing with FastAPI, particularly for teams that mix sync and async routes.
Would love to hear your thoughts on integrating this fix! 🚀

@onlyann
Copy link
Contributor

onlyann commented Feb 7, 2025

Thank you so much for submitting an issue with such level of detail.

That looks great.

Would you like to submit a pull request?

Happy for the existing InMemoryConnector to be amended, unless someone can think of a reason not to.

@ewjoachim
Copy link
Member

ewjoachim commented Feb 16, 2025

No, I think that it's not worth having 2 InMemoryConnector, and if the existing one can be used in all situations, it's best. Why would someone wish to have the thread-unsafe connector anyway :) ?

Great writeup, feel free to let us know whether you want to do the PR or if you'd rather we do it.

@romainrey romainrey linked a pull request Feb 16, 2025 that will close this issue
9 tasks
@romainrey
Copy link
Author

Sounds good team. Just created the PR, let me know if that works for you :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants