Make IncrementalDataset's confirms "namespaced" #4039

Open
gtauzin opened this issue Jul 29, 2024 · 4 comments
Labels
Component: Framework Issue/PR that addresses core framework functionality Issue: Feature Request New feature or improvement to existing feature

Comments


gtauzin commented Jul 29, 2024

Description

I have a namespace-based incremental dataset and want to use the confirms attribute to trigger the CHECKPOINT update further down my pipeline. However, based on discussions on Slack, it seems that incremental datasets are not meant to be used within namespaces, so confirms is not "namespaced" by design.

Following discussion with @noklam on Slack, it seems that my use case could justify having "namespaced" confirms.

Context

I have many devices that regularly record event files and push them to an S3 bucket. I would like to run a preprocessing pipeline that is different for each device and that would, for each of them:

  1. Load all new files as dataframes, preprocess them, concatenate the preprocessed recorded events and save the result to another S3 bucket
  2. Load all preprocessed recorded files computed so far and concatenate them

Then, I use the concatenation of all preprocessed recorded events seen so far for data science purposes.

The way I achieve this with Kedro is:

  • For step 1, I use IncrementalDataset, and the concatenated dataframe is saved using a versioned ParquetDataset
  • For step 2, I use a PartitionedDataset that is able to find all preprocessed recorded events computed so far (with load_args withdirs and max_depth set accordingly)

Those steps are done for each device, so I use namespaces to reuse the same logic for all of them, varying only the S3 bucket path. I need the confirms to be at step 2 because only then can I consider the new files to have been processed.

Workaround

@noklam suggested putting the namespace in the argument, e.g. confirms=namespace.data, as a workaround, and I can confirm this worked.

@gtauzin gtauzin added the Issue: Feature Request New feature or improvement to existing feature label Jul 29, 2024

gtauzin commented Sep 10, 2024

I believe this is also hiding a bug. If the incremental dataset is namespaced and the confirms argument is not explicitly set as per the workaround, no checkpoint file is created. My guess is that when confirms is not provided, it is set to the incremental dataset name without the namespace, and a dataset with that name does not actually exist.

@astrojuanlu astrojuanlu added Community Issue/PR opened by the open-source community and removed Issue: Feature Request New feature or improvement to existing feature labels Dec 2, 2024
@github-project-automation github-project-automation bot moved this to Wizard inbox in Kedro Wizard 🪄 Dec 2, 2024
@astrojuanlu
Member

Thanks @gtauzin and sorry for the slow response. We will investigate the issue you mention first.

@ElenaKhaustova ElenaKhaustova self-assigned this Feb 19, 2025
@ElenaKhaustova ElenaKhaustova moved this from Wizard inbox to In Progress in Kedro Wizard 🪄 Feb 19, 2025
@ElenaKhaustova
Contributor

Example

A related pipeline example shared by the user: #4164

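from kedro.pipeline import Pipeline, node, pipeline

# SOURCES, concatenate_increment and concatenate_partition are assumed to be defined elsewhere in the project.
def create_pipeline(**kwargs) -> Pipeline: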
    def get_pipeline(namespace: str):
        template_pipeline = pipeline(
            [
                node(
                    concatenate_increment,
                    inputs="data_increment",
                    outputs=["concatenated_data_increment", "data_increment_concatenated"],
                    name="concatenate_increment",
                    confirms=f"{namespace}.data_increment", # This is needed as the incremental dataset is namespaced
                ),
                node(
                    concatenate_partition,
                    inputs=[
                        "partitioned_concatenated_data",
                        "data_increment_concatenated",
                    ],
                    outputs="extracted_data",
                    name="concatenate_partition",
                ),
            ],
        )

        return template_pipeline
 
    pipelines = pipeline(
        pipe=get_pipeline(namespace=SOURCES[0]),
        namespace=SOURCES[0],
    )
    for source in SOURCES[1:]:
        pipelines += pipeline(
            pipe=get_pipeline(namespace=source),
            namespace=source,
        )

    return pipelines

"{source}.data_increment":
  type: partitions.IncrementalDataset
  path: data/01_raw/{source}/
  dataset:
    type: pandas.CSVDataset
  filename_suffix: ".csv"

Explanation

After a node runs, we check whether any dataset should be confirmed and, if so, confirm it via catalog.confirm. In the example above, it is the namespaced dataset name that is checked against node.confirms:

for name in node.confirms:

On the node side, we store confirms exactly as it was passed to the node constructor, as a str or a list[str].

self._confirms = confirms

So node.confirms returns confirms as is, without applying the node namespace, which matches what the user reported.

def confirms(self) -> list[str]:
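
To make the mismatch concrete, here is a minimal sketch of the behaviour described above. ToyNode, ToyCatalog and confirm_after_run are hypothetical stand-ins written for this comment, not Kedro classes or functions; the only assumptions taken from the thread are that confirms is stored as passed and that confirmation iterates over node.confirms.

```python
from __future__ import annotations


class ToyNode:
    """Hypothetical stand-in for a node; this is NOT Kedro's Node class."""

    def __init__(
        self,
        name: str,
        confirms: str | list[str] | None = None,
        namespace: str | None = None,
    ):
        self.name = name
        self.namespace = namespace
        # Stored exactly as passed in; the namespace is never applied here.
        self._confirms = confirms

    @property
    def confirms(self) -> list[str]:
        # Normalise None / str / list[str] into a list of dataset names.
        if self._confirms is None:
            return []
        if isinstance(self._confirms, str):
            return [self._confirms]
        return list(self._confirms)


class ToyCatalog:
    """Hypothetical catalog that only records which names were confirmed."""

    def __init__(self, dataset_names: list[str]):
        self.dataset_names = set(dataset_names)
        self.confirmed: list[str] = []

    def confirm(self, name: str) -> None:
        if name not in self.dataset_names:
            raise KeyError(f"Dataset '{name}' is not registered in the catalog")
        self.confirmed.append(name)


def confirm_after_run(node: ToyNode, catalog: ToyCatalog) -> None:
    """Confirm every dataset listed in node.confirms after the node has run."""
    for name in node.confirms:
        catalog.confirm(name)


# The dataset is registered under its namespaced name only.
catalog = ToyCatalog(["device_a.data_increment"])
plain = ToyNode("concatenate_increment", confirms="data_increment", namespace="device_a")
print(plain.confirms)  # ['data_increment']
```

In this toy setup, calling confirm_after_run(plain, catalog) fails because "data_increment" is not a registered name, while a node built with confirms="device_a.data_increment" (the workaround) is confirmed as expected.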

@gtauzin, currently there is no way to apply the namespace dynamically to what you pass into confirms when constructing a node.

Possible solution

Since both the node namespace and the node name are known at the node level, we can resolve inputs such as {namespace}.data_increment when setting self._confirms on the node. That way, node.confirms would already contain the namespaced dataset names at the time of the check.
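
One possible reading of the idea above, sketched in plain Python. The helper name resolve_confirms is hypothetical and does not exist in Kedro; the sketch also assumes that "resolving" means prefixing each name with the node namespace, and that names already carrying the prefix are left untouched so the current workaround keeps working.

```python
from __future__ import annotations


def resolve_confirms(
    confirms: str | list[str] | None, namespace: str | None
) -> list[str]:
    """Hypothetical helper: return confirms as a list of namespaced names."""
    # Normalise the constructor input into a list first.
    if confirms is None:
        names: list[str] = []
    elif isinstance(confirms, str):
        names = [confirms]
    else:
        names = list(confirms)

    # Without a namespace there is nothing to resolve.
    if not namespace:
        return names

    prefix = f"{namespace}."
    # Assumption: a name that already starts with the prefix was namespaced
    # explicitly by the user, so it is not prefixed a second time.
    return [name if name.startswith(prefix) else prefix + name for name in names]


print(resolve_confirms("data_increment", "device_a"))
```

A caveat with this approach: a namespaced node may legitimately want to confirm a dataset that lives outside its namespace, so an unconditional prefix could be too strict and may need an opt-out.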

@ElenaKhaustova ElenaKhaustova added Issue: Feature Request New feature or improvement to existing feature Component: Framework Issue/PR that addresses core framework functionality and removed Community Issue/PR opened by the open-source community labels Feb 19, 2025

gtauzin commented Feb 20, 2025

@ElenaKhaustova Thanks for looking into this!
