Airflow 2.0.0 support? #14

Closed
mjpieters opened this issue Oct 15, 2020 · 8 comments

@mjpieters

mjpieters commented Oct 15, 2020

Airflow 2.0.0a1 has been released (see https://lists.apache.org/thread.html/rf34558953ba367561574c194500a34d7f3c21fe2798b173b86fc309c%40%3Cdev.airflow.apache.org%3E).

Airflow 2.0.0 no longer has a DagRunOrder:

[2020-10-15 16:07:17,999] {plugins_manager.py:156} ERROR - Failed to import plugin airflow_multi_dagrun
Traceback (most recent call last):
  File "/.../lib/python3.6/site-packages/airflow/plugins_manager.py", line 149, in load_entrypoint_plugins
    plugin_class = entry_point.load()
  File "/.../lib/python3.6/site-packages/pkg_resources/__init__.py", line 2447, in load
    return self.resolve()
  File "/.../lib/python3.6/site-packages/pkg_resources/__init__.py", line 2453, in resolve
    module = __import__(self.module_name, fromlist=['__name__'], level=0)
  File "/.../lib/python3.6/site-packages/airflow_multi_dagrun/__init__.py", line 5, in <module>
    from . import operators
  File "/.../lib/python3.6/site-packages/airflow_multi_dagrun/operators/__init__.py", line 1, in <module>
    from .multi_dagrun import TriggerMultiDagRunOperator  # noqa
  File "/.../lib/python3.6/site-packages/airflow_multi_dagrun/operators/multi_dagrun.py", line 3, in <module>
    from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator
ImportError: cannot import name 'DagRunOrder'

See apache/airflow#6317 and the corresponding note in Airflow's UPDATING file.
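
One way a plugin could tolerate both Airflow versions during a transition is to guard the import. The sketch below is an assumption, not code from airflow_multi_dagrun: when the import fails it falls back to a small local stand-in class whose fields (`run_id`, `payload`) mirror the Airflow 1.10 `DagRunOrder`.

```python
try:
    # Airflow 1.10.x still exposes DagRunOrder at this path.
    from airflow.operators.dagrun_operator import DagRunOrder
except ImportError:
    # Airflow 2.0 removed the class (or Airflow is not installed at all),
    # so define a minimal local stand-in with the same two attributes.
    class DagRunOrder:
        def __init__(self, run_id=None, payload=None):
            self.run_id = run_id
            self.payload = payload


# Either way, callers can keep constructing orders with a payload.
order = DagRunOrder(payload={"index": 1})
print(order.payload)
```

A shim like this only postpones the cleanup; it keeps old callables importable but does not remove the dependency on the class.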

@mjpieters
Author

As far as I am concerned, the best way to handle this is to just *not* use DagRunOrder. That is, instead of the python callable generating DagRunOrder instances with a payload:

for ... in ...:
    yield DagRunOrder(payload={ ... })

just yield the payloads directly:

for ... in ...:
    yield { ... }

This already works (see these lines of code that auto-wrap non-DRO results). For the plugin to work in Airflow 2, the whole notion of DRO (DagRunOrder) instances could be dropped entirely.
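
The idea of accepting plain payloads can be sketched without Airflow at all. In this minimal illustration, `generate_payloads` and `normalize_payloads` are hypothetical names, not part of airflow_multi_dagrun. Note that it goes in the opposite direction from the existing auto-wrapping: instead of wrapping dicts in DRO objects, it unwraps any legacy object that has a `payload` attribute, so old-style and new-style callables produce the same list of run configurations.

```python
from typing import Any, Dict, Iterable, Iterator, List


def generate_payloads(count: int) -> Iterator[Dict[str, Any]]:
    """Hypothetical python_callable: yield one plain dict per DAG run."""
    for index in range(count):
        yield {"index": index}


def normalize_payloads(results: Iterable[Any]) -> List[Dict[str, Any]]:
    """Accept plain dicts or legacy objects exposing a `payload` attribute."""
    payloads = []
    for result in results:
        # Legacy DagRunOrder-style objects carry the conf in `.payload`;
        # anything else is treated as the conf itself.
        payloads.append(getattr(result, "payload", result))
    return payloads


payloads = normalize_payloads(generate_payloads(3))
print(payloads)
```

With a normalization step like this, the operator itself never needs to import or construct a DagRunOrder.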

@mastak
Owner

mastak commented Nov 19, 2020

Hi @mjpieters
I've updated the code for Airflow 2. You can find it in the rc-2.0.0 branch.

You can start the server with the following steps (Docker required):

1. `make init` - create the db
2. `make add-admin` - create the `admin` user (it asks for a password)
3. `make run` - start the Docker containers and run the Airflow webserver

`make down` will stop and remove the Docker containers.

In the UI, trigger_with_multi_dagrun_sensor appears to work fine. But in the logs I see the following error, and I can't find the root cause:

webserver_1  | Process ForkProcess-35:
webserver_1  | Traceback (most recent call last):
webserver_1  |   File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
webserver_1  |     self.run()
webserver_1  |   File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
webserver_1  |     self._target(*self._args, **self._kwargs)
webserver_1  |   File "/usr/local/lib/python3.7/site-packages/airflow/utils/dag_processing.py", line 365, in _run_processor_manager
webserver_1  |     processor_manager.start()
webserver_1  |   File "/usr/local/lib/python3.7/site-packages/airflow/utils/dag_processing.py", line 596, in start
webserver_1  |     return self._run_parsing_loop()
webserver_1  |   File "/usr/local/lib/python3.7/site-packages/airflow/utils/dag_processing.py", line 659, in _run_parsing_loop
webserver_1  |     self._processors.pop(processor.file_path)
webserver_1  | KeyError: '/usr/local/airflow/dags/common_target.py'

@mastak
Owner

mastak commented Nov 19, 2020

Maybe the issue is in Airflow itself and will be fixed in the stable 2.0.0 release.

@mjpieters
Author

The KeyError is a known issue. I reported it in the Airflow 2.0 Slack channel at some point, but I can't find a GitHub issue for it right now.

@mjpieters
Author

Thanks for updating!

I've since created my own re-implementation as I needed the operator sooner. It is largely the same, except that I subclassed PythonOperator and load the DAG from a DagBag() instance directly, just once. That avoids running through the same code paths that re-query the database for each DAG run created, and lets you do all the work in a single SQLAlchemy session.

E.g. the execute method looks like this:

    @provide_session
    def execute(self, context: Dict, session=None):
        context.update(self.op_kwargs)
        context["templates_dict"] = self.templates_dict

        self.op_kwargs = determine_kwargs(self.python_callable, self.op_args, context)

        dag_id = self.trigger_dag_id
        dag_model = DagModel.get_current(dag_id, session=session)
        if dag_model is None:
            raise DagNotFound(f"Dag id {dag_id} not found in DagModel")

        dag_bag = DagBag(dag_folder=dag_model.fileloc, read_dags_from_db=True)
        dag = dag_bag.get_dag(dag_id, session=session)
        dag_hash = dag_bag.dags_hash.get(dag_id)
        dag_run_ids = []
        for run_conf in self.execute_callable():
            now = timezone.utcnow()
            dag_run = dag.create_dagrun(
                state=State.RUNNING,
                execution_date=now,
                external_trigger=True,
                conf=run_conf,
                run_type=DagRunType.MANUAL,
                dag_hash=dag_hash,
                session=session,
            )
            dag_run_ids.append(dag_run.id)
            self.log.info("Created DagRun %s, %s", dag_run, now)

        if dag_run_ids:
            context["ti"].xcom_push(TRIGGER_DAGRUN_XCOM_KEY, dag_run_ids)
        else:
            self.log.info("No DagRuns created")

@mastak
Owner

mastak commented Nov 21, 2020

Sounds reasonable, I'll apply your way. Thank you!

@honarkhah

Hi, is there any update?

@mastak
Owner

mastak commented Feb 28, 2021

@honarkhah
Hi, I just released version 2.0.0.

mastak closed this as completed Feb 28, 2021