
Add worker config to cache task completion results. #3178

Merged (5 commits, Jun 24, 2022)

Conversation


@riga riga commented Jun 21, 2022

Description

This PR adds a new worker config, cache_task_completion, which, when enabled (the default is False, of course), causes a task's successful complete() result to be cached by the worker. In short, this can save resources when tasks have a large number of dynamic dependencies that are yielded in several stages (see below) and the tasks' complete() calls are somewhat expensive (e.g. when file targets are remote).
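For illustration, enabling the option could look like this (the option name is taken from this PR; placing it in the [worker] section follows luigi's usual convention for worker configs and is an assumption here):

```ini
[worker]
cache_task_completion = true
```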

Motivation and Context

We sometimes have thousands of tasks being yielded through dynamic dependencies, and we only use files in remote locations. Therefore we want to limit the number of API calls and would really benefit from this option. Also, in our case we are sure that once a task is complete, it never toggles back to being incomplete, so this kind of caching is 100% safe (and I'm sure the opposite case is rather rare). Example:

class MyTask(luigi.Task):
    ...
    def run(self):
        # first round
        deps_1 = [DepA1(), DepB1(), ..., DepZ1()]
        outputs_1 = yield deps_1

        # second round
        # (with some artificial relation to outputs_1 to justify that two yields are required ...)
        deps_2 = [dep(param=x) for dep, x in zip([DepA2, DepB2, ..., DepZ2], outputs_1)]
        yield deps_2

When running, deps_1 are yielded twice, and the completion checks are triggered multiple times:

  1. when yielded the first time and reaching this point,
  2. before being added to the tree,
  3. after each dep ran, in the post-run check,
  4. when yielded the second time, at the same position as 1.

And for every additional round of yielding there will be another check (same as 4.). With the new config enabled, the checks would only run for 1. and 2.
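The effect of such a cache can be sketched independently of luigi: once complete() has returned True, the cached result is reused and the (potentially expensive) check is skipped. The following is a minimal, hypothetical sketch — Task and CachingWorker are illustrative stand-ins, not luigi's actual classes:

```python
class Task:
    """Toy task whose completeness check is expensive (e.g. a remote stat call)."""
    def __init__(self, name, done=False):
        self.name = name
        self.done = done
        self.complete_count = 0  # number of times complete() was invoked

    def complete(self):
        self.complete_count += 1
        return self.done


class CachingWorker:
    """Mimics the worker-side cache: a positive complete() result is remembered."""
    def __init__(self, cache_task_completion=False):
        self.cache_task_completion = cache_task_completion
        self._completion_cache = {}

    def check_complete(self, task):
        # reuse a previously cached positive result
        if self.cache_task_completion and self._completion_cache.get(task.name):
            return True
        result = task.complete()
        # only successful checks are cached; incomplete tasks are re-checked
        if self.cache_task_completion and result:
            self._completion_cache[task.name] = True
        return result


worker = CachingWorker(cache_task_completion=True)
task = Task("DepA1", done=True)
for _ in range(4):  # e.g. repeated yields of the same dependency
    worker.check_complete(task)
print(task.complete_count)  # only the first check hits complete() -> 1
```

With the config disabled, the same four checks would call complete() four times.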

Have you tested this? If so, how?

I added a test to worker_test.py that checks the number of complete() calls of tasks being yielded as dynamic dependencies, with and without the new config. The documentation is updated as well.

@riga riga requested review from dlstadther and a team as code owners June 21, 2022 14:27
Collaborator

@dlstadther dlstadther left a comment


I understand the purpose of this change and I think it's great! However, I'm a bit confused by some of the unittest assertions. I'm hoping to get a little clarity in my understanding of those values.

Comment on lines +496 to +498
self.assertEqual(a10.complete_count, 5)
self.assertEqual(a11.complete_count, 4)
self.assertEqual(a12.complete_count, 3)
Collaborator


I'm a little unclear why these complete_count values for each of the tasks differ in quantity. Could you clarify that for me?

Collaborator


Similarly, I'm confused as to why the assertions below (with check_complete_on_run=True) resulted in larger complete_count quantities.

Contributor Author

@riga riga Jun 23, 2022


I'm a little unclear why these complete_count values for each of the tasks differ in quantity. Could you clarify that for me?

Sure!

The complete counts with cache_task_completion=False of (5, 4, 3) for (a10, a11, a12) are due to luigi's assumption that run() methods yielding dynamic dependencies are idempotent: the worker invokes run() and, in case it's a generator, gets the next result and

  • if it's a bunch of already complete tasks (some of the completeness checks happen here), it gets the next generator result, or
  • if it's a bunch of tasks of which at least one is not complete yet, it adds all of them to the tree and forgets about the state of the generator.

(code here) The yielding task is placed back into the tree in PENDING state, too. And when it's started again later on, the entire procedure is triggered again, leading to a new generator in its initial state, but now with the previously incomplete bunch being complete. Therefore, completion checks of tasks in a certain bunch are always performed at least once more than those in the next bunch.
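This restart-and-replay behaviour can be sketched with a simplified model of the loop described above (not luigi's actual code, and it omits the extra scheduler-side and post-run checks, so the absolute counts are lower than in the test — but it shows why each earlier bunch is checked exactly one round more often than the next):

```python
def run():
    # a run() method yielding two bunches of dynamic dependencies
    yield ["DepA1", "DepB1"]   # first bunch
    yield ["DepA2", "DepB2"]   # second bunch

complete_counts = {}
completed = set()

def check_complete(dep):
    complete_counts[dep] = complete_counts.get(dep, 0) + 1
    return dep in completed

# Each round, the worker restarts the generator and advances it until it
# hits a bunch with an incomplete dep; that bunch is scheduled and the
# generator state is forgotten.
while True:
    gen = run()
    for bunch in gen:
        if all([check_complete(d) for d in bunch]):
            continue  # bunch already complete, advance the generator
        completed.update(bunch)  # the incomplete deps get run elsewhere
        break  # forget the generator, reschedule the yielding task
    else:
        break  # generator exhausted: the task itself can finally run

print(complete_counts)
# {'DepA1': 3, 'DepB1': 3, 'DepA2': 2, 'DepB2': 2}
```

The first bunch is checked in all three rounds, the second bunch only in the last two, mirroring the decreasing (5, 4, 3) pattern in the test.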

Similarly, I'm confused as to why the assertions below (with check_complete_on_run=True) resulted in larger complete_count quantities.

With check_complete_on_run=True there is a single additional call happening here, which increases the counts. I wanted to check that this is consistent with the proposed changes, so I added this block to the same test.

Collaborator


Oh, I see now. This makes much more sense! Thank you for the thorough response!

Collaborator

@dlstadther dlstadther left a comment


Code changes, documentation additions, and unittest coverage LGTM!

Thanks for your contribution!
