Integration is broken for schedulers #3
The removal of importing the application also causes

And then causes

My current work-around is to pass the full module names containing tasks to the worker cmd args (modules), since only a few of my modules include tasks. I tried

I believe this is because I'm using Python 3.11 in my project, so I didn't dig deeper. I'm fine with the current work-around, but it would be great if we could come up with some solutions for these multi-broker compatibility problems. If you have some direction or ideas but don't have the time to research it, I'd like to contribute. |
Hi, @nicognaW. Relative imports don't work, because taskiq imports your modules as-is and has no proper python-path for each tasks file. Consider removing all relative imports from your project. |
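The point made above, that a tasks file loaded by path has no proper package and therefore cannot use relative imports, can be reproduced with a few lines of standard-library code. This is only an illustrative sketch (the file name and contents are made up), not taskiq's actual loading code:

```python
import importlib.util
import pathlib
import tempfile

# Simulate a discovery tool loading a tasks file by its filesystem path,
# outside of any package hierarchy.
with tempfile.TemporaryDirectory() as tmp:
    tasks_file = pathlib.Path(tmp) / "tasks.py"
    # The module uses a relative import, as if it lived inside a package.
    tasks_file.write_text("from .helpers import helper\n")

    spec = importlib.util.spec_from_file_location("tasks", tasks_file)
    module = importlib.util.module_from_spec(spec)
    try:
        spec.loader.exec_module(module)
        error_message = ""
    except ImportError as exc:
        # Typically: "attempted relative import with no known parent package"
        error_message = str(exc)

print(error_message)
```

Because the module is loaded under the bare name `tasks` with no parent package, the relative import raises `ImportError`, which is why switching to absolute imports avoids the problem.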
Hi, I confirmed that I'm not using any relative imports in my own code; it's possible that one of my dependencies does, though. Today, when I cloned the project on another macOS machine, the error didn't occur and everything went fine. Not sure if it's relevant, but one more difference between the Windows and macOS environments is that I use

Anyway, I personally prefer using the |
Another side-effect with |
That's really weird. At first I decided to separate tasks from the project and import them using modules as arguments to taskiq. I created a project and defined a task using async_shared_broker.

```python
from taskiq import async_shared_broker


@async_shared_broker.task
async def my_task(a: int) -> None:
    print("lol", a)
```

And then I created another project which depends on the first one.

```python
from taskiq import async_shared_broker
from taskiq_redis import ListQueueBroker

broker = ListQueueBroker("redis://localhost")
async_shared_broker.default_broker(broker)
```

And finally, I created a

```python
import asyncio

from taskslib.tasks import my_task
from tkqtest.tkq import broker


async def main():
    await broker.startup()
    await my_task.kiq(a=1)
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```

I started the worker with this command:

```
taskiq worker tkqtest.tkq:broker taskslib.tasks
```

And it worked out. I tried setting |
I'm sorry if I misexpressed anything, but the import issue is with

Manually specifying the task modules like you did:

didn't produce the error. And reviewing my stacktraces, I realized that one of my dependencies

So overall it's not an issue with separating tasks from the project; it's just |
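The dependency problem hinted at above comes down to how broad a discovery pattern is: a recursive search for a generic name like `tasks.py` matches every file of that name under the search root, including files that belong to installed third-party packages. A minimal sketch with a plain recursive glob (the directory names here are hypothetical):

```python
import pathlib
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    root = pathlib.Path(tmp)
    # The project's own tasks file.
    (root / "my_app").mkdir()
    (root / "my_app" / "tasks.py").write_text("# my tasks\n")
    # A same-named file shipped by a dependency inside a virtualenv.
    (root / ".venv" / "lib" / "somelib").mkdir(parents=True)
    (root / ".venv" / "lib" / "somelib" / "tasks.py").write_text("# third party\n")

    # A recursive search for "tasks.py" picks up both files,
    # not just the project's own.
    matches = sorted(str(p.relative_to(root)) for p in root.rglob("tasks.py"))

print(matches)
```

This is why a more specific pattern (such as `schedule_*.py` used later in this thread) avoids accidentally importing dependency modules during discovery.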
**Problem**

I was running my FastAPI project with taskiq integrated (generated from fastapi_template). And I added a taskiq-scheduler instance like this, but it failed to send tasks (like nothing happened).

```yaml
# api_service/deploy/docker-compose.yml
api: &main_app
  build:
    context: .
    dockerfile: ./deploy/Dockerfile
    target: prod
  image: api_service:${API_SERVICE_VERSION:-latest}
  restart: always
  env_file:
    - .env
  networks:
    - default

taskiq-worker:
  <<: *main_app
  labels: []
  command:
    - taskiq
    - worker
    - api_service.tkq:broker
    - --fs-discover
    - --tasks-pattern
    - "schedule_*.py"

taskiq-scheduler:
  <<: *main_app
  labels: []
  command:
    - taskiq
    - scheduler
    - api_service.tkq:scheduler
    - --fs-discover
    - --tasks-pattern
    - "schedule_*.py"
  depends_on:
    api:
      condition: service_started
    taskiq-worker:
      condition: service_started
# ...
```

**Solution**

I observed that it cannot detect my scheduled tasks this way. After that, I tried two experiments and SUCCESSFULLY made the scheduler active:
```python
# api_service/api_service/services/scheduler/__main__.py
import asyncio

from taskiq import TaskiqScheduler
from taskiq.cli.scheduler.args import SchedulerArgs
from taskiq.cli.scheduler.run import run_scheduler
from taskiq.schedule_sources import LabelScheduleSource

from api_service.tkq import broker

scheduler_tkq = TaskiqScheduler(
    broker=broker,
    sources=[LabelScheduleSource(broker)],
    refresh_delay=2,
)


async def main() -> None:
    """Initialize Taskiq scheduler."""
    await run_scheduler(
        SchedulerArgs(
            scheduler=scheduler_tkq,
            modules=[],
            fs_discover=True,
            tasks_pattern="schedule_*.py",
        ),
    )


if __name__ == "__main__":
    asyncio.run(main())
```

```yaml
# api_service/deploy/docker-compose.yml
# ...
taskiq-scheduler:
  image: api_service:${API_SERVICE_VERSION:-latest}
  restart: "always"
  command:
    - /usr/local/bin/python
    - -m
    - api_service.services.scheduler
  volumes:
    # Adds current directory as volume.
    - .:/app/src/
  depends_on:
    api:
      condition: service_started
    taskiq-worker:
      condition: service_started
# ...
```

Packages version:
|
@haophancs, seems like the problem is with imports; I'm not sure what the initial problem was. Can you provide a little more context? Debug logs, for example, and how you define the scheduler in your tkq module? |
@s3rius We can see that in the beginning, I set the taskiq scheduler up with:

```
taskiq scheduler api_service.tkq:scheduler --fs-discover --tasks-pattern "schedule_*.py"
```

It just shows two lines of initial logs:

And nothing else, even when it's time to run the task determined by cron. Therefore I feel the scheduler cannot detect the scheduled tasks. So I tried the two solutions above and they work (I see the log line about sending the task). This is my `tkq.py`:

```python
# api_service/api_service/tkq.py
import taskiq_fastapi
from taskiq import InMemoryBroker, TaskiqScheduler
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend

from api_service.settings import settings

result_backend = RedisAsyncResultBackend(
    redis_url=str(settings.redis_url.with_path("/1")),
)

broker = ListQueueBroker(
    str(settings.redis_url.with_path("/1")),
).with_result_backend(result_backend)

if settings.environment.lower() == "pytest":
    broker = InMemoryBroker()

taskiq_fastapi.init(
    broker,
    "api_service.app.application:get_app",
)

# If we define a separate module for the scheduler as in my second solution,
# the scheduler will not be here but in the __main__.py file of the module.
scheduler = TaskiqScheduler(
    broker=broker,
    sources=[LabelScheduleSource(broker)],
    refresh_delay=settings.scheduler_refresh_delay,
)
```

And the task file:

```python
# api_service/api_service/services/scheduler/schedules/schedule_example.py
from api_service.tkq import broker


@broker.task(schedule=[{"cron": "* * * * *"}])
async def example_task() -> None:
    message = "this is a heavy task"
```

In my project now I definitely prefer my second solution: just define the separate module and give it a |
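For readers unfamiliar with the schedule label used above: the five cron fields stand for minute, hour, day of month, month, and day of week, so `"* * * * *"` fires every minute. The toy matcher below is a deliberately simplified sketch of that interpretation, not taskiq's actual cron parser (real cron syntax also allows ranges, lists, and steps):

```python
from datetime import datetime


def cron_matches(expr: str, now: datetime) -> bool:
    """Return True if a five-field cron expression matches the given time.

    Only "*" and plain numbers are handled here; real cron parsers
    also support ranges, lists, and step values.
    """
    minute, hour, day, month, weekday = expr.split()
    parts = [
        (minute, now.minute),
        (hour, now.hour),
        (day, now.day),
        (month, now.month),
        (weekday, now.isoweekday() % 7),  # cron convention: 0 = Sunday
    ]
    return all(field == "*" or int(field) == value for field, value in parts)


moment = datetime(2023, 5, 1, 12, 30)
print(cron_matches("* * * * *", moment))   # matches every minute
print(cron_matches("0 0 * * *", moment))   # only matches midnight
```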
@s3rius Oh, I got it: if we want to use the CLI command instead of the programmatic way, we need to declare the scheduler BEFORE calling `taskiq_fastapi.init`.

Now the implementation below makes the scheduler work right.

```python
# api_service/api_service/tkq.py
import taskiq_fastapi
from taskiq import InMemoryBroker, TaskiqScheduler
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend

from api_service.settings import settings

result_backend = RedisAsyncResultBackend(
    redis_url=str(settings.redis_url.with_path("/1")),
)

broker = ListQueueBroker(
    str(settings.redis_url.with_path("/1")),
).with_result_backend(result_backend)

if settings.environment.lower() == "pytest":
    broker = InMemoryBroker()

# before calling taskiq_fastapi.init(...)
scheduler = TaskiqScheduler(
    broker=broker,
    sources=[LabelScheduleSource(broker)],
    refresh_delay=settings.scheduler_refresh_delay,
)

taskiq_fastapi.init(
    broker,
    "api_service.app.application:get_app",
)
```

And then just run an instance for it:

```yaml
# api_service/deploy/docker-compose.yml
taskiq-scheduler:
  <<: *main_app
  labels: []
  command:
    - taskiq
    - scheduler
    - api_service.tkq:scheduler
    - --fs-discover
    - --tasks-pattern
    - "schedule_*.py"
  depends_on:
    api:
      condition: service_started
    taskiq-worker:
      condition: service_started
```
|
Wow! Thanks for finding this out. Actually, I have no idea why this is happening, because by design there should be no such constraint as declaring the scheduler before calling init. I'll take a look. At least it would be nice to note this behavior somewhere in the docs. |
Just curious. Has the issue been triaged to the root cause? |
The issue is the pattern: it grabs third-party packages' tasks.py files and raises an exception with them. After changing the file to tasks_something.py and adding the |
When starting the scheduler, it doesn't import the application and doesn't set the dependency context.
Original issue: taskiq-python/taskiq#142