From f75766ea57725e7dbff1422060333157d7fa4cf1 Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Fri, 14 Jul 2023 12:03:31 -0700 Subject: [PATCH] [#22737] Add Go SDK timers to programming guide (#27496) * [#22737] Add Go SDK timers to programming guide * pr comments --------- Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com> --- sdks/go/examples/snippets/04transforms.go | 485 +++++++++++++++++- .../en/documentation/programming-guide.md | 33 +- 2 files changed, 486 insertions(+), 32 deletions(-) diff --git a/sdks/go/examples/snippets/04transforms.go b/sdks/go/examples/snippets/04transforms.go index bb21e9e317e4..e0ff23351135 100644 --- a/sdks/go/examples/snippets/04transforms.go +++ b/sdks/go/examples/snippets/04transforms.go @@ -27,6 +27,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" "github.com/apache/beam/sdks/v2/go/pkg/beam/register" @@ -548,58 +549,114 @@ func contains(s []string, e string) bool { return false } -// TODO(https://github.com/apache/beam/issues/22737): Update state_and_timers to a good example to demonstrate both state and timers. -// Rename this to bag_state and update the bag state example in the programming guide at that point. // [START state_and_timers] +// stateAndTimersFn is an example stateful DoFn with state and a timer. +type stateAndTimersFn struct { + Buffer1 state.Bag[string] + Buffer2 state.Bag[int64] + Watermark timers.EventTime +} + +func (s *stateAndTimersFn) ProcessElement(sp state.Provider, tp timers.Provider, w beam.Window, key string, value int64, emit func(string, int64)) error { + // ... handle processing elements here, set a callback timer... 
+ + // Read all the data from Buffer1 in this window. + vals, ok, err := s.Buffer1.Read(sp) + if err != nil { + return err + } + if ok && s.shouldClearBuffer(vals) { + // clear the buffer data if required conditions are met. + s.Buffer1.Clear(sp) + } + + // Add the value to Buffer2. + s.Buffer2.Add(sp, value) + + if s.allConditionsMet() { + // Clear the timer if certain condition met and you don't want to trigger + // the callback method. + s.Watermark.Clear(tp) + } + + emit(key, value) + + return nil +} + +func (s *stateAndTimersFn) OnTimer(sp state.Provider, tp timers.Provider, w beam.Window, key string, timer timers.Context, emit func(string, int64)) error { + // Window and key parameters are really useful especially for debugging issues. + switch timer.Family { + case s.Watermark.Family: + // timer expired, emit a different signal + emit(key, -1) + } + return nil +} + +func (s *stateAndTimersFn) shouldClearBuffer([]string) bool { + // some business logic + return false +} + +func (s *stateAndTimersFn) allConditionsMet() bool { + // other business logic + return true +} + +// [END state_and_timers] + +// [START bag_state] + // bagStateFn only emits words that haven't been seen type bagStateFn struct { - bag state.Bag[string] + Bag state.Bag[string] } -func (s *bagStateFn) ProcessElement(p state.Provider, book string, word string, emitWords func(string)) error { +func (s *bagStateFn) ProcessElement(p state.Provider, book, word string, emitWords func(string)) error { // Get all values we've written to this bag state in this window. 
- vals, ok, err := s.bag.Read(p) + vals, ok, err := s.Bag.Read(p) if err != nil { return err } if !ok || !contains(vals, word) { emitWords(word) - s.bag.Add(p, word) + s.Bag.Add(p, word) } if len(vals) > 10000 { // Example of clearing and starting again with an empty bag - s.bag.Clear(p) + s.Bag.Clear(p) } return nil } -// [END state_and_timers] +// [END bag_state] // [START value_state] // valueStateFn keeps track of the number of elements seen. type valueStateFn struct { - val state.Value[int] + Val state.Value[int] } func (s *valueStateFn) ProcessElement(p state.Provider, book string, word string, emitWords func(string)) error { // Get the value stored in our state - val, ok, err := s.val.Read(p) + val, ok, err := s.Val.Read(p) if err != nil { return err } if !ok { - s.val.Write(p, 1) + s.Val.Write(p, 1) } else { - s.val.Write(p, val+1) + s.Val.Write(p, val+1) } if val > 10000 { // Example of clearing and starting again with an empty bag - s.val.Clear(p) + s.Val.Clear(p) } return nil @@ -620,7 +677,7 @@ func (m MyCustomType) FromBytes(_ []byte) MyCustomType { // [START value_state_coder] type valueStateDoFn struct { - val state.Value[MyCustomType] + Val state.Value[MyCustomType] } func encode(m MyCustomType) []byte { @@ -644,40 +701,422 @@ type combineFn struct{} // combiningStateFn keeps track of the number of elements seen. 
type combiningStateFn struct { // types are the types of the accumulator, input, and output respectively - val state.Combining[int, int, int] + Val state.Combining[int, int, int] } func (s *combiningStateFn) ProcessElement(p state.Provider, book string, word string, emitWords func(string)) error { // Get the value stored in our state - val, _, err := s.val.Read(p) + val, _, err := s.Val.Read(p) if err != nil { return err } - s.val.Add(p, 1) + s.Val.Add(p, 1) if val > 10000 { // Example of clearing and starting again with an empty bag - s.val.Clear(p) + s.Val.Clear(p) } return nil } -func main() { +func combineState(s beam.Scope, input beam.PCollection) beam.PCollection { // ... // CombineFn param can be a simple fn like this or a structural CombineFn cFn := state.MakeCombiningState[int, int, int]("stateKey", func(a, b int) int { return a + b }) + combined := beam.ParDo(s, combiningStateFn{Val: cFn}, input) + // ... // [END combining_state] - fmt.Print(cFn) + return combined +} + +// [START event_time_timer] + +type eventTimerDoFn struct { + State state.Value[int64] + Timer timers.EventTime +} + +func (fn *eventTimerDoFn) ProcessElement(ts beam.EventTime, sp state.Provider, tp timers.Provider, book, word string, emitWords func(string)) { + // ... + + // Set an event-time timer to the element timestamp. + fn.Timer.Set(tp, ts.ToTime()) + + // ... +} + +func (fn *eventTimerDoFn) OnTimer(sp state.Provider, tp timers.Provider, w beam.Window, key string, timer timers.Context, emitWords func(string)) { + switch timer.Family { + case fn.Timer.Family: + // process callback for this timer + } +} + +func AddEventTimeDoFn(s beam.Scope, in beam.PCollection) beam.PCollection { + return beam.ParDo(s, &eventTimerDoFn{ + // Timers are given family names so their callbacks can be handled independently. 
+ Timer: timers.InEventTime("processWatermark"), + State: state.MakeValueState[int64]("latest"), + }, in) +} + +// [END event_time_timer] + +// [START processing_time_timer] + +type processingTimerDoFn struct { + Timer timers.ProcessingTime +} + +func (fn *processingTimerDoFn) ProcessElement(sp state.Provider, tp timers.Provider, book, word string, emitWords func(string)) { + // ... + + // Set a timer to go off 30 seconds in the future. + fn.Timer.Set(tp, time.Now().Add(30*time.Second)) + + // ... +} + +func (fn *processingTimerDoFn) OnTimer(sp state.Provider, tp timers.Provider, w beam.Window, key string, timer timers.Context, emitWords func(string)) { + switch timer.Family { + case fn.Timer.Family: + // process callback for this timer + } +} + +func AddProcessingTimeDoFn(s beam.Scope, in beam.PCollection) beam.PCollection { + return beam.ParDo(s, &processingTimerDoFn{ + // Timers are given family names so their callbacks can be handled independently. + Timer: timers.InProcessingTime("timer"), + }, in) +} + +// [END processing_time_timer] + +// [START dynamic_timer_tags] + +type hasAction interface { + Action() string +} + +type dynamicTagsDoFn[V hasAction] struct { + Timer timers.EventTime +} + +func (fn *dynamicTagsDoFn[V]) ProcessElement(ts beam.EventTime, tp timers.Provider, key string, value V, emitWords func(string)) { + // ... + + // Set an event-time timer at the element timestamp, tagged with the element's action. + fn.Timer.Set(tp, ts.ToTime(), timers.WithTag(value.Action())) + + // ... 
+} + +func (fn *dynamicTagsDoFn[V]) OnTimer(tp timers.Provider, w beam.Window, key string, timer timers.Context, emitWords func(string)) { + switch timer.Family { + case fn.Timer.Family: + tag := timer.Tag // Do something with fired tag + _ = tag + } +} + +func AddDynamicTimerTagsDoFn[V hasAction](s beam.Scope, in beam.PCollection) beam.PCollection { + return beam.ParDo(s, &dynamicTagsDoFn[V]{ + Timer: timers.InEventTime("actionTimers"), + }, in) +} + +// [END dynamic_timer_tags] + +// [START timer_output_timestamps_bad] + +type badTimerOutputTimestampsFn[V any] struct { + ElementBag state.Bag[V] + TimerSet state.Value[bool] + OutputState timers.ProcessingTime +} + +func (fn *badTimerOutputTimestampsFn[V]) ProcessElement(sp state.Provider, tp timers.Provider, key string, value V, emit func(string)) error { + // Add the current element to the bag for this key. + if err := fn.ElementBag.Add(sp, value); err != nil { + return err + } + set, _, err := fn.TimerSet.Read(sp) + if err != nil { + return err + } + if !set { + fn.OutputState.Set(tp, time.Now().Add(1*time.Minute)) + fn.TimerSet.Write(sp, true) + } + return nil +} + +func (fn *badTimerOutputTimestampsFn[V]) OnTimer(sp state.Provider, tp timers.Provider, w beam.Window, key string, timer timers.Context, emit func(string)) error { + switch timer.Family { + case fn.OutputState.Family: + vs, _, err := fn.ElementBag.Read(sp) + if err != nil { + return err + } + for _, v := range vs { + // Output each element + emit(fmt.Sprintf("%v", v)) + } + + fn.ElementBag.Clear(sp) + // Note that the timer has now fired. + fn.TimerSet.Clear(sp) + } + return nil +} + +// [END timer_output_timestamps_bad] + +// [START timer_output_timestamps_good] + +type element[V any] struct { + Timestamp int64 + Value V +} + +type goodTimerOutputTimestampsFn[V any] struct { + ElementBag state.Bag[element[V]] // The bag of elements accumulated. + TimerTimerstamp state.Value[int64] // The timestamp of the timer set. 
+ MinTimestampInBag state.Combining[int64, int64, int64] // The minimum timestamp stored in the bag. + OutputState timers.ProcessingTime // The timestamp of the timer. +} + +func (fn *goodTimerOutputTimestampsFn[V]) ProcessElement(et beam.EventTime, sp state.Provider, tp timers.Provider, key string, value V, emit func(beam.EventTime, string)) error { + // ... + // Add the current element to the bag for this key, and preserve the event time. + if err := fn.ElementBag.Add(sp, element[V]{Timestamp: et.Milliseconds(), Value: value}); err != nil { + return err + } + + // Keep track of the minimum element timestamp currently stored in the bag. + fn.MinTimestampInBag.Add(sp, et.Milliseconds()) + + // If the timer is already set, then reset it at the same time but with an updated output timestamp (otherwise + // we would keep resetting the timer to the future). If there is no timer set, then set one to expire in a minute. + ts, ok, _ := fn.TimerTimerstamp.Read(sp) + var tsToSet time.Time + if ok { + tsToSet = time.UnixMilli(ts) + } else { + tsToSet = time.Now().Add(1 * time.Minute) + } + + minTs, _, _ := fn.MinTimestampInBag.Read(sp) + outputTs := time.UnixMilli(minTs) + + // Setting the outputTimestamp to the minimum timestamp in the bag holds the watermark to that timestamp until the + // timer fires. This allows outputting all the elements with their timestamp. 
+ fn.OutputState.Set(tp, tsToSet, timers.WithOutputTimestamp(outputTs)) + fn.TimerTimerstamp.Write(sp, tsToSet.UnixMilli()) + + return nil +} + +func (fn *goodTimerOutputTimestampsFn[V]) OnTimer(sp state.Provider, tp timers.Provider, w beam.Window, key string, timer timers.Context, emit func(beam.EventTime, string)) error { + switch timer.Family { + case fn.OutputState.Family: + vs, _, err := fn.ElementBag.Read(sp) + if err != nil { + return err + } + for _, v := range vs { + // Output each element with their timestamp + emit(beam.EventTime(v.Timestamp), fmt.Sprintf("%v", v.Value)) + } + + fn.ElementBag.Clear(sp) + // Note that the timer has now fired. + fn.TimerTimerstamp.Clear(sp) + } + return nil +} + +func AddTimedOutputBatching[V any](s beam.Scope, in beam.PCollection) beam.PCollection { + return beam.ParDo(s, &goodTimerOutputTimestampsFn[V]{ + ElementBag: state.MakeBagState[element[V]]("elementBag"), + TimerTimerstamp: state.MakeValueState[int64]("timerTimestamp"), + MinTimestampInBag: state.MakeCombiningState[int64, int64, int64]("minTimestampInBag", func(a, b int64) int64 { + if a < b { + return a + } + return b + }), + OutputState: timers.InProcessingTime("outputState"), + }, in) +} + +// [END timer_output_timestamps_good] + +// updateState exists for example purposes only +func updateState(sp, state, k, v any) {} + +// [START timer_garbage_collection] + +type timerGarbageCollectionFn[V any] struct { + State state.Value[V] // The state for the key. + MaxTimestampInBag state.Combining[int64, int64, int64] // The maximum element timestamp seen so far. + GcTimer timers.EventTime // The timestamp of the timer. +} + +func (fn *timerGarbageCollectionFn[V]) ProcessElement(et beam.EventTime, sp state.Provider, tp timers.Provider, key string, value V, emit func(beam.EventTime, string)) { + updateState(sp, fn.State, key, value) + fn.MaxTimestampInBag.Add(sp, et.Milliseconds()) + + // Set the timer to be one hour after the maximum timestamp seen. 
This will keep overwriting the same timer, so + // as long as there is activity on this key the state will stay active. Once the key goes inactive for one hour's + // worth of event time (as measured by the watermark), then the gc timer will fire. + maxTs, _, _ := fn.MaxTimestampInBag.Read(sp) + expirationTime := time.UnixMilli(maxTs).Add(1 * time.Hour) + fn.GcTimer.Set(tp, expirationTime) } +func (fn *timerGarbageCollectionFn[V]) OnTimer(sp state.Provider, tp timers.Provider, w beam.Window, key string, timer timers.Context, emit func(beam.EventTime, string)) { + switch timer.Family { + case fn.GcTimer.Family: + // Clear all the state for the key + fn.State.Clear(sp) + fn.MaxTimestampInBag.Clear(sp) + } +} + +func AddTimerGarbageCollection[V any](s beam.Scope, in beam.PCollection) beam.PCollection { + return beam.ParDo(s, &timerGarbageCollectionFn[V]{ + State: state.MakeValueState[V]("timerTimestamp"), + MaxTimestampInBag: state.MakeCombiningState[int64, int64, int64]("maxTimestampInBag", func(a, b int64) int64 { + if a > b { + return a + } + return b + }), + GcTimer: timers.InEventTime("gcTimer"), + }, in) +} + +// [END timer_garbage_collection] + +type Event struct{} + +func (*Event) isClick() bool { return false } + +// [START join_dofn_example] + +type JoinedEvent struct { + View, Click *Event +} + +type joinDoFn struct { + View state.Value[*Event] // Store the view event. + Click state.Value[*Event] // Store the click event. + + MaxTimestampSeen state.Combining[int64, int64, int64] // The maximum element timestamp seen so far. + GcTimer timers.EventTime // The timestamp of the timer. 
+} + +func (fn *joinDoFn) ProcessElement(et beam.EventTime, sp state.Provider, tp timers.Provider, key string, event *Event, emit func(JoinedEvent)) { + valueState := fn.View + if event.isClick() { + valueState = fn.Click + } + valueState.Write(sp, event) + + view, _, _ := fn.View.Read(sp) + click, _, _ := fn.Click.Read(sp) + if view != nil && click != nil { + emit(JoinedEvent{View: view, Click: click}) + fn.clearState(sp) + return + } + + fn.MaxTimestampSeen.Add(sp, et.Milliseconds()) + expTs, _, _ := fn.MaxTimestampSeen.Read(sp) + fn.GcTimer.Set(tp, time.UnixMilli(expTs).Add(1*time.Hour)) +} + +func (fn *joinDoFn) OnTimer(sp state.Provider, tp timers.Provider, w beam.Window, key string, timer timers.Context, emit func(beam.EventTime, string)) { + switch timer.Family { + case fn.GcTimer.Family: + fn.clearState(sp) + } +} + +func (fn *joinDoFn) clearState(sp state.Provider) { + fn.View.Clear(sp) + fn.Click.Clear(sp) + fn.MaxTimestampSeen.Clear(sp) +} + +func AddJoinDoFn(s beam.Scope, in beam.PCollection) beam.PCollection { + return beam.ParDo(s, &joinDoFn{ + View: state.MakeValueState[*Event]("view"), + Click: state.MakeValueState[*Event]("click"), + MaxTimestampSeen: state.MakeCombiningState[int64, int64, int64]("maxTimestampSeen", func(a, b int64) int64 { + if a > b { + return a + } + return b + }), + GcTimer: timers.InEventTime("gcTimer"), + }, in) +} + +// [END join_dofn_example] + +func sendRpc(...any) {} + +// [START batching_dofn_example] + +type bufferDoFn[V any] struct { + Elements state.Bag[V] // Store the elements buffered so far. + IsTimerSet state.Value[bool] // Keep track of whether a timer is currently set or not. + + OutputElements timers.ProcessingTime // The processing-time timer used to publish the RPC. 
+} + +func (fn *bufferDoFn[V]) ProcessElement(et beam.EventTime, sp state.Provider, tp timers.Provider, key string, value V) { + fn.Elements.Add(sp, value) + + isSet, _, _ := fn.IsTimerSet.Read(sp) + if !isSet { + fn.OutputElements.Set(tp, time.Now().Add(10*time.Second)) + fn.IsTimerSet.Write(sp, true) + } +} + +func (fn *bufferDoFn[V]) OnTimer(sp state.Provider, tp timers.Provider, w beam.Window, key string, timer timers.Context) { + switch timer.Family { + case fn.OutputElements.Family: + elements, _, _ := fn.Elements.Read(sp) + sendRpc(elements) + fn.Elements.Clear(sp) + fn.IsTimerSet.Clear(sp) + } +} + +func AddBufferDoFn[V any](s beam.Scope, in beam.PCollection) beam.PCollection { + return beam.ParDo(s, &bufferDoFn[V]{ + Elements: state.MakeBagState[V]("elements"), + IsTimerSet: state.MakeValueState[bool]("isTimerSet"), + + OutputElements: timers.InProcessingTime("outputElements"), + }, in) +} + +// [END batching_dofn_example] + type statefulDoFn struct { - s state.Value[int] + S state.Value[int] } func statefulPipeline() beam.PCollection { @@ -686,7 +1125,9 @@ func statefulPipeline() beam.PCollection { // [START windowed_state] - items := beam.ParDo(s, statefulDoFn{}, elements) + items := beam.ParDo(s, statefulDoFn{ + S: state.MakeValueState[int]("S"), + }, elements) out := beam.WindowInto(s, window.NewFixedWindows(24*time.Hour), items) // [END windowed_state] diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md index 30bdb7247d49..0427e50e0b19 100644 --- a/website/www/site/content/en/documentation/programming-guide.md +++ b/website/www/site/content/en/documentation/programming-guide.md @@ -2568,9 +2568,7 @@ Timers and States are explained in more detail in the {{< paragraph class="language-go">}} **Timer and State:** -User defined State parameters can be used in a stateful DoFn. 
Timers aren't implemented in the Go SDK yet; -see more at [Issue 22737](https://github.com/apache/beam/issues/22737). Once implemented, user defined Timer -parameters can be used in a stateful DoFn. +User defined State and Timer parameters can be used in a stateful DoFn. Timers and States are explained in more detail in the [Timely (and Stateful) Processing with Apache Beam](/blog/2017/08/28/timely-processing.html) blog post. {{< /paragraph >}} @@ -6147,7 +6145,7 @@ _ = (p | 'Read per user' >> ReadPerUser() {{< /highlight >}} {{< highlight go >}} -{{< code_sample "sdks/go/examples/snippets/04transforms.go" state_and_timers >}} +{{< code_sample "sdks/go/examples/snippets/04transforms.go" bag_state >}} {{< /highlight >}} ### 11.2. Deferred state reads {#deferred-state-reads} @@ -6270,7 +6268,7 @@ _ = (p | 'Read per user' >> ReadPerUser() {{< /highlight >}} {{< highlight go >}} -This is not supported yet, see https://github.com/apache/beam/issues/22737. +{{< code_sample "sdks/go/examples/snippets/04transforms.go" event_time_timer >}} {{< /highlight >}} #### 11.3.2. Processing-time timers {#processing-time-timers} @@ -6322,7 +6320,7 @@ _ = (p | 'Read per user' >> ReadPerUser() {{< /highlight >}} {{< highlight go >}} -This is not supported yet, see https://github.com/apache/beam/issues/22737. +{{< code_sample "sdks/go/examples/snippets/04transforms.go" processing_time_timer >}} {{< /highlight >}} #### 11.3.3. Dynamic timer tags {#dynamic-timer-tags} @@ -6383,7 +6381,7 @@ _ = (p | 'Read per user' >> ReadPerUser() {{< /highlight >}} {{< highlight go >}} -This is not supported yet, see https://github.com/apache/beam/issues/22737. +{{< code_sample "sdks/go/examples/snippets/04transforms.go" dynamic_timer_tags >}} {{< /highlight >}} #### 11.3.4. 
Timer output timestamps {#timer-output-timestamps} @@ -6435,6 +6433,10 @@ perUser.apply(ParDo.of(new DoFn, OutputT>() { })); {{< /highlight >}} +{{< highlight go >}} +{{< code_sample "sdks/go/examples/snippets/04transforms.go" timer_output_timestamps_bad >}} +{{< /highlight >}} + The problem with this code is that the ParDo is buffering elements, however nothing is preventing the watermark from advancing past the timestamp of those elements, so all those elements might be dropped as late data. In order to prevent this from happening, an output timestamp needs to be set on the timer to prevent the watermark from advancing @@ -6471,7 +6473,7 @@ perUser.apply(ParDo.of(new DoFn, OutputT>() { ? Instant.now().plus(Duration.standardMinutes(1)) : new Instant(timerTimestampMs); // Setting the outputTimestamp to the minimum timestamp in the bag holds the watermark to that timestamp until the // timer fires. This allows outputting all the elements with their timestamp. - timer.withOutputTimestamp(minTimestamp.read()).set(timerToSet). + timer.withOutputTimestamp(minTimestamp.read()).set(timerToSet). timerTimestamp.write(timerToSet.getMillis()); } @@ -6494,7 +6496,7 @@ Timer output timestamps is not yet supported in Python SDK. See https://github.c {{< /highlight >}} {{< highlight go >}} -This is not supported yet, see https://github.com/apache/beam/issues/22737. +{{< code_sample "sdks/go/examples/snippets/04transforms.go" timer_output_timestamps_good >}} {{< /highlight >}} ### 11.4. Garbage collecting state {#garbage-collecting-state} @@ -6624,7 +6626,7 @@ _ = (p | 'Read per user' >> ReadPerUser() {{< /highlight >}} {{< highlight go >}} -This is not supported yet, see https://github.com/apache/beam/issues/22737. +{{< code_sample "sdks/go/examples/snippets/04transforms.go" timer_garbage_collection >}} {{< /highlight >}} ### 11.5. 
State and timers examples {#state-timers-examples} @@ -6764,6 +6766,11 @@ _ = (p | 'EventsPerLinkId' >> ReadPerLinkEvents() | 'Join DoFn' >> beam.ParDo(JoinDoFn())) {{< /highlight >}} + +{{< highlight go >}} +{{< code_sample "sdks/go/examples/snippets/04transforms.go" join_dofn_example >}} +{{< /highlight >}} + #### 11.5.2. Batching RPCs {#batching-rpcs} In this example, input elements are being forwarded to an external RPC service. The RPC accepts batch requests - @@ -6830,6 +6837,12 @@ class BufferDoFn(DoFn): {{< /highlight >}} + +{{< highlight go >}} +{{< code_sample "sdks/go/examples/snippets/04transforms.go" batching_dofn_example >}} +{{< /highlight >}} + + ## 12. Splittable `DoFns` {#splittable-dofns} A Splittable `DoFn` (SDF) enables users to create modular components containing I/Os (and some advanced