From f84a3c370043d2a7f1fbdf9d2c58c0c15fb65bd0 Mon Sep 17 00:00:00 2001 From: Matt McShane Date: Fri, 12 May 2023 17:58:07 -0400 Subject: [PATCH] Handful of minor changes to the update package (#4331) * Comment wording and spelling fixes * Make update onComplete callback optional Nice to be able to construct with update.New without requiring a callback func. * Prefer t.Cleanup to defer in sub-tests * Remove atomic state read/write In the end, the state field is not accessed concurrently. * I don't care much for this revive rule * remove superflous parens --- service/history/api/pollupdate/api_test.go | 4 +- .../history/workflow/update/export_test.go | 7 +++ service/history/workflow/update/registry.go | 22 +++++--- service/history/workflow/update/state.go | 15 +----- service/history/workflow/update/update.go | 36 +++++++------ .../history/workflow/update/update_test.go | 50 +++++++++---------- 6 files changed, 71 insertions(+), 63 deletions(-) diff --git a/service/history/api/pollupdate/api_test.go b/service/history/api/pollupdate/api_test.go index 7d48ebf903f..9e1db5e60af 100644 --- a/service/history/api/pollupdate/api_test.go +++ b/service/history/api/pollupdate/api_test.go @@ -141,7 +141,7 @@ func TestPollOutcome(t *testing.T) { }) t.Run("future timeout", func(t *testing.T) { reg.FindFunc = func(ctx context.Context, updateID string) (*update.Update, bool) { - return update.New(updateID, func() {}), true + return update.New(updateID), true } ctx, cncl := context.WithTimeout(context.Background(), 5*time.Millisecond) defer cncl() @@ -149,7 +149,7 @@ func TestPollOutcome(t *testing.T) { require.Error(t, err) }) t.Run("get an outcome", func(t *testing.T) { - upd := update.New(updateID, func() {}) + upd := update.New(updateID) reg.FindFunc = func(ctx context.Context, updateID string) (*update.Update, bool) { return upd, true } diff --git a/service/history/workflow/update/export_test.go b/service/history/workflow/update/export_test.go index bf2e740aaa8..3361bbc3599 100644 --- a/service/history/workflow/update/export_test.go +++ b/service/history/workflow/update/export_test.go @@ -32,3 +32,10 @@ var ( NewAccepted = newAccepted NewCompleted = newCompleted ) + +// ObserveCompletion exporses withOnComplete to unit tests +// +//revive:disable-next-line:unexported-return for testing +func ObserveCompletion(b *bool) updateOpt { + return withCompletionCallback(func() { *b = true }) +} diff --git a/service/history/workflow/update/registry.go b/service/history/workflow/update/registry.go index 3ffcef58839..85afea3beb9 100644 --- a/service/history/workflow/update/registry.go +++ b/service/history/workflow/update/registry.go @@ -42,6 +42,8 @@ import ( ) type ( + // Registry maintains a set of updates that have been admitted to run + // against a workflow execution. Registry interface { // FindOrCreate finds an existing Update or creates a new one. The second // return value (bool) indicates whether the Update returned already @@ -50,7 +52,7 @@ type ( FindOrCreate(ctx context.Context, protocolInstanceID string) (*Update, bool, error) // Find finds an existing update in this Registry but does not create a - // new update it is absent. + // new update if no update is found. Find(ctx context.Context, protocolInstanceID string) (*Update, bool) // ReadOutoundMessages polls each registered Update for outbound @@ -66,7 +68,7 @@ type ( // sent messages to a worker. HasOutgoing() bool - // Len observes the number of updates in this Registry. + // Len observes the number of incomplete updates in this Registry. Len() int } @@ -91,6 +93,8 @@ type ( //revive:disable:unexported-return I *want* it to be unexported +// WithInFlightLimit provides an optional limit to the number of incomplete +// updates that a Registry instance will allow. func WithInFlightLimit(f func() int) regOpt { return func(r *RegistryImpl) { r.maxInFlight = f @@ -215,12 +219,14 @@ func (r *RegistryImpl) Len() int { return len(r.updates) } -func (r *RegistryImpl) remover(id string) func() { - return func() { - r.mu.Lock() - defer r.mu.Unlock() - delete(r.updates, id) - } +func (r *RegistryImpl) remover(id string) updateOpt { + return withCompletionCallback( + func() { + r.mu.Lock() + defer r.mu.Unlock() + delete(r.updates, id) + }, + ) } func (r *RegistryImpl) admit(context.Context) error { diff --git a/service/history/workflow/update/state.go b/service/history/workflow/update/state.go index 8d391b81e00..a5e309f3702 100644 --- a/service/history/workflow/update/state.go +++ b/service/history/workflow/update/state.go @@ -24,8 +24,6 @@ package update -import "sync/atomic" - type ( state uint32 stateSet uint32 @@ -61,15 +59,6 @@ func (s state) String() string { return "unrecognized state" } -func (s *state) Is(other state) bool { - return state(atomic.LoadUint32((*uint32)(s))) == other -} - -func (s *state) Set(other state) state { - return state(atomic.SwapUint32((*uint32)(s), uint32(other))) -} - -func (s *state) Matches(mask stateSet) bool { - actual := atomic.LoadUint32((*uint32)(s)) - return actual&uint32(mask) == actual +func (s state) Matches(mask stateSet) bool { + return uint32(s)&uint32(mask) == uint32(s) } diff --git a/service/history/workflow/update/update.go b/service/history/workflow/update/update.go index 7b8c8f8ebe2..2f9eaa40636 100644 --- a/service/history/workflow/update/update.go +++ b/service/history/workflow/update/update.go @@ -66,19 +66,19 @@ type ( // messages from the go.temporal.io/api/update/v1 package. The update state // machine is straightforward except in that it provides "provisional" // in-between states where the update has received a message that has - // updated its internal state but those updates have not been made visible + // modified its internal state but those changes have not been made visible // to clients yet (e.g. accepted or outcome futures have not been set yet). - // The effects are bound to the EventStore's effect.Set and will be - // triggered withen those effects are applied. + // The observable changes are bound to the EventStore's effect.Controller + // and will be triggered when those effects are applied. Update struct { // accessed only while holding workflow lock id string + state state request *protocolpb.Message // nil when not in stateRequested onComplete func() instrumentation *instrumentation // these fields might be accessed while not holding the workflow lock - state state accepted future.Future[*failurepb.Failure] outcome future.Future[*updatepb.Outcome] } @@ -88,11 +88,11 @@ type ( // New creates a new Update instance with the provided ID that will call the // onComplete callback when it completes. -func New(id string, onComplete func(), opts ...updateOpt) *Update { +func New(id string, opts ...updateOpt) *Update { upd := &Update{ id: id, state: stateAdmitted, - onComplete: onComplete, + onComplete: func() {}, instrumentation: &noopInstrumentation, accepted: future.NewFuture[*failurepb.Failure](), outcome: future.NewFuture[*updatepb.Outcome](), @@ -103,17 +103,23 @@ func New(id string, onComplete func(), opts ...updateOpt) *Update { return upd } +func withCompletionCallback(cb func()) updateOpt { + return func(u *Update) { + u.onComplete = cb + } +} + func withInstrumentation(i *instrumentation) updateOpt { return func(u *Update) { u.instrumentation = i } } -func newAccepted(id string, onComplete func(), opts ...updateOpt) *Update { +func newAccepted(id string, opts ...updateOpt) *Update { upd := &Update{ id: id, state: stateAccepted, - onComplete: onComplete, + onComplete: func() {}, instrumentation: &noopInstrumentation, accepted: future.NewReadyFuture[*failurepb.Failure](nil, nil), outcome: future.NewFuture[*updatepb.Outcome](), @@ -132,6 +138,7 @@ func newCompleted( upd := &Update{ id: id, state: stateCompleted, + onComplete: func() {}, instrumentation: &noopInstrumentation, accepted: future.NewReadyFuture[*failurepb.Failure](nil, nil), outcome: outcomeFuture, @@ -211,7 +218,7 @@ func (u *Update) OnMessage( // ReadOutgoingMessages loads any oubound messages from this Update state // machine into the output slice provided. func (u *Update) ReadOutgoingMessages(out *[]*protocolpb.Message) { - if !u.state.Is(stateRequested) { + if u.state != stateRequested { // Update only sends messages to the workflow when it is in // stateRequested return @@ -229,7 +236,7 @@ func (u *Update) onRequestMsg( req *updatepb.Request, eventStore EventStore, ) error { - if !u.state.Is(stateAdmitted) { + if u.state != stateAdmitted { return nil } if err := validateRequestMsg(u.id, req); err != nil { @@ -280,7 +287,7 @@ func (u *Update) onAcceptanceMsg( return nil } -// onRejectionMsg expectes the Update stae to be in stateRequested and returns +// onRejectionMsg expectes the Update state to be stateRequested and returns // an error if it finds otherwise. On commit of buffered effects the state // machine transitions to stateCompleted and the accepted and outcome futures // are both completed with the failurepb.Failure value from the @@ -312,7 +319,7 @@ func (u *Update) onRejectionMsg( return nil } -// onResponseMsg expectes the Update to be in either stateProvisionallyAccepted +// onResponseMsg expects the Update to be in either stateProvisionallyAccepted // or stateAccepted and returns an error if it finds otherwise. On commit of // buffered effects the state machine will transtion to stateCompleted and the // outcome future is completed with the updatepb.Outcome from the @@ -348,7 +355,7 @@ func (u *Update) hasBeenSeenByWorkflowExecution() bool { } func (u *Update) hasOutgoingMessage() bool { - return u.state.Is(stateRequested) + return u.state == stateRequested } func (u *Update) checkState(msg proto.Message, expected state) error { @@ -366,7 +373,8 @@ func (u *Update) checkStateSet(msg proto.Message, allowed stateSet) error { // setState assigns the current state to a new value returning the original // value. func (u *Update) setState(newState state) state { - prevState := u.state.Set(newState) + prevState := u.state + u.state = newState u.instrumentation.StateChange(u.id, prevState, newState) return prevState } diff --git a/service/history/workflow/update/update_test.go b/service/history/workflow/update/update_test.go index da4b154b88f..fac15ca7067 100644 --- a/service/history/workflow/update/update_test.go +++ b/service/history/workflow/update/update_test.go @@ -49,8 +49,6 @@ func successOutcome(t *testing.T, s string) *updatepb.Outcome { } } -func ignoreCompletion() {} - var eventStoreUnused update.EventStore type mockEventStore struct { @@ -87,7 +85,7 @@ func (m mockEventStore) AddWorkflowExecutionUpdateCompletedEvent( func TestNilMessage(t *testing.T) { t.Parallel() ctx := context.Background() - upd := update.New(t.Name()+"update-id", ignoreCompletion) + upd := update.New(t.Name() + "update-id") err := upd.OnMessage(ctx, nil, mockEventStore{}) var invalidArg *serviceerror.InvalidArgument require.ErrorAs(t, err, &invalidArg) @@ -96,7 +94,7 @@ func TestNilMessage(t *testing.T) { func TestUnsupportedMessageType(t *testing.T) { t.Parallel() ctx := context.Background() - upd := update.New(t.Name()+"update-id", ignoreCompletion) + upd := update.New(t.Name() + "update-id") notAMessageType := historypb.HistoryEvent{} err := upd.OnMessage(ctx, ¬AMessageType, mockEventStore{}) var invalidArg *serviceerror.InvalidArgument @@ -139,7 +137,7 @@ func TestRequestAcceptComplete(t *testing.T) { return nil, nil }, } - upd = update.New(meta.UpdateId, func() { completed = true }) + upd = update.New(meta.UpdateId, update.ObserveCompletion(&completed)) ) t.Run("request", func(t *testing.T) { @@ -174,7 +172,7 @@ func TestRequestAcceptComplete(t *testing.T) { require.False(t, completed) ctx, cncl := context.WithTimeout(ctx, 5*time.Millisecond) - defer cncl() + t.Cleanup(cncl) _, err = upd.WaitAccepted(ctx) require.ErrorIs(t, err, context.DeadlineExceeded, "update acceptance should not be observable until effects are applied") @@ -194,7 +192,7 @@ func TestRequestAcceptComplete(t *testing.T) { require.False(t, completed) ctx, cncl := context.WithTimeout(ctx, 5*time.Millisecond) - defer cncl() + t.Cleanup(cncl) _, err = upd.WaitOutcome(ctx) require.ErrorIs(t, err, context.DeadlineExceeded, "update outcome should not be observable until effects are applied") @@ -221,7 +219,7 @@ func TestRequestReject(t *testing.T) { effects = effect.Buffer{} store = mockEventStore{Controller: &effects} updateID = t.Name() + "-update-id" - upd = update.New(updateID, func() { completed = true }) + upd = update.New(updateID, update.ObserveCompletion(&completed)) req = updatepb.Request{ Meta: &updatepb.Meta{UpdateId: updateID}, Input: &updatepb.Input{Name: t.Name()}, @@ -247,14 +245,14 @@ func TestRequestReject(t *testing.T) { { ctx, cncl := context.WithTimeout(ctx, 5*time.Millisecond) - defer cncl() + t.Cleanup(cncl) _, err := upd.WaitAccepted(ctx) require.ErrorIs(t, err, context.DeadlineExceeded, "update acceptance failure should not be observable until effects are applied") } { ctx, cncl := context.WithTimeout(ctx, 5*time.Millisecond) - defer cncl() + t.Cleanup(cncl) _, err := upd.WaitOutcome(ctx) require.ErrorIs(t, err, context.DeadlineExceeded, "update acceptance failure should not be observable until effects are applied") @@ -281,7 +279,7 @@ func TestWithProtocolMessage(t *testing.T) { ctx = context.Background() store = mockEventStore{Controller: &effect.Buffer{}} updateID = t.Name() + "-update-id" - upd = update.New(updateID, ignoreCompletion) + upd = update.New(updateID) req = updatepb.Request{ Meta: &updatepb.Meta{UpdateId: updateID}, Input: &updatepb.Input{Name: t.Name()}, @@ -312,7 +310,7 @@ func TestMessageOutput(t *testing.T) { effects effect.Buffer store = mockEventStore{Controller: &effects} updateID = t.Name() + "-update-id" - upd = update.New(updateID, ignoreCompletion) + upd = update.New(updateID) req = updatepb.Request{ Meta: &updatepb.Meta{UpdateId: updateID}, Input: &updatepb.Input{Name: t.Name()}, @@ -332,7 +330,7 @@ func TestMessageOutput(t *testing.T) { require.Len(t, msgs, 1) }) t.Run("after requested", func(t *testing.T) { - upd := update.NewAccepted(updateID, ignoreCompletion) + upd := update.NewAccepted(updateID) msgs := make([]*protocolpb.Message, 0) upd.ReadOutgoingMessages(&msgs) require.Empty(t, msgs) @@ -343,7 +341,7 @@ func TestRejectAfterAcceptFails(t *testing.T) { t.Parallel() ctx := context.Background() updateID := t.Name() + "-update-id" - upd := update.NewAccepted(updateID, ignoreCompletion) + upd := update.NewAccepted(updateID) err := upd.OnMessage(ctx, &updatepb.Rejection{}, eventStoreUnused) var invalidArg *serviceerror.InvalidArgument require.ErrorAs(t, err, &invalidArg) @@ -364,7 +362,7 @@ func TestAcceptanceAndResponseInSameMessageBatch(t *testing.T) { req = updatepb.Request{Meta: &meta, Input: &updatepb.Input{Name: t.Name()}} acpt = updatepb.Acceptance{AcceptedRequest: &req, AcceptedRequestMessageId: "x"} resp = updatepb.Response{Meta: &meta, Outcome: successOutcome(t, "success!")} - upd = update.New(meta.UpdateId, func() { completed = true }) + upd = update.New(meta.UpdateId, update.ObserveCompletion(&completed)) ) require.NoError(t, upd.OnMessage(ctx, &req, store)) @@ -382,7 +380,7 @@ func TestDuplicateRequestNoError(t *testing.T) { t.Parallel() ctx := context.Background() updateID := t.Name() + "-update-id" - upd := update.NewAccepted(updateID, ignoreCompletion) + upd := update.NewAccepted(updateID) err := upd.OnMessage(ctx, &updatepb.Request{}, eventStoreUnused) require.NoError(t, err, "a second request message should be ignored, not cause an error") @@ -398,13 +396,13 @@ func TestMessageValidation(t *testing.T) { var invalidArg *serviceerror.InvalidArgument updateID := t.Name() + "-update-id" t.Run("invalid request msg", func(t *testing.T) { - upd := update.New("", ignoreCompletion) + upd := update.New("") err := upd.OnMessage(ctx, &updatepb.Request{}, eventStoreUnused) require.ErrorAs(t, err, &invalidArg) require.ErrorContains(t, err, "invalid") }) t.Run("invalid acceptance msg", func(t *testing.T) { - upd := update.New(updateID, ignoreCompletion) + upd := update.New(updateID) store := mockEventStore{Controller: effect.Immediate(ctx)} validReq := updatepb.Request{ Meta: &updatepb.Meta{UpdateId: updateID}, @@ -417,7 +415,7 @@ func TestMessageValidation(t *testing.T) { require.ErrorContains(t, err, "invalid") }) t.Run("invalid rejection msg", func(t *testing.T) { - upd := update.New(updateID, ignoreCompletion) + upd := update.New(updateID) store := mockEventStore{ Controller: effect.Immediate(ctx), } @@ -432,7 +430,7 @@ func TestMessageValidation(t *testing.T) { require.ErrorContains(t, err, "invalid") }) t.Run("invalid response msg", func(t *testing.T) { - upd := update.NewAccepted("", ignoreCompletion) + upd := update.NewAccepted("") err := upd.OnMessage( ctx, &updatepb.Response{}, @@ -460,7 +458,7 @@ func TestDoubleRollback(t *testing.T) { resp = updatepb.Response{Meta: &meta, Outcome: successOutcome(t, "success!")} ) - upd := update.New(meta.UpdateId, func() { completed = true }) + upd := update.New(meta.UpdateId, update.ObserveCompletion(&completed)) require.NoError(t, upd.OnMessage(ctx, &req, store)) effects.Apply(ctx) @@ -473,13 +471,13 @@ func TestDoubleRollback(t *testing.T) { t.Run("not accepted", func(t *testing.T) { ctx, cncl := context.WithTimeout(ctx, 5*time.Millisecond) - defer cncl() + t.Cleanup(cncl) _, err := upd.WaitAccepted(ctx) require.ErrorIs(t, err, context.DeadlineExceeded) }) t.Run("not completed", func(t *testing.T) { ctx, cncl := context.WithTimeout(ctx, 5*time.Millisecond) - defer cncl() + t.Cleanup(cncl) _, err := upd.WaitOutcome(ctx) require.ErrorIs(t, err, context.DeadlineExceeded) }) @@ -497,7 +495,7 @@ func TestRollbackCompletion(t *testing.T) { effects = effect.Buffer{} store = mockEventStore{Controller: &effects} updateID = t.Name() + "-update-id" - upd = update.NewAccepted(updateID, func() { completed = true }) + upd = update.NewAccepted(updateID, update.ObserveCompletion(&completed)) resp = updatepb.Response{ Meta: &updatepb.Meta{UpdateId: updateID}, Outcome: successOutcome(t, "success!"), @@ -512,7 +510,7 @@ func TestRollbackCompletion(t *testing.T) { t.Run("not completed", func(t *testing.T) { ctx, cncl := context.WithTimeout(ctx, 5*time.Millisecond) - defer cncl() + t.Cleanup(cncl) _, err := upd.WaitOutcome(ctx) require.ErrorIs(t, err, context.DeadlineExceeded) require.False(t, completed) @@ -529,7 +527,7 @@ func TestRejectionWithAcceptanceWaiter(t *testing.T) { ctx = context.Background() store = mockEventStore{Controller: effect.Immediate(ctx)} updateID = t.Name() + "-update-id" - upd = update.New(updateID, ignoreCompletion) + upd = update.New(updateID) req = updatepb.Request{ Meta: &updatepb.Meta{UpdateId: updateID}, Input: &updatepb.Input{Name: "not_empty"},