Skip to content

Commit

Permalink
Handful of minor changes to the update package (#4331)
Browse files Browse the repository at this point in the history
* Comment wording and spelling fixes

* Make update onComplete callback optional

Nice to be able to construct with update.New without requiring a
callback func.

* Prefer t.Cleanup to defer in sub-tests

* Remove atomic state read/write

In the end, the state field is not accessed concurrently.

* I don't care much for this revive rule

* remove superflous parens
  • Loading branch information
Matt McShane authored May 12, 2023
1 parent f07921b commit f84a3c3
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 63 deletions.
4 changes: 2 additions & 2 deletions service/history/api/pollupdate/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,15 @@ func TestPollOutcome(t *testing.T) {
})
t.Run("future timeout", func(t *testing.T) {
reg.FindFunc = func(ctx context.Context, updateID string) (*update.Update, bool) {
return update.New(updateID, func() {}), true
return update.New(updateID), true
}
ctx, cncl := context.WithTimeout(context.Background(), 5*time.Millisecond)
defer cncl()
_, err := pollupdate.Invoke(ctx, &req, wfcc)
require.Error(t, err)
})
t.Run("get an outcome", func(t *testing.T) {
upd := update.New(updateID, func() {})
upd := update.New(updateID)
reg.FindFunc = func(ctx context.Context, updateID string) (*update.Update, bool) {
return upd, true
}
Expand Down
7 changes: 7 additions & 0 deletions service/history/workflow/update/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,10 @@ var (
NewAccepted = newAccepted
NewCompleted = newCompleted
)

// ObserveCompletion exporses withOnComplete to unit tests
//
//revive:disable-next-line:unexported-return for testing
func ObserveCompletion(b *bool) updateOpt {
return withCompletionCallback(func() { *b = true })
}
22 changes: 14 additions & 8 deletions service/history/workflow/update/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ import (
)

type (
// Registry maintains a set of updates that have been admitted to run
// against a workflow execution.
Registry interface {
// FindOrCreate finds an existing Update or creates a new one. The second
// return value (bool) indicates whether the Update returned already
Expand All @@ -50,7 +52,7 @@ type (
FindOrCreate(ctx context.Context, protocolInstanceID string) (*Update, bool, error)

// Find finds an existing update in this Registry but does not create a
// new update it is absent.
// new update if no update is found.
Find(ctx context.Context, protocolInstanceID string) (*Update, bool)

// ReadOutoundMessages polls each registered Update for outbound
Expand All @@ -66,7 +68,7 @@ type (
// sent messages to a worker.
HasOutgoing() bool

// Len observes the number of updates in this Registry.
// Len observes the number of incomplete updates in this Registry.
Len() int
}

Expand All @@ -91,6 +93,8 @@ type (

//revive:disable:unexported-return I *want* it to be unexported

// WithInFlightLimit provides an optional limit to the number of incomplete
// updates that a Registry instance will allow.
func WithInFlightLimit(f func() int) regOpt {
return func(r *RegistryImpl) {
r.maxInFlight = f
Expand Down Expand Up @@ -215,12 +219,14 @@ func (r *RegistryImpl) Len() int {
return len(r.updates)
}

func (r *RegistryImpl) remover(id string) func() {
return func() {
r.mu.Lock()
defer r.mu.Unlock()
delete(r.updates, id)
}
func (r *RegistryImpl) remover(id string) updateOpt {
return withCompletionCallback(
func() {
r.mu.Lock()
defer r.mu.Unlock()
delete(r.updates, id)
},
)
}

func (r *RegistryImpl) admit(context.Context) error {
Expand Down
15 changes: 2 additions & 13 deletions service/history/workflow/update/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@

package update

import "sync/atomic"

type (
state uint32
stateSet uint32
Expand Down Expand Up @@ -61,15 +59,6 @@ func (s state) String() string {
return "unrecognized state"
}

func (s *state) Is(other state) bool {
return state(atomic.LoadUint32((*uint32)(s))) == other
}

func (s *state) Set(other state) state {
return state(atomic.SwapUint32((*uint32)(s), uint32(other)))
}

func (s *state) Matches(mask stateSet) bool {
actual := atomic.LoadUint32((*uint32)(s))
return actual&uint32(mask) == actual
func (s state) Matches(mask stateSet) bool {
return uint32(s)&uint32(mask) == uint32(s)
}
36 changes: 22 additions & 14 deletions service/history/workflow/update/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,19 @@ type (
// messages from the go.temporal.io/api/update/v1 package. The update state
// machine is straightforward except in that it provides "provisional"
// in-between states where the update has received a message that has
// updated its internal state but those updates have not been made visible
// modified its internal state but those changes have not been made visible
// to clients yet (e.g. accepted or outcome futures have not been set yet).
// The effects are bound to the EventStore's effect.Set and will be
// triggered withen those effects are applied.
// The observable changes are bound to the EventStore's effect.Controller
// and will be triggered when those effects are applied.
Update struct {
// accessed only while holding workflow lock
id string
state state
request *protocolpb.Message // nil when not in stateRequested
onComplete func()
instrumentation *instrumentation

// these fields might be accessed while not holding the workflow lock
state state
accepted future.Future[*failurepb.Failure]
outcome future.Future[*updatepb.Outcome]
}
Expand All @@ -88,11 +88,11 @@ type (

// New creates a new Update instance with the provided ID that will call the
// onComplete callback when it completes.
func New(id string, onComplete func(), opts ...updateOpt) *Update {
func New(id string, opts ...updateOpt) *Update {
upd := &Update{
id: id,
state: stateAdmitted,
onComplete: onComplete,
onComplete: func() {},
instrumentation: &noopInstrumentation,
accepted: future.NewFuture[*failurepb.Failure](),
outcome: future.NewFuture[*updatepb.Outcome](),
Expand All @@ -103,17 +103,23 @@ func New(id string, onComplete func(), opts ...updateOpt) *Update {
return upd
}

func withCompletionCallback(cb func()) updateOpt {
return func(u *Update) {
u.onComplete = cb
}
}

func withInstrumentation(i *instrumentation) updateOpt {
return func(u *Update) {
u.instrumentation = i
}
}

func newAccepted(id string, onComplete func(), opts ...updateOpt) *Update {
func newAccepted(id string, opts ...updateOpt) *Update {
upd := &Update{
id: id,
state: stateAccepted,
onComplete: onComplete,
onComplete: func() {},
instrumentation: &noopInstrumentation,
accepted: future.NewReadyFuture[*failurepb.Failure](nil, nil),
outcome: future.NewFuture[*updatepb.Outcome](),
Expand All @@ -132,6 +138,7 @@ func newCompleted(
upd := &Update{
id: id,
state: stateCompleted,
onComplete: func() {},
instrumentation: &noopInstrumentation,
accepted: future.NewReadyFuture[*failurepb.Failure](nil, nil),
outcome: outcomeFuture,
Expand Down Expand Up @@ -211,7 +218,7 @@ func (u *Update) OnMessage(
// ReadOutgoingMessages loads any oubound messages from this Update state
// machine into the output slice provided.
func (u *Update) ReadOutgoingMessages(out *[]*protocolpb.Message) {
if !u.state.Is(stateRequested) {
if u.state != stateRequested {
// Update only sends messages to the workflow when it is in
// stateRequested
return
Expand All @@ -229,7 +236,7 @@ func (u *Update) onRequestMsg(
req *updatepb.Request,
eventStore EventStore,
) error {
if !u.state.Is(stateAdmitted) {
if u.state != stateAdmitted {
return nil
}
if err := validateRequestMsg(u.id, req); err != nil {
Expand Down Expand Up @@ -280,7 +287,7 @@ func (u *Update) onAcceptanceMsg(
return nil
}

// onRejectionMsg expectes the Update stae to be in stateRequested and returns
// onRejectionMsg expectes the Update state to be stateRequested and returns
// an error if it finds otherwise. On commit of buffered effects the state
// machine transitions to stateCompleted and the accepted and outcome futures
// are both completed with the failurepb.Failure value from the
Expand Down Expand Up @@ -312,7 +319,7 @@ func (u *Update) onRejectionMsg(
return nil
}

// onResponseMsg expectes the Update to be in either stateProvisionallyAccepted
// onResponseMsg expects the Update to be in either stateProvisionallyAccepted
// or stateAccepted and returns an error if it finds otherwise. On commit of
// buffered effects the state machine will transtion to stateCompleted and the
// outcome future is completed with the updatepb.Outcome from the
Expand Down Expand Up @@ -348,7 +355,7 @@ func (u *Update) hasBeenSeenByWorkflowExecution() bool {
}

func (u *Update) hasOutgoingMessage() bool {
return u.state.Is(stateRequested)
return u.state == stateRequested
}

func (u *Update) checkState(msg proto.Message, expected state) error {
Expand All @@ -366,7 +373,8 @@ func (u *Update) checkStateSet(msg proto.Message, allowed stateSet) error {
// setState assigns the current state to a new value returning the original
// value.
func (u *Update) setState(newState state) state {
prevState := u.state.Set(newState)
prevState := u.state
u.state = newState
u.instrumentation.StateChange(u.id, prevState, newState)
return prevState
}
Loading

0 comments on commit f84a3c3

Please sign in to comment.