Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Go SDK] Timers with new datalayer #26101

Merged
merged 23 commits into from
Apr 28, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 227 additions & 0 deletions sdks/go/examples/timer_wordcap/wordcap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// timer_wordcap is a toy streaming pipeline that uses State and Timers with PubSub. It
// does the following:
//
// (1) create a topic and publish a few messages to it
// (2) Set user state and timer
//
// NOTE: it only runs on Dataflow and must be manually cancelled.
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
package main

import (
"context"
"flag"
"fmt"
"os"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
"golang.org/x/exp/slog"
)

var (
input = flag.String("input", os.ExpandEnv("$USER-wordcap"), "Pubsub input topic.")
)

var (
data = []string{
"foo",
"bar",
"baz",
}
)
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved

type Stateful struct {
ElementBag state.Bag[string]
TimerTime state.Value[int64]
MinTime state.Combining[int64, int64, int64]

OutputState timers.ProcessingTime
}

func NewStateful() *Stateful {
return &Stateful{
ElementBag: state.MakeBagState[string]("elementBag"),
TimerTime: state.MakeValueState[int64]("timerTime"),
MinTime: state.MakeCombiningState[int64, int64, int64]("minTiInBag", func(a, b int64) int64 {
if a < b {
return a
}
return b
}),

OutputState: timers.InProcessingTime("outputState"),
}
}

func (s *Stateful) OnTimer(ctx context.Context, ts beam.EventTime, tp timers.Provider, key, timerKey, timerTag string) {
Copy link
Contributor

@hnnsgstfssn hnnsgstfssn Apr 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This example only includes the key. If I want to access the value in OnTimer, will I have to pass it through the state API? I see that the design says a value should be mapped, but this says the interaction is through state API. I have not been able to add an input value, getting some variant of

panic: reflect: Call using zero Value argument goroutine 1472 [running]:
runtime/debug.Stack()
	/usr/lib/go/src/runtime/debug/stack.go:24 +0x65
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.callNoPanic.func1()
	/home/rru/go/pkg/mod/github.com/riteshghorse/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/util.go:58 +0xa5
panic({0x1d9c940, 0xc000c39260})
	/usr/lib/go/src/runtime/panic.go:884 +0x213
reflect.Value.call({0x2016b60?, 0xc000a4d480?, 0x44f6d2?}, {0x28561b5, 0x4}, {0xc000a89970, 0x7, 0x5ddc11?})
	/usr/lib/go/src/reflect/value.go:437 +0x1aee
reflect.Value.Call({0x2016b60?, 0xc000a4d480?, 0x40e427?}, {0xc000a89970?, 0x24e75a0?, 0xc0006e6801?})
	/usr/lib/go/src/reflect/value.go:370 +0xbc
github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx.(*reflectFunc).Call(0xc000a86ed0, {0xc000931340?, 0xc0006e6988?, 0xa6660f?})
	/home/rru/go/pkg/mod/github.com/riteshghorse/beam/sdks/[email protected]/go/pkg/beam/core/util/reflectx/call.go:87 +0x59
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*invoker).initCall.func46({0x0, 0x0, 0x0, 0x0, 0x0}, {0x3ea9da0, 0x1, 0x1}, 0xf?)
	/home/rru/go/pkg/mod/github.com/riteshghorse/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/fn_arity.go:307 +0x94
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*invoker).invokeWithOpts(0xc000b6a780, {0x2b630a8?, 0xc000a4ccc0}, {0x0, 0x0, 0x0, 0x0, 0x0}, {0x3ea9da0, 0x1, ...}, ...)

Similarly I have not been able to add any emitter, getting similar errors as per above.

Another issue I had was that it failed to validate when the beam.EventTime was placed after the timers.Provider. It says then that it needs to be before the main input. Moving it into the same position as here seems to work.

Finally I was unable to add timers to a GBK DoFn, having it complain about needing to use KV with timers, but I need to investigate further on this one. Maybe it is not supposed to be possible to use timers in such cases?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your interest!

The design doc is a little out of date, and was authored with a less than complete understanding of timers. The Beam Model doc is correct and any values would need to be carried over via the state API.

The FnAPI simply doesn't have a provision for the additional data.

Timers and State are implicitly per key, but timers can have an additional "tag" associated with them for an additional uniqueness factor.


We definitely do need to have emitters working (which would always match the ProcessElement emitters exactly), and it's probably simply just not implemented in this WIP PR. Adding new "lifecycle" methods isn't simple.

I believe that GBKs should be able to have a timer callback and associated state it's largely just a validation thing on the SDK that needs updating.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for testing it out. I got diverted to other part of the work. I've added support for emitters now. Feel free to test your pipelines

switch timerKey {
case "outputState":
log.Infof(ctx, "Timer outputState fired on stateful for element: %v.", key)
s.OutputState.Clear(tp)
switch timerTag {
case "001":
log.Infof(ctx, "Timer with tag 001 fired on outputState stateful DoFn.")
s.OutputState.Set(tp, mtime.Now().ToTime().Add(1*time.Minute), timers.WithTag(timerTag))
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

func (s *Stateful) ProcessElement(ctx context.Context, ts beam.EventTime, sp state.Provider, tp timers.Provider, key, word string, emit func(string, string)) error {
s.ElementBag.Add(sp, word)
s.MinTime.Add(sp, int64(ts))

toFire, ok, err := s.TimerTime.Read(sp)
if err != nil {
return err
}
if !ok {
toFire = int64(mtime.Now().Add(1 * time.Minute))
}
minTime, _, err := s.MinTime.Read(sp)
if err != nil {
return err
}

s.OutputState.Set(tp, mtime.Time(toFire).ToTime(), timers.WithOutputTimestamp(mtime.Time(minTime).ToTime()), timers.WithTag(word))
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
s.TimerTime.Write(sp, toFire)

return nil
}

type eventtimeSDFStream struct {
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
RestSize, Mod, Fixed int64
Sleep time.Duration
}

func (fn *eventtimeSDFStream) Setup() error {
return nil
}

func (fn *eventtimeSDFStream) CreateInitialRestriction(v beam.T) offsetrange.Restriction {
return offsetrange.Restriction{Start: 0, End: fn.RestSize}
}

func (fn *eventtimeSDFStream) SplitRestriction(v beam.T, r offsetrange.Restriction) []offsetrange.Restriction {
// No split
return []offsetrange.Restriction{r}
}

func (fn *eventtimeSDFStream) RestrictionSize(v beam.T, r offsetrange.Restriction) float64 {
return r.Size()
}

func (fn *eventtimeSDFStream) CreateTracker(r offsetrange.Restriction) *sdf.LockRTracker {
return sdf.NewLockRTracker(offsetrange.NewTracker(r))
}

func (fn *eventtimeSDFStream) ProcessElement(ctx context.Context, _ *CWE, rt *sdf.LockRTracker, v beam.T, emit func(beam.EventTime, int64)) sdf.ProcessContinuation {
r := rt.GetRestriction().(offsetrange.Restriction)
i := r.Start
if r.Size() < 1 {
log.Debugf(ctx, "size 0 restriction, stoping to process sentinel %v", slog.Any("value", v))
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
return sdf.StopProcessing()
}
slog.Debug("emitting element to restriction", slog.Any("value", v), slog.Group("restriction",
slog.Any("value", v),
slog.Float64("size", r.Size()),
slog.Int64("pos", i),
))
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
if rt.TryClaim(i) {
v := (i % fn.Mod) + fn.Fixed
emit(mtime.Now(), v)
}
return sdf.ResumeProcessingIn(fn.Sleep)
}

func (fn *eventtimeSDFStream) InitialWatermarkEstimatorState(_ beam.EventTime, _ offsetrange.Restriction, _ beam.T) int64 {
return int64(mtime.MinTimestamp)
}

func (fn *eventtimeSDFStream) CreateWatermarkEstimator(initialState int64) *CWE {
return &CWE{Watermark: initialState}
}

func (fn *eventtimeSDFStream) WatermarkEstimatorState(e *CWE) int64 {
return e.Watermark
}

type CWE struct {
Watermark int64 // uses int64, since the SDK prevent mtime.Time from serialization.
}

func (e *CWE) CurrentWatermark() time.Time {
return mtime.Time(e.Watermark).ToTime()
}

func (e *CWE) ObserveTimestamp(ts time.Time) {
// We add 10 milliseconds to allow window boundaries to
// progress after emitting
e.Watermark = int64(mtime.FromTime(ts.Add(-90 * time.Millisecond)))
}

func init() {
register.DoFn7x1[context.Context, beam.EventTime, state.Provider, timers.Provider, string, string, func(string, string), error](&Stateful{})
register.Emitter2[string, string]()
register.DoFn5x1[context.Context, *CWE, *sdf.LockRTracker, beam.T, func(beam.EventTime, int64), sdf.ProcessContinuation]((*eventtimeSDFStream)(nil))
register.Emitter2[beam.EventTime, int64]()
}

func main() {
flag.Parse()
beam.Init()

ctx := context.Background()

log.Infof(ctx, "Publishing %v messages to: %v", len(data), *input)
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved

p := beam.NewPipeline()
s := p.Root()

imp := beam.Impulse(s)
elms := 3
out := beam.ParDo(s, &eventtimeSDFStream{
Sleep: time.Second,
RestSize: int64(elms),
Mod: int64(elms),
Fixed: 1,
}, imp)

str := beam.ParDo(s, func(b int64) string {
return fmt.Sprintf("%03d", b)
}, out)

keyed := beam.ParDo(s, func(ctx context.Context, ts beam.EventTime, s string) (string, string) {
return "test", s
}, str)

timed := beam.ParDo(s, NewStateful(), keyed)
debug.Printf(s, "post stateful: %v", timed)

if err := beamx.Run(context.Background(), p); err != nil {
log.Exitf(ctx, "Failed to execute job: %v", err)
}
}
Loading