
DatasetAlreadyExistsError thrown when using ThreadRunner, dataset factories #3739

Closed

melvinkokxw opened this issue Mar 26, 2024 · 4 comments

Labels: Community Issue/PR opened by the open-source community

@melvinkokxw

Description

Using ThreadRunner with dataset factories leads to a DatasetAlreadyExistsError

Context

I have a pipeline with two nodes that use the same input, which should be resolved via a dataset factory. When I run the pipeline with ThreadRunner, Kedro throws a DatasetAlreadyExistsError.

Steps to Reproduce

Here is a minimal reproducible example:

import yaml
from kedro.io import DataCatalog
from kedro.pipeline import Pipeline, node
from kedro.runner import ThreadRunner

catalog_yml = """
"{name}":
  type: MemoryDataset
"""

catalog = yaml.safe_load(catalog_yml)

io = DataCatalog.from_config(catalog)

def return_dataframe(input_df):
    return input_df.head(1)

pipeline = Pipeline(
    [
        node(
            func=return_dataframe, inputs="input_df", outputs="output_df1", name="node1"
        ),
        node(
            func=return_dataframe, inputs="input_df", outputs="output_df2", name="node2"
        ),
    ]
)
runner = ThreadRunner()
runner.run(pipeline, io)

Expected Result

Pipeline should run successfully with no errors

Actual Result

Full error logs:
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ in <cell line: 4>:4                                                                              │
│                                                                                                  │
│   1 from kedro.runner import ThreadRunner                                                        │
│   2                                                                                              │
│   3 runner = ThreadRunner()                                                                      │
│ ❱ 4 runner.run(pipeline, io)                                                                     │
│   5                                                                                              │
│                                                                                                  │
│ /Users/user/.pyenv/versions/3.9.18/envs/evn-3.9/lib/python3.9/site-packages/kedro/runner/r │
│ unner.py:103 in run                                                                              │
│                                                                                                  │
│   100 │   │   │   self._logger.info(                                                             │
│   101 │   │   │   │   "Asynchronous mode is enabled for loading and saving data"                 │
│   102 │   │   │   )                                                                              │
│ ❱ 103 │   │   self._run(pipeline, catalog, hook_manager, session_id)                             │
│   104 │   │                                                                                      │
│   105 │   │   self._logger.info("Pipeline execution completed successfully.")                    │
│   106                                                                                            │
│                                                                                                  │
│ /Users/user/.pyenv/versions/3.9.18/envs/evn-3.9/lib/python3.9/site-packages/kedro/runner/t │
│ hread_runner.py:133 in _run                                                                      │
│                                                                                                  │
│   130 │   │   │   │   done, futures = wait(futures, return_when=FIRST_COMPLETED)                 │
│   131 │   │   │   │   for future in done:                                                        │
│   132 │   │   │   │   │   try:                                                                   │
│ ❱ 133 │   │   │   │   │   │   node = future.result()                                             │
│   134 │   │   │   │   │   except Exception:                                                      │
│   135 │   │   │   │   │   │   self._suggest_resume_scenario(pipeline, done_nodes, catalog)       │
│   136 │   │   │   │   │   │   raise                                                              │
│                                                                                                  │
│ /Users/user/.pyenv/versions/3.9.18/lib/python3.9/concurrent/futures/_base.py:439 in result │
│                                                                                                  │
│   436 │   │   │   │   if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:                     │
│   437 │   │   │   │   │   raise CancelledError()                                                 │
│   438 │   │   │   │   elif self._state == FINISHED:                                              │
│ ❱ 439 │   │   │   │   │   return self.__get_result()                                             │
│   440 │   │   │   │                                                                              │
│   441 │   │   │   │   self._condition.wait(timeout)                                              │
│   442                                                                                            │
│                                                                                                  │
│ /Users/user/.pyenv/versions/3.9.18/lib/python3.9/concurrent/futures/_base.py:391 in        │
│ __get_result                                                                                     │
│                                                                                                  │
│   388 │   def __get_result(self):                                                                │
│   389 │   │   if self._exception:                                                                │
│   390 │   │   │   try:                                                                           │
│ ❱ 391 │   │   │   │   raise self._exception                                                      │
│   392 │   │   │   finally:                                                                       │
│   393 │   │   │   │   # Break a reference cycle with the exception in self._exception            │
│   394 │   │   │   │   self = None                                                                │
│                                                                                                  │
│ /Users/user/.pyenv/versions/3.9.18/lib/python3.9/concurrent/futures/thread.py:58 in run    │
│                                                                                                  │
│    55 │   │   │   return                                                                         │
│    56 │   │                                                                                      │
│    57 │   │   try:                                                                               │
│ ❱  58 │   │   │   result = self.fn(*self.args, **self.kwargs)                                    │
│    59 │   │   except BaseException as exc:                                                       │
│    60 │   │   │   self.future.set_exception(exc)                                                 │
│    61 │   │   │   # Break a reference cycle with the exception 'exc'                             │
│                                                                                                  │
│ /Users/user/.pyenv/versions/3.9.18/envs/evn-3.9/lib/python3.9/site-packages/kedro/runner/r │
│ unner.py:331 in run_node                                                                         │
│                                                                                                  │
│   328 │   if is_async:                                                                           │
│   329 │   │   node = _run_node_async(node, catalog, hook_manager, session_id)                    │
│   330 │   else:                                                                                  │
│ ❱ 331 │   │   node = _run_node_sequential(node, catalog, hook_manager, session_id)               │
│   332 │                                                                                          │
│   333 │   for name in node.confirms:                                                             │
│   334 │   │   catalog.confirm(name)                                                              │
│                                                                                                  │
│ /Users/user/.pyenv/versions/3.9.18/envs/evn-3.9/lib/python3.9/site-packages/kedro/runner/r │
│ unner.py:414 in _run_node_sequential                                                             │
│                                                                                                  │
│   411 │                                                                                          │
│   412 │   for name in node.inputs:                                                               │
│   413 │   │   hook_manager.hook.before_dataset_loaded(dataset_name=name, node=node)              │
│ ❱ 414 │   │   inputs[name] = catalog.load(name)                                                  │
│   415 │   │   hook_manager.hook.after_dataset_loaded(                                            │
│   416 │   │   │   dataset_name=name, data=inputs[name], node=node                                │
│   417 │   │   )                                                                                  │
│                                                                                                  │
│ /Users/user/.pyenv/versions/3.9.18/envs/evn-3.9/lib/python3.9/site-packages/kedro/io/data_ │
│ catalog.py:500 in load                                                                           │
│                                                                                                  │
│   497 │   │   │   >>> df = io.load("cars")                                                       │
│   498 │   │   """                                                                                │
│   499 │   │   load_version = Version(version, None) if version else None                         │
│ ❱ 500 │   │   dataset = self._get_dataset(name, version=load_version)                            │
│   501 │   │                                                                                      │
│   502 │   │   self._logger.info(                                                                 │
│   503 │   │   │   "Loading data from '%s' (%s)...", name, type(dataset).__name__                 │
│                                                                                                  │
│ /Users/user/.pyenv/versions/3.9.18/envs/evn-3.9/lib/python3.9/site-packages/kedro/io/data_ │
│ catalog.py:414 in _get_dataset                                                                   │
│                                                                                                  │
│   411 │   │   │   │   │   data_set_name,                                                         │
│   412 │   │   │   │   )                                                                          │
│   413 │   │   │                                                                                  │
│ ❱ 414 │   │   │   self.add(data_set_name, data_set)                                              │
│   415 │   │   if data_set_name not in self._data_sets:                                           │
│   416 │   │   │   error_msg = f"Dataset '{data_set_name}' not found in the catalog"              │
│   417                                                                                            │
│                                                                                                  │
│ /Users/user/.pyenv/versions/3.9.18/envs/evn-3.9/lib/python3.9/site-packages/kedro/io/data_ │
│ catalog.py:608 in add                                                                            │
│                                                                                                  │
│   605 │   │   │   if replace:                                                                    │
│   606 │   │   │   │   self._logger.warning("Replacing dataset '%s'", data_set_name)              │
│   607 │   │   │   else:                                                                          │
│ ❱ 608 │   │   │   │   raise DatasetAlreadyExistsError(                                           │
│   609 │   │   │   │   │   f"Dataset '{data_set_name}' has already been registered"               │
│   610 │   │   │   │   )                                                                          │
│   611 │   │   self._data_sets[data_set_name] = data_set                                          │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
DatasetAlreadyExistsError: Dataset 'input_df' has already been registered
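For context, the failure visible in the traceback looks like a check-then-act race: under ThreadRunner, two nodes sharing a factory-resolved input can both find `input_df` unregistered and both attempt to add it. The sketch below is illustrative plain Python showing that interleaving, not Kedro's actual `DataCatalog` code; the `Catalog` class and names are hypothetical.

```python
class Catalog:
    """Toy registry mimicking a duplicate-registration guard like DataCatalog.add()."""
    def __init__(self):
        self._datasets = {}

    def add(self, name, dataset):
        if name in self._datasets:
            raise RuntimeError(f"Dataset '{name}' has already been registered")
        self._datasets[name] = dataset


catalog = Catalog()

# Under ThreadRunner, node1 and node2 both load 'input_df'. Each thread
# checks the registry, sees the dataset missing, resolves the factory
# pattern, and only then registers it. The failing interleaving is:
thread_a_sees_missing = "input_df" not in catalog._datasets  # True
thread_b_sees_missing = "input_df" not in catalog._datasets  # also True: the race

if thread_a_sees_missing:
    catalog.add("input_df", object())  # thread A registers first

if thread_b_sees_missing:
    try:
        catalog.add("input_df", object())  # thread B now hits the guard
    except RuntimeError as exc:
        print(exc)  # Dataset 'input_df' has already been registered
```

With a sequential runner the check and the add never interleave, which is why the error only surfaces under threaded execution.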

Your Environment

  • Kedro version used (pip show kedro or kedro -V): 0.18.14, also reproducible on 0.19.3
  • Python version used (python -V): 3.9.18
  • Operating system and version: macOS 14.4 (23E214), Apple M2
@melvinkokxw (Author)

Here is a temporary workaround: create all the catalog entries the pipeline needs before it starts running.

from kedro.framework.hooks import hook_impl


class ResolveDatasetsHooks:
    @hook_impl
    def before_pipeline_run(self, pipeline, catalog):
        # Collect every dataset name the pipeline touches...
        data_sets = set()
        for node in pipeline.nodes:
            data_sets.update(node.outputs)
            data_sets.update(node.inputs)

        # ...and force the catalog to resolve (and register) each one
        # up front, so no two threads race to add the same entry later.
        for ds in data_sets:
            catalog._get_dataset(ds)
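To use the workaround in a scaffolded project, the hook has to be registered in the project's settings.py. A minimal sketch, assuming the class above lives in a hypothetical my_project/hooks.py (adjust the import path to your own package layout):

```python
# settings.py
from my_project.hooks import ResolveDatasetsHooks  # hypothetical module path

HOOKS = (ResolveDatasetsHooks(),)
```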

@noklam noklam added the Community Issue/PR opened by the open-source community label Mar 26, 2024
@noklam (Contributor)

noklam commented Mar 26, 2024

Hey @melvinkokxw, I love that you provided a clean script instead of a scaffolded project; it's very easy for me to run, and I appreciate the effort a lot ✨!

I suspect this is related to:

Can you try changing {name} to {abc}? I tried switching the runner to SequentialRunner, which still fails, so there may be something wrong in the script. After I changed {name}, I got a different error message, which may already solve your problem.

@noklam (Contributor)

noklam commented Mar 26, 2024

I managed to run this successfully; it is more of a problem with your script. Was it copied from an old version of Kedro? Can you explain a little what you are trying to do? That might give us more context to come up with a better solution.

There are a few problems:

  1. The catalog format is wrong: the type key is not indented properly.
  2. You need to wrap things under if __name__ == '__main__', which Python requires when dealing with multi-processing/threading.
  3. input_df doesn't exist; you cannot use the default dataset with no data. I monkeypatched this with a lambda function to bypass the MemoryDataset check.
import yaml
from kedro.io import DataCatalog
from kedro.pipeline import Pipeline, node
from kedro.runner import ThreadRunner
from kedro.runner.parallel_runner import ParallelRunner
from kedro.runner.sequential_runner import SequentialRunner

if __name__ == "__main__":
    catalog_yml = """
    "{name}":
        type: MemoryDataset
    """

    from kedro.io.memory_dataset import MemoryDataset
    MemoryDataset._load = lambda x: print("lambda!")
    catalog = yaml.safe_load(catalog_yml)

    io = DataCatalog.from_config(catalog)

    def return_dataframe(input_df):
        return "return!"

    pipeline = Pipeline(
        [
            node(
                func=return_dataframe, inputs="input_df", outputs="output_df1", name="node1"
            ),
            node(
                func=return_dataframe, inputs="input_df", outputs="output_df2", name="node2"
            ),
        ]
    )
    runner = ThreadRunner()
    runner.run(pipeline, io)

@noklam (Contributor)

noklam commented Apr 4, 2024

I am closing this issue due to inactivity. I tried to reproduce it last time and it worked as expected. Please reopen with a valid example.
