[#31438] Trigger Precursor work for Prism. #33763
Conversation
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment.
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
RFAL We have a disagreement on the design (single stateful isReady vs. splitting it into onElement, shouldFire, onFire, as Java does). I'm not convinced there's a material benefit to the split with regard to readability or testability. You get extremely fine-grained unit tests, but what happens within a sequence of elements for a given watermark is what matters. What I'm looking for is a reason for them to be separate, e.g. "Without the individual methods, we can't evaluate X, which would need shouldFire + onFire, but never onElement." It's possible the need for a split will become obvious when I get to actually incorporating it into the execution flow.
Took a break from this yesterday in order to control my tab load, but will be doing the onElement, shouldFire, onFire split in this PR presently.
PTAL now that I've moved everything to the "stateless shouldFire" approach.
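For reference, a minimal sketch of what that split looks like, built from the method names and `StateData` type visible in this PR's diff; the unexported interface name is an assumption, not Prism's actual declaration:

```go
// Sketch only: assumes this package's existing StateData type.
type trigger interface {
	// onElement updates the trigger's state for a newly arrived element.
	onElement(state *StateData)
	// shouldFire reports whether the trigger is ready to fire; it does not
	// mutate state (the "stateless shouldFire" approach).
	shouldFire(state *StateData) bool
	// onFire transitions the trigger's state after a firing, possibly into
	// a finished state.
	onFire(state *StateData)
}
```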
	// a finished state.
	onFire(state *StateData)

	// TODO merging triggers and state for merging windows
Any docs or issues (preferred) we can reference here?
Just the umbrella trigger issue at this time. Linking it here, and for the unimplemented ProcessingTime triggers at the bottom as well.
I can change them to more precise issues in the later PRs that this one is blocking.
		return
	}
	if !state.getTriggerState(t.Early).finished {
		if t.Early.shouldFire(state) {
Should we check `ts.extra` here as well? I don't really follow how `ts.extra` is being used in this context.
I had that in a previous version, but it was leading to incorrect testing behaviors.
In particular, the Java implementation uses triggers for all firings: Early and Late, but also OnTime and Closing (which is either OnTime or Late).
The implementation I've settled on doesn't do that, since OnTime and Closing are already handled and only occur when the watermark has advanced anyway.
So the `extra` state is mostly there to mark the transition from the Early firings trigger (if any) to the Late firings trigger (if any).
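For readers following along, a hedged sketch of that transition, built only from identifiers visible in this diff (`t.Early`, `getTriggerState`, `finished`, `shouldFire`); the receiver type, the nil checks, and the `Late` field are assumptions:

```go
// Sketch only: assumes this PR's trigger interface and StateData.
func (t *TriggerAfterEndOfWindow) shouldFire(state *StateData) bool {
	// While the early-firings trigger exists and hasn't finished, it alone
	// decides readiness; its finished bit marks the early-to-late transition.
	if t.Early != nil && !state.getTriggerState(t.Early).finished {
		return t.Early.shouldFire(state)
	}
	// After the transition, defer to the late-firings trigger, if any.
	if t.Late != nil {
		return t.Late.shouldFire(state)
	}
	return false
}
```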
What you describe could cause weirdness for a corner case that is rare but I got called out for: if someone has windowing but the trigger is `Repeat(ElementCount(n))` or `Repeat(ProcessingTime(30s))`, then they are not expecting an extra firing when the watermark crosses the end of the window. Presumably Prism would emit at that point. I honestly doubt any reasonable pipeline would notice the difference, but there you have it. (Note that for a trigger `Repeat(ProcessingTime(30s))` we still mark the final non-late pane as "on time".)
In the Java implementation the end-of-window firing is still governed by the trigger (but also by `OnTimeBehavior`), while the expiry firing is governed only by `ClosingBehavior`.
At present, in the follow-up PR, Prism will always emit at the end of window, except for the Never trigger, which fires only at closing time.
It doesn't make sense to me for triggers to be designed or implemented in such a way as to permit a trivial route to data loss. That is against the point of a data processing system.
Is there a benefit to allowing trivial data loss, vs. having the sensible behavior of always firing at the end of the window?
I guess I can imagine a case where I define my trigger as `Repeat(ElementCount(4))`, `AccumulationMode.Discarding`, and I really only want batches of 4 elements at a time or something. But that is also pretty trivial to implement by just dropping the data. `Repeat(ProcessingTime(30s))` seems less problematic, since processing time timers aren't guaranteed to fire on time anyways, right?
Overall, I don't feel super strongly either way, and I do think this is a reasonable behavior. My main concern is consistency, but it is probably ok.
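For context, here's roughly how a user would express that corner-case trigger in the Beam Go SDK (a hedged sketch reflecting my understanding of the SDK's windowing options, not code from this PR):

```go
package trigexample

import (
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
)

// applyWindowing applies fixed windows with Repeat(ElementCount(4)) in
// discarding mode: the user may expect panes of exactly 4 elements, yet a
// runner can still emit a final, smaller pane when the watermark crosses
// the end of the window.
func applyWindowing(s beam.Scope, col beam.PCollection) beam.PCollection {
	return beam.WindowInto(s, window.NewFixedWindows(time.Minute), col,
		beam.Trigger(trigger.Repeat(trigger.AfterCount(4))),
		beam.PanesDiscard(),
	)
}
```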
PTAL
Even though I have some nits, I didn't see anything that I'm certain would lead to incorrect behavior under normal circumstances. Since this stuff is so fiddly, I definitely think expanding the testing to the trigger_transcripts (and adding to it) is worthwhile. Also, you may want to wait for another approval from Danny to get more eyes on it.
}

// IsTriggerReady updates the trigger state with the given input, and returns
// if the trigger is ready to fire.
This description is inaccurate, in that it does not mention that it will run the trigger's `onFire` logic, hence the state will no longer be in the ready state. Perhaps:

// IsTriggerReady updates the trigger state with the given input, checks if the trigger is ready and, if so,
// advances state according to the trigger's semantics. Returns `true` if the trigger was fired.
I think I've always understood "ready to fire" here to mean "Caller, you must create a bundle now".
Again, it's an efficiency thing more than anything. The subsequent PR creates the triggered bundle immediately, which is a valid approach to how triggers fire.
It could instead finish processing all the elements received, and then fire a larger consolidated bundle for the firing.
But that heads into performance optimization, which I sadly have no time for.
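If it helps to see that contract in code, here's a hedged sketch of the flow being described; only the name `IsTriggerReady` and its doc comment appear in the diff, so the signature and free-function form are assumptions:

```go
// Sketch: update state for the input, check readiness, and if ready,
// immediately run onFire so the caller knows to create a bundle now.
func IsTriggerReady(tr trigger, state *StateData) bool {
	tr.onElement(state)
	if !tr.shouldFire(state) {
		return false
	}
	// Advance per the trigger's semantics; the state is no longer "ready",
	// and the caller must create the triggered bundle immediately.
	tr.onFire(state)
	return true
}
```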
}

func (t *TriggerAfterEach) onFire(state *StateData) {
	if !t.shouldFire(state) {
This is interesting. In general I don't think each `onFire` needs to check `shouldFire` and exit early. Even if it represents an erroneous call in that state, this will probably cause spookier behavior than would result from just doing what the caller says.
It is probably fine either way. The Java codebase does it differently, with the only such check being in the `AfterFirstStateMachine` implementation of `onFire`, which checks which of its subtriggers is eligible to fire and tells them to fire. TBH these are all corners of trigger semantics with no user value and probably no users. I'm OK with it the way it is, too. Python simply didn't implement any of the triggers that would be problematic.
Incidentally, this is a reason to combine `shouldFire`, `onFire`, and `onBundleMetadata` into one call like you had before :-)
It's largely that at this point I'd rather have an implementation that works than one that's efficient.
If efficiency is required, that can come in much smaller, targeted PRs, guided by a profiler.
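As a sketch of the pattern in question, here's how the defensive guard reads in context; everything past the guard is an assumption about `TriggerAfterEach`'s internals, not this PR's actual code:

```go
func (t *TriggerAfterEach) onFire(state *StateData) {
	// The defensive guard under discussion: an erroneous call no-ops
	// rather than advancing the subtrigger sequence.
	if !t.shouldFire(state) {
		return
	}
	// Assumed internals: fire the first unfinished subtrigger in sequence.
	for _, sub := range t.SubTriggers {
		if !state.getTriggerState(sub).finished {
			sub.onFire(state)
			return
		}
	}
}
```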
func (t *TriggerElementCount) shouldFire(state *StateData) bool {
	ts := state.getTriggerState(t)
	if ts.finished {
Noting that the Java SDK factored it so that an overarching structure watched the state for finished or not, saving the copy/paste of this check into every `shouldFire`. But again, this call should be moot, because the overarching thing should never be checking `shouldFire` for a trigger that is finished.
I prefer having the explicit check over the implicit magic.
If each trigger were a leaf rather than an arbitrary tree, then I'd have put it in a single place external to them.
But the Java approach of spawning manipulation machines from within the contexts is convoluted indirection, IMHO. Having the trigger explicitly check its own finished state makes it easier for everyone to see whether things are handled appropriately in a concrete fashion, vs. through a third layer of abstraction.
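Concretely, the explicit version reads like this sketch; only the guard itself appears in the diff, and the count comparison and field names are assumptions:

```go
func (t *TriggerElementCount) shouldFire(state *StateData) bool {
	ts := state.getTriggerState(t)
	// Explicit, local finished check: no external machinery needed to see
	// that a finished trigger never fires again.
	if ts.finished {
		return false
	}
	return ts.elementCount >= int64(t.ElemCount) // field names assumed
}
```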
@@ -44,3 +44,391 @@ func TestEarliestCompletion(t *testing.T) {
		}
	}
}

func TestTriggers_isReady(t *testing.T) {
Aha, I found the file: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/testing/data/trigger_transcripts.yaml
There are fewer cases in here than I would expect/hope. But this is where Robert started to gather them in an SDK-independent way.
I have another PR or two waiting in the wings that pass the Java test suite, and I'm running out of time for context switches. I'm already delayed a week due to the Spec having been incorrect (#33840).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM - it makes me uncomfortable that some of these corner cases are a little poorly defined/may be inconsistent across runners, but I don't think this PR is able to address that.
The lack of orthogonality and the obvious data-loss gaps as possible interpretations of triggers really vindicate my choice in avoiding them for the longest time. Conversely, the loose definitions permit closing off the poorer implementations in practice. Thanks for the reviews!
Mostly locking in some fixes and work to avoid making Triggers impossible to review.
This PR has 3 components.
Part of #31438.
Actual use of the StateMachine will occur in a subsequent PR. Keeping the commented-out debug prints for the time being.