Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#31438] Trigger Precusor work for Prism. #33763

Merged
merged 8 commits into from
Feb 4, 2025
19 changes: 18 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/engine/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,27 @@ import (
"google.golang.org/protobuf/encoding/protowire"
)

// StateData is a "union" between Bag state and MultiMap state to increase common code.
// StateData is a "union" between Bag, MultiMap, and Trigger state to increase
// common code.
//
// Trigger state is never explicitly set by users, but occurs on demand when
// a trigger requires state.
type StateData struct {
Bag [][]byte
Multimap map[string][][]byte

Trigger map[Trigger]triggerState
}

func (s *StateData) getTriggerState(key Trigger) triggerState {
if s.Trigger == nil {
damccorm marked this conversation as resolved.
Show resolved Hide resolved
s.Trigger = map[Trigger]triggerState{}
}
return s.Trigger[key]
}

func (s *StateData) setTriggerState(key Trigger, val triggerState) {
s.Trigger[key] = val
}

// TimerKey is for use as a key for timers.
Expand Down
20 changes: 18 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1195,6 +1195,7 @@ func (ss *stageState) AddPending(newPending []element) int {
}
newPending = origPending
}
//slog.Warn("AddPending", "stage", ss.ID, "pending", newPending)
lostluck marked this conversation as resolved.
Show resolved Hide resolved
if ss.stateful {
if ss.pendingByKeys == nil {
ss.pendingByKeys = map[string]*dataAndTimers{}
Expand Down Expand Up @@ -1377,12 +1378,26 @@ keysPerBundle:
minTs = dnt.elements[0].timestamp
}

dataInBundle := false

// Can we pre-compute this bit when adding to pendingByKeys?
// startBundle is in run in a single scheduling goroutine, so moving per-element code
// to be computed by the bundle parallel goroutines will speed things up a touch.
for dnt.elements.Len() > 0 {
// We can't mix data and timers in the same bundle, as there's no
// guarantee which is processed first SDK side.
// If the bundle already contains data, then it's before the timer
// by the heap invariant, and must be processed before we can fire a timer.
// AKA, keep them seperate.
if len(toProcess) > 0 && // If we have already picked some elements AND
((dataInBundle && dnt.elements[0].IsTimer()) || // we're about to add a timer to a Bundle that already has data OR
(!dataInBundle && !dnt.elements[0].IsTimer())) { // we're about to add data to a bundle that already has a time
break
}
e := heap.Pop(&dnt.elements).(element)
if e.IsTimer() {
if e.IsData() {
dataInBundle = true
} else {
lastSet, ok := dnt.timers[timerKey{family: e.family, tag: e.tag, window: e.window}]
if !ok {
timerCleared = true
Expand All @@ -1392,7 +1407,7 @@ keysPerBundle:
timerCleared = true
continue
}
holdsInBundle[e.holdTimestamp] += 1
holdsInBundle[e.holdTimestamp]++
// Clear the "fired" timer so subsequent matches can be ignored.
delete(dnt.timers, timerKey{family: e.family, tag: e.tag, window: e.window})
}
Expand Down Expand Up @@ -1526,6 +1541,7 @@ func (ss *stageState) makeInProgressBundle(genBundID func() string, toProcess []
ss.inprogressKeysByBundle[bundID] = newKeys
ss.inprogressKeys.merge(newKeys)
ss.inprogressHoldsByBundle[bundID] = holdsInBundle
//slog.Warn("makeInProgressBundle", "stage", ss.ID, "toProcess", toProcess)
lostluck marked this conversation as resolved.
Show resolved Hide resolved
return bundID
}

Expand Down
Loading
Loading