Incorrect queued duration for deferred tasks in gantt view #35288

Closed
2 tasks done
tirkarthi opened this issue Oct 31, 2023 · 1 comment · Fixed by #35984
Labels
area:core kind:bug This is a clearly a bug

Comments

@tirkarthi
Contributor

tirkarthi commented Oct 31, 2023

Apache Airflow version

main (development)

What happened

The Gantt view computes the queued duration as the difference between a task's start date and its queued at value. A deferred task, however, is re-queued when the triggerer returns an event, so its queued at value ends up later than its start date and the UI shows an incorrect queued duration. I am not sure what the right fix is. One option is to not show the queued duration in the tooltip when the queued time is later than the start time.
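To make the problem concrete, here is a small Python sketch of the calculation described above and of the suggested workaround. It is an illustration only, not Airflow's actual Gantt view code. The function names and timestamps are hypothetical and chosen just to mimic a deferred task whose queued at value is overwritten when the triggerer fires.

```python
from __future__ import annotations

from datetime import datetime, timedelta


def naive_queued_duration(queued_at: datetime, start_date: datetime) -> timedelta:
    # The calculation described in the issue: start date minus queued at.
    return start_date - queued_at


def guarded_queued_duration(
    queued_at: datetime | None, start_date: datetime | None
) -> timedelta | None:
    # Hypothetical guard for the suggested workaround: return None
    # ("don't show it in the tooltip") when a timestamp is missing or when
    # queued at is later than the start date, as happens after a deferred
    # task is re-queued by the triggerer.
    if queued_at is None or start_date is None or queued_at > start_date:
        return None
    return start_date - queued_at


# Made-up timestamps for a single deferred task instance.
queued_first = datetime(2023, 10, 31, 9, 0, 0)  # first time the task is queued
start = datetime(2023, 10, 31, 9, 0, 5)  # task starts, then defers
requeued = datetime(2023, 10, 31, 9, 10, 0)  # triggerer event re-queues the task

print(naive_queued_duration(requeued, start))  # -1 day, 23:50:05 (minus 9m55s)
print(guarded_queued_duration(requeued, start))  # None, so omit it from the tooltip
print(guarded_queued_duration(queued_first, start))  # 0:00:05
```

Hiding the value only avoids showing a negative duration; the time the task originally spent queued is still lost once queued at is overwritten.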

Screenshot from 2023-10-31 09-15-54

What you think should happen instead

No response

How to reproduce

  1. Trigger the DAG below.
  2. Run touch /tmp/a so that the triggerer returns an event for t1.
  3. Check the queued duration shown for t1 in the Gantt view.
from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.models.baseoperator import BaseOperator
from airflow.triggers.file import FileTrigger


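class FileCheckOperator(BaseOperator):
    """Defers until the file at filepath exists, then completes."""

    def __init__(self, filepath, **kwargs):
        super().__init__(**kwargs)
        self.filepath = filepath

    def execute(self, context):
        # Hand the wait over to the triggerer; the task frees its worker
        # slot and enters the deferred state.
        self.defer(
            trigger=FileTrigger(filepath=self.filepath),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event=None):
        # Runs after the trigger fires and the task is re-queued;
        # nothing else to do for this reproduction.
        pass


with DAG(
    dag_id="file_trigger",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    schedule=None,  # "schedule" replaces the deprecated "schedule_interval"
) as dag:
    # Two independent tasks: t1 resumes once /tmp/a exists,
    # t2 stays deferred unless /tmp/b is created.
    t1 = FileCheckOperator(task_id="t1", filepath="/tmp/a")
    t2 = FileCheckOperator(task_id="t2", filepath="/tmp/b")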

Operating System

Ubuntu

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@tirkarthi tirkarthi added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels Oct 31, 2023
@tirkarthi
Contributor Author

We are testing a patch for this internally and hope to upstream it as a pull request in the coming week. Please assign this issue to me.

2 participants