Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

unclear docs on custom decorators #33167

Open
2 tasks done
sheenarbw opened this issue Aug 7, 2023 · 7 comments
Open
2 tasks done

unclear docs on custom decorators #33167

sheenarbw opened this issue Aug 7, 2023 · 7 comments

Comments

@sheenarbw
Copy link

What do you see as an issue?

Referring to: https://airflow.apache.org/docs/apache-airflow/2.3.4/howto/create-custom-decorator.html

The documentation is unclear. It assumes that the reader has done a bunch of things that haven't been well defined and then the last section talks about the ProviderManager class but it's very brief. You're expected to already know what the ProviderManager is. If I use the search box in the docs then apparently ProviderManager is only mentioned in the create-custom-decorator page.

It seems like the only way to make use of this feature is to go digging through the source code. The operator decorator code is not the most straight-forward to read. I assume that most people wont be able to make use of this feature at all.

Solving the problem

Either turn it into a tutorial and walk the reader through the whole process in another way.

Anything else

This is a workaround that seems to do the trick:

I initially came across this because I wanted to make a django operator that would work with the taskflow api. I did this and it seems to work.

def make_django_decorator(app_path, settings_module):
    def django_task(task_id: str = None, *args, **kwargs):
        def django_task_decorator(fn):
            @task(task_id=task_id or fn.__name__, *args, **kwargs)
            def new_fn(*args, **kwargs):
                django_connect(app_path, settings_module)  # connect to the database. Now fn can make use of Django models
                return fn(
                    *args,
                )

            return new_fn

        return django_task_decorator

    return django_task


# I have a Django project named Tilde, now I have a tilde-specific task decorator. I should be able to make operatrs for other Django projects f I want to

tilde_task = make_django_decorator(
    app_path=TILDE_REPO_PATH / "backend", settings_module="backend.settings"
)

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@sheenarbw sheenarbw added kind:bug This is a clearly a bug kind:documentation needs-triage label for new issues that we didn't triage yet labels Aug 7, 2023
@boring-cyborg
Copy link

boring-cyborg bot commented Aug 7, 2023

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

@potiuk
Copy link
Member

potiuk commented Aug 7, 2023

Indeed - we have a few of those missing, and indeed there are quite a few cases where we miss tutorials and our users have to look at the source code. Airflow is created by almost 2600 contributors, so some of the documentatio might not entirely reflect the "step-by-step" tutorials. Many of those contributors became ones because they saw something missng and contributed it back - being a code, documentation, ci scripts etc.

Luckily, you seem to be on a good track to go through the process and figure out all the questions and even better - as a new user where some of the assumption that were in the heads of those who created the docs are not in your head. That makes you an ideal person to become new contributor - as you can probably write such documentation from the perspective of someone who would like to learn it - also you do not have to be afraid of making mistakes, writing such documentation in a PR is actually one of the best ways to get attention of maintainers that will make comments and correct any bad assumptions there.

Since you expressed the interest in adding it - I will assign you to the issue.

@potiuk potiuk removed the needs-triage label for new issues that we didn't triage yet label Aug 7, 2023
@Kache
Copy link
Contributor

Kache commented Nov 21, 2023

I had trouble with this too when trying out using creating a custom @task decorator for my custom operator.

I want to keep my code within the same project, but it seems the feature expects you to create a standalone package.

Reading through Airflow source, I found the following to work:

# file: plugins/operators/my_runner_operator.py
class MyRunnerOperator(BaseOperator):
    def __init__(self, *, command = [], env = {}, **kwargs):
        super().__init__(**kwargs)

        self.command = command
        self.env = env

    def execute(self, context: Context):
        # kinda like DockerOperator or run an ECS Task


class MyRunnerDecoratedOperator(DecoratedOperator, MyRunnerOperator):
    custom_operator_name: str = "@task.runner"


def runner_task(
    python_callable: Callable | None = None,
    multiple_outputs: bool | None = None,
    **kwargs,
) -> TaskDecorator:
    return task_decorator_factory(
        python_callable,
        multiple_outputs=multiple_outputs,
        decorated_operator_class=MyRunnerDecoratedOperator,
        **kwargs,
    )


# HACK
pm = ProvidersManager()
pm.initialize_providers_list()
provider_info = {
    'package-name': 'myproj-airflow-providers',
    'name': 'My Proj',
    'description': "a description",
    'task-decorators': [
        {
            'name': 'runner',
            'class-name': 'operators.my_runner_operator.runner_task',
        },
    ],
}
pm._provider_schema_validator.validate(provider_info)
pm._provider_dict['myproj-airflow-providers'] = ProviderInfo('0.0.1', provider_info, "package")
pm._provider_dict = dict(sorted(pm._provider_dict.items()))

This really isn't ideal, and the usage doesn't yet involve the wrapped function at all:

@task.runner(command=['my_exec', 'arg1', 'arg2'])
def do_something(): pass

I'm going to play around with it some more and see if I can get the XCom args working so that incoming args can be used in command within the function instead of in the decorator, and output of runner can be returned from the function into XCom.

But if not, it's going to be better to just use a factory function for my custom operator:

def runner(*cmd, **other_conveniences):
    args, kwargs = process(*cmd, **other_conveniences)  # e.g. auto generate task_id
    return MyRunnerOperator(*args, **kwargs)

@eladkal eladkal removed the kind:bug This is a clearly a bug label Feb 21, 2024
@kyleburke-meq
Copy link

Do we have a better solution for this yet for custom decorators within the same project?

@potiuk
Copy link
Member

potiuk commented Jan 27, 2025

Do we have a better solution for this yet for custom decorators within the same project?

Not sure what you mean by a better solution but if you want to contribute docs about doing it, you are absolutely welcome to help.

@kyleburke-meq
Copy link

@potiuk I was referring to the comment above about having to register custom decorators via the providers manager vs the way you would do it with say a custom operator.

I actually found a work around in another issue and was able to get it working this way:
#44779

I would be happy to contribute this to the docs, but I am not sure if this is intended use.

@potiuk
Copy link
Member

potiuk commented Jan 27, 2025

I would be happy to contribute this to the docs, but I am not sure if this is intended use.

Me neither.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

5 participants