From 11659f804ff8d51f898df053e540a6d1c07c7159 Mon Sep 17 00:00:00 2001 From: Matt McShane Date: Mon, 8 May 2023 21:44:17 -0400 Subject: [PATCH 1/6] Fully implement the PollUpdate api Look up the referenced update and wait on its outcome. --- service/history/api/pollupdate/api.go | 81 +++++++++ service/history/api/pollupdate/api_test.go | 184 +++++++++++++++++++++ service/history/api/updateworkflow/api.go | 16 +- service/history/historyEngine.go | 4 +- 4 files changed, 282 insertions(+), 3 deletions(-) create mode 100644 service/history/api/pollupdate/api.go create mode 100644 service/history/api/pollupdate/api_test.go diff --git a/service/history/api/pollupdate/api.go b/service/history/api/pollupdate/api.go new file mode 100644 index 00000000000..a80ef8a18aa --- /dev/null +++ b/service/history/api/pollupdate/api.go @@ -0,0 +1,81 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package pollupdate + +import ( + "context" + "fmt" + + "go.temporal.io/api/serviceerror" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/server/api/historyservice/v1" + "go.temporal.io/server/common/definition" + "go.temporal.io/server/service/history/api" + "go.temporal.io/server/service/history/workflow" + "go.temporal.io/server/service/history/workflow/update" +) + +func Invoke( + ctx context.Context, + req *historyservice.PollWorkflowExecutionUpdateRequest, + ctxLookup api.WorkflowConsistencyChecker, +) (*historyservice.PollWorkflowExecutionUpdateResponse, error) { + updateRef := req.GetRequest().GetUpdateRef() + wfexec := updateRef.GetWorkflowExecution() + upd, ok, err := func() (*update.Update, bool, error) { + wfctx, err := ctxLookup.GetWorkflowContext( + ctx, + nil, + api.BypassMutableStateConsistencyPredicate, + definition.NewWorkflowKey( + req.GetNamespaceId(), + wfexec.GetWorkflowId(), + wfexec.GetRunId(), + ), + workflow.LockPriorityHigh, + ) + if err != nil { + return nil, false, err + } + defer wfctx.GetReleaseFn()(nil) + upd, found := wfctx.GetUpdateRegistry(ctx).Find(ctx, updateRef.UpdateId) + return upd, found, nil + }() + if err != nil { + return nil, err + } + if !ok { + return nil, serviceerror.NewNotFound(fmt.Sprintf("update %q not found", updateRef.GetUpdateId())) + } + outcome, err := upd.WaitOutcome(ctx) + if err != nil { + return nil, err + } + return &historyservice.PollWorkflowExecutionUpdateResponse{ + Response: &workflowservice.PollWorkflowExecutionUpdateResponse{ + Outcome: outcome, + }, + }, nil +} diff --git a/service/history/api/pollupdate/api_test.go b/service/history/api/pollupdate/api_test.go new file mode 100644 index 00000000000..e920054d706 --- /dev/null +++ b/service/history/api/pollupdate/api_test.go @@ -0,0 +1,184 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package pollupdate_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + commonpb "go.temporal.io/api/common/v1" + failurepb "go.temporal.io/api/failure/v1" + "go.temporal.io/api/serviceerror" + updatepb "go.temporal.io/api/update/v1" + "go.temporal.io/api/workflowservice/v1" + clockspb "go.temporal.io/server/api/clock/v1" + "go.temporal.io/server/api/historyservice/v1" + "go.temporal.io/server/common/definition" + "go.temporal.io/server/service/history/api" + "go.temporal.io/server/service/history/api/pollupdate" + "go.temporal.io/server/service/history/workflow" + wcache "go.temporal.io/server/service/history/workflow/cache" + "go.temporal.io/server/service/history/workflow/update" +) + +type ( + mockWFConsistencyChecker struct { + api.WorkflowConsistencyChecker + GetWorkflowContextFunc func( + ctx context.Context, + reqClock *clockspb.VectorClock, + consistencyPredicate api.MutableStateConsistencyPredicate, + workflowKey definition.WorkflowKey, + lockPriority workflow.LockPriority, + ) (api.WorkflowContext, error) + } + + mockAPICtx struct { + api.WorkflowContext + GetUpdateRegistryFunc func(context.Context) update.Registry + GetReleaseFnFunc func() wcache.ReleaseCacheFunc + } + + mockReg struct { + update.Registry + FindFunc func(context.Context, string) (*update.Update, bool) + } + + mockUpdateEventStore struct { + update.EventStore + } +) + +func (mockUpdateEventStore) OnAfterCommit(f func(context.Context)) { f(context.TODO()) } +func (mockUpdateEventStore) OnAfterRollback(f func(context.Context)) {} + +func (m mockWFConsistencyChecker) GetWorkflowContext( + ctx context.Context, + clock *clockspb.VectorClock, + pred api.MutableStateConsistencyPredicate, + wfKey definition.WorkflowKey, + prio workflow.LockPriority, +) (api.WorkflowContext, error) { + return m.GetWorkflowContextFunc(ctx, clock, pred, wfKey, prio) +} + +func (m mockAPICtx) GetReleaseFn() wcache.ReleaseCacheFunc { + return m.GetReleaseFnFunc() +} + +func (m mockAPICtx) GetUpdateRegistry(ctx context.Context) update.Registry { + return m.GetUpdateRegistryFunc(ctx) +} + +func (m mockReg) Find(ctx context.Context, updateID string) (*update.Update, bool) { + return m.FindFunc(ctx, updateID) +} + +func TestPollOutcome(t *testing.T) { + reg := mockReg{} + apiCtx := mockAPICtx{ + GetReleaseFnFunc: func() wcache.ReleaseCacheFunc { return func(error) {} }, + GetUpdateRegistryFunc: func(context.Context) update.Registry { + return reg + }, + } + wfcc := mockWFConsistencyChecker{ + GetWorkflowContextFunc: func( + ctx context.Context, + reqClock *clockspb.VectorClock, + consistencyPredicate api.MutableStateConsistencyPredicate, + workflowKey definition.WorkflowKey, + lockPriority workflow.LockPriority, + ) (api.WorkflowContext, error) { + return apiCtx, nil + }, + } + + updateID := t.Name() + "-update-id" + req := historyservice.PollWorkflowExecutionUpdateRequest{ + Request: &workflowservice.PollWorkflowExecutionUpdateRequest{ + UpdateRef: &updatepb.UpdateRef{ + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: t.Name() + "-workflow-id", + RunId: t.Name() + "-run-id", + }, + UpdateId: updateID, + }, + }, + } + + t.Run("update not found", func(t *testing.T) { + reg.FindFunc = func(ctx context.Context, updateID string) (*update.Update, bool) { + return nil, false + } + _, err := pollupdate.Invoke(context.TODO(), &req, wfcc) + var notfound *serviceerror.NotFound + require.ErrorAs(t, err, ¬found) + }) + t.Run("future timeout", func(t *testing.T) { + reg.FindFunc = func(ctx context.Context, updateID string) (*update.Update, bool) { + return update.New(updateID, func() {}), true + } + ctx, cncl := context.WithTimeout(context.Background(), 5*time.Millisecond) + defer cncl() + _, err := pollupdate.Invoke(ctx, &req, wfcc) + require.Error(t, err) + }) + t.Run("get an outcome", func(t *testing.T) { + upd := update.New(updateID, func() {}) + reg.FindFunc = func(ctx context.Context, updateID string) (*update.Update, bool) { + return upd, true + } + reqMsg := updatepb.Request{ + Meta: &updatepb.Meta{UpdateId: updateID}, + Input: &updatepb.Input{Name: "not_empty"}, + } + fail := failurepb.Failure{Message: "intentional failure in " + t.Name()} + wantOutcome := updatepb.Outcome{Value: &updatepb.Outcome_Failure{Failure: &fail}} + rejMsg := updatepb.Rejection{ + RejectedRequestMessageId: updateID + "/request", + RejectedRequest: &reqMsg, + Failure: &fail, + } + + errCh := make(chan error) + respCh := make(chan *historyservice.PollWorkflowExecutionUpdateResponse) + go func() { + resp, err := pollupdate.Invoke(context.TODO(), &req, wfcc) + errCh <- err + respCh <- resp + }() + + evStore := mockUpdateEventStore{} + require.NoError(t, upd.OnMessage(context.TODO(), &reqMsg, evStore)) + require.NoError(t, upd.OnMessage(context.TODO(), &rejMsg, evStore)) + + require.NoError(t, <-errCh) + resp := <-respCh + require.Equal(t, &wantOutcome, resp.GetResponse().Outcome) + }) +} diff --git a/service/history/api/updateworkflow/api.go b/service/history/api/updateworkflow/api.go index 47c3d77a29c..962c2b61a8c 100644 --- a/service/history/api/updateworkflow/api.go +++ b/service/history/api/updateworkflow/api.go @@ -26,12 +26,15 @@ package updateworkflow import ( "context" + "fmt" "time" commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/api/serviceerror" updatepb "go.temporal.io/api/update/v1" "go.temporal.io/api/workflowservice/v1" + enumspb "go.temporal.io/api/enums/v1" enumsspb "go.temporal.io/server/api/enums/v1" "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/api/matchingservice/v1" @@ -52,6 +55,13 @@ func Invoke( matchingClient matchingservice.MatchingServiceClient, ) (_ *historyservice.UpdateWorkflowExecutionResponse, retErr error) { + waitStage := req.GetRequest().GetWaitPolicy().GetLifecycleStage() + if waitStage != enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED && + waitStage != enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED { + return nil, serviceerror.NewUnimplemented( + fmt.Sprintf("%v is not implemented", waitStage)) + } + weCtx, err := workflowConsistencyChecker.GetWorkflowContext( ctx, nil, @@ -114,7 +124,11 @@ func Invoke( weCtx.GetReleaseFn()(nil) } - updOutcome, err := upd.WaitOutcome(ctx) + waitf := upd.WaitOutcome + if waitStage == enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED { + waitf = upd.WaitAccepted + } + updOutcome, err := waitf(ctx) if err != nil { return nil, err } diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 7fcfa0f8ca3..5d80b193d0f 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -32,7 +32,6 @@ import ( "go.opentelemetry.io/otel/trace" commonpb "go.temporal.io/api/common/v1" historypb "go.temporal.io/api/history/v1" - "go.temporal.io/api/serviceerror" "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/api/matchingservice/v1" @@ -58,6 +57,7 @@ import ( "go.temporal.io/server/service/history/api/deleteworkflow" "go.temporal.io/server/service/history/api/describemutablestate" "go.temporal.io/server/service/history/api/describeworkflow" + "go.temporal.io/server/service/history/api/pollupdate" "go.temporal.io/server/service/history/api/queryworkflow" "go.temporal.io/server/service/history/api/reapplyevents" "go.temporal.io/server/service/history/api/recordactivitytaskheartbeat" @@ -549,7 +549,7 @@ func (e *historyEngineImpl) PollWorkflowExecutionUpdate( ctx context.Context, req *historyservice.PollWorkflowExecutionUpdateRequest, ) (*historyservice.PollWorkflowExecutionUpdateResponse, error) { - return nil, serviceerror.NewUnimplemented("PollWorkflowExecutionUpdate not implemented") + return pollupdate.Invoke(ctx, req, e.workflowConsistencyChecker) } // RemoveSignalMutableState remove the signal request id in signal_requested for deduplicate From 2a20bf15e1aebaec7ec325fff57cef175b0f988b Mon Sep 17 00:00:00 2001 From: Matt McShane Date: Wed, 10 May 2023 17:50:08 -0400 Subject: [PATCH 2/6] Remove update outcome lazy fetching Can't control when WaitOutcome is called and it may reasonably be called at a point in time when the workflow context lock is not held so rather than loading the outcome lazily, do it when the Update object is first loaded. --- service/history/api/pollupdate/api.go | 3 +- service/history/workflow/update/registry.go | 9 +++-- .../history/workflow/update/registry_test.go | 38 +++++++++++++++++++ service/history/workflow/update/update.go | 10 ++++- .../history/workflow/update/update_test.go | 21 ---------- service/history/workflow/update/util.go | 17 +++------ 6 files changed, 59 insertions(+), 39 deletions(-) diff --git a/service/history/api/pollupdate/api.go b/service/history/api/pollupdate/api.go index a80ef8a18aa..78f2b53636d 100644 --- a/service/history/api/pollupdate/api.go +++ b/service/history/api/pollupdate/api.go @@ -59,7 +59,8 @@ func Invoke( if err != nil { return nil, false, err } - defer wfctx.GetReleaseFn()(nil) + release := wfctx.GetReleaseFn() + defer release(nil) upd, found := wfctx.GetUpdateRegistry(ctx).Find(ctx, updateRef.UpdateId) return upd, found, nil }() diff --git a/service/history/workflow/update/registry.go b/service/history/workflow/update/registry.go index 268110335ac..a0364c47f8b 100644 --- a/service/history/workflow/update/registry.go +++ b/service/history/workflow/update/registry.go @@ -243,11 +243,14 @@ func (r *RegistryImpl) findLocked(ctx context.Context, id string) (*Update, bool if info, ok := r.store.GetUpdateInfo(ctx, id); ok { if info.GetCompletedPointer() != nil { + outcome, err := r.store.GetUpdateOutcome(ctx, id) // Completed, create the Update object but do not add to registry. this // should not happen often. - return newCompleted(id, func(ctx context.Context) (*updatepb.Outcome, error) { - return r.store.GetUpdateOutcome(ctx, id) - }, withInstrumentation(&r.instrumentation)), true + return newCompleted( + id, + outcomeOrErr{outcome: outcome, err: err}, + withInstrumentation(&r.instrumentation), + ), true } } return nil, false diff --git a/service/history/workflow/update/registry_test.go b/service/history/workflow/update/registry_test.go index 9066011e09f..c63270f60a7 100644 --- a/service/history/workflow/update/registry_test.go +++ b/service/history/workflow/update/registry_test.go @@ -26,6 +26,7 @@ package update_test import ( "context" + "fmt" "testing" "github.com/gogo/protobuf/proto" @@ -376,6 +377,43 @@ func TestInFlightLimit(t *testing.T) { }) } +func TestStorageErrorWhenLookingUpCompletedOutcome(t *testing.T) { + t.Parallel() + completedUpdateID := t.Name() + "-completed-update-id" + expectError := fmt.Errorf("expected error in %s", t.Name()) + regStore := mockUpdateStore{ + GetAcceptedWorkflowExecutionUpdateIDsFunc: func( + context.Context, + ) []string { + return nil + }, + GetUpdateInfoFunc: func( + ctx context.Context, + updateID string, + ) (*persistencespb.UpdateInfo, bool) { + if updateID == completedUpdateID { + return &persistencespb.UpdateInfo{ + Value: &persistencespb.UpdateInfo_CompletedPointer{ + CompletedPointer: &historyspb.HistoryEventPointer{EventId: 123}, + }, + }, true + } + return nil, false + }, + GetUpdateOutcomeFunc: func( + ctx context.Context, + updateID string, + ) (*updatepb.Outcome, error) { + return nil, expectError + }, + } + reg := update.NewRegistry(regStore) + upd, found := reg.Find(context.TODO(), completedUpdateID) + require.True(t, found) + _, err := upd.WaitOutcome(context.TODO()) + require.ErrorIs(t, expectError, err) +} + func mustMarshalAny(t *testing.T, pb proto.Message) *types.Any { t.Helper() a, err := types.MarshalAny(pb) diff --git a/service/history/workflow/update/update.go b/service/history/workflow/update/update.go index 1ce2a857625..f38139d5449 100644 --- a/service/history/workflow/update/update.go +++ b/service/history/workflow/update/update.go @@ -126,15 +126,21 @@ func newAccepted(id string, onComplete func(), opts ...updateOpt) *Update { func newCompleted( id string, - fetchOutcome func(ctx context.Context) (*updatepb.Outcome, error), + fetchResult outcomeOrErr, opts ...updateOpt, ) *Update { + makeOutcomeFuture := func() future.Future[*updatepb.Outcome] { + if fetchResult.err != nil { + return future.NewReadyFuture[*updatepb.Outcome](nil, fetchResult.err) + } + return future.NewReadyFuture(fetchResult.outcome, nil) + } upd := &Update{ id: id, state: stateCompleted, instrumentation: &noopInstrumentation, accepted: future.NewReadyFuture[*failurepb.Failure](nil, nil), - outcome: lazyOutcome(fetchOutcome), + outcome: makeOutcomeFuture(), } for _, opt := range opts { opt(upd) diff --git a/service/history/workflow/update/update_test.go b/service/history/workflow/update/update_test.go index 1d081ea44d7..da4b154b88f 100644 --- a/service/history/workflow/update/update_test.go +++ b/service/history/workflow/update/update_test.go @@ -378,27 +378,6 @@ func TestAcceptanceAndResponseInSameMessageBatch(t *testing.T) { require.True(t, completed) } -func TestCompletedLazyOutcomeRead(t *testing.T) { - t.Parallel() - ctx := context.Background() - updateID := t.Name() + "-update-id" - outcome := successOutcome(t, "success!") - fetched := false - upd := update.NewCompleted(updateID, func(context.Context) (*updatepb.Outcome, error) { - fetched = true - return outcome, nil - }) - require.False(t, fetched) - acceptedOutcome, err := upd.WaitAccepted(ctx) - require.NoError(t, err) - require.Equal(t, outcome, acceptedOutcome) - - got, err := upd.WaitOutcome(ctx) - require.True(t, fetched) - require.NoError(t, err) - require.Equal(t, outcome, got) -} - func TestDuplicateRequestNoError(t *testing.T) { t.Parallel() ctx := context.Background() diff --git a/service/history/workflow/update/util.go b/service/history/workflow/update/util.go index babeb8fdcfd..4cc9dac2343 100644 --- a/service/history/workflow/update/util.go +++ b/service/history/workflow/update/util.go @@ -25,7 +25,6 @@ package update import ( - "context" "fmt" "go.opentelemetry.io/otel/trace" @@ -39,14 +38,16 @@ import ( const libraryName = "go.temporal.io/service/history/workflow/update" type ( - // lazyOutcome adapts a func to the future.Future[*updatepb.Outcome] interface - lazyOutcome func(context.Context) (*updatepb.Outcome, error) - instrumentation struct { log log.Logger metrics metrics.Handler tracer trace.Tracer } + + outcomeOrErr struct { + outcome *updatepb.Outcome + err error + } ) var ( @@ -57,14 +58,6 @@ var ( } ) -func (lo lazyOutcome) Get(ctx context.Context) (*updatepb.Outcome, error) { - return lo(ctx) -} - -func (lazyOutcome) Ready() bool { - return true -} - func invalidArgf(tmpl string, args ...any) error { return serviceerror.NewInvalidArgument(fmt.Sprintf(tmpl, args...)) } From 9b717fdcb46406ff5d06d9023b150863faaa1294 Mon Sep 17 00:00:00 2001 From: Matt McShane Date: Thu, 11 May 2023 10:20:55 -0400 Subject: [PATCH 3/6] Use ReadyFuture as the parameter type to update.newCompleted --- service/history/workflow/update/registry.go | 5 +++-- service/history/workflow/update/update.go | 10 ++-------- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/service/history/workflow/update/registry.go b/service/history/workflow/update/registry.go index a0364c47f8b..540aee3d62a 100644 --- a/service/history/workflow/update/registry.go +++ b/service/history/workflow/update/registry.go @@ -36,6 +36,7 @@ import ( "go.temporal.io/api/serviceerror" updatepb "go.temporal.io/api/update/v1" persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common/future" "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" ) @@ -243,12 +244,12 @@ func (r *RegistryImpl) findLocked(ctx context.Context, id string) (*Update, bool if info, ok := r.store.GetUpdateInfo(ctx, id); ok { if info.GetCompletedPointer() != nil { - outcome, err := r.store.GetUpdateOutcome(ctx, id) // Completed, create the Update object but do not add to registry. this // should not happen often. + fut := future.NewReadyFuture(r.store.GetUpdateOutcome(ctx, id)) return newCompleted( id, - outcomeOrErr{outcome: outcome, err: err}, + fut, withInstrumentation(&r.instrumentation), ), true } diff --git a/service/history/workflow/update/update.go b/service/history/workflow/update/update.go index f38139d5449..d0616dd7eae 100644 --- a/service/history/workflow/update/update.go +++ b/service/history/workflow/update/update.go @@ -126,21 +126,15 @@ func newAccepted(id string, onComplete func(), opts ...updateOpt) *Update { func newCompleted( id string, - fetchResult outcomeOrErr, + outcomeFuture *future.ReadyFutureImpl[*updatepb.Outcome], opts ...updateOpt, ) *Update { - makeOutcomeFuture := func() future.Future[*updatepb.Outcome] { - if fetchResult.err != nil { - return future.NewReadyFuture[*updatepb.Outcome](nil, fetchResult.err) - } - return future.NewReadyFuture(fetchResult.outcome, nil) - } upd := &Update{ id: id, state: stateCompleted, instrumentation: &noopInstrumentation, accepted: future.NewReadyFuture[*failurepb.Failure](nil, nil), - outcome: makeOutcomeFuture(), + outcome: outcomeFuture, } for _, opt := range opts { opt(upd) From 956f9e689204c6295d7a2d0da8f95a8bffcfa459 Mon Sep 17 00:00:00 2001 From: Matt McShane Date: Thu, 11 May 2023 10:31:14 -0400 Subject: [PATCH 4/6] combine validation with selecting the wait function --- service/history/api/updateworkflow/api.go | 26 +++++++++++++++++------ 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/service/history/api/updateworkflow/api.go b/service/history/api/updateworkflow/api.go index 962c2b61a8c..6ff920b50fb 100644 --- a/service/history/api/updateworkflow/api.go +++ b/service/history/api/updateworkflow/api.go @@ -45,6 +45,7 @@ import ( "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/workflow" + "go.temporal.io/server/service/history/workflow/update" ) func Invoke( @@ -55,9 +56,24 @@ func Invoke( matchingClient matchingservice.MatchingServiceClient, ) (_ *historyservice.UpdateWorkflowExecutionResponse, retErr error) { + var waitLifecycleStage func(ctx context.Context, u *update.Update) (*updatepb.Outcome, error) waitStage := req.GetRequest().GetWaitPolicy().GetLifecycleStage() - if waitStage != enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED && - waitStage != enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED { + switch waitStage { + case enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED: + waitLifecycleStage = func( + ctx context.Context, + u *update.Update, + ) (*updatepb.Outcome, error) { + return u.WaitAccepted(ctx) + } + case enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED: + waitLifecycleStage = func( + ctx context.Context, + u *update.Update, + ) (*updatepb.Outcome, error) { + return u.WaitOutcome(ctx) + } + default: return nil, serviceerror.NewUnimplemented( fmt.Sprintf("%v is not implemented", waitStage)) } @@ -124,11 +140,7 @@ func Invoke( weCtx.GetReleaseFn()(nil) } - waitf := upd.WaitOutcome - if waitStage == enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED { - waitf = upd.WaitAccepted - } - updOutcome, err := waitf(ctx) + updOutcome, err := waitLifecycleStage(ctx, upd) if err != nil { return nil, err } From b3c11b8c61708d5eaa7d18da00081fb5069b48cf Mon Sep 17 00:00:00 2001 From: Matt McShane Date: Thu, 11 May 2023 10:36:51 -0400 Subject: [PATCH 5/6] buffer error channels in test to avoid leaking a goro --- service/history/api/pollupdate/api_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/service/history/api/pollupdate/api_test.go b/service/history/api/pollupdate/api_test.go index e920054d706..7d48ebf903f 100644 --- a/service/history/api/pollupdate/api_test.go +++ b/service/history/api/pollupdate/api_test.go @@ -165,8 +165,8 @@ func TestPollOutcome(t *testing.T) { Failure: &fail, } - errCh := make(chan error) - respCh := make(chan *historyservice.PollWorkflowExecutionUpdateResponse) + errCh := make(chan error, 1) + respCh := make(chan *historyservice.PollWorkflowExecutionUpdateResponse, 1) go func() { resp, err := pollupdate.Invoke(context.TODO(), &req, wfcc) errCh <- err From f9dad4f5e6b480386c95ea50cb8794825741f39a Mon Sep 17 00:00:00 2001 From: Matt McShane Date: Thu, 11 May 2023 14:32:07 -0400 Subject: [PATCH 6/6] Remove unused struct --- service/history/workflow/update/util.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/service/history/workflow/update/util.go b/service/history/workflow/update/util.go index 4cc9dac2343..c5d2297f56c 100644 --- a/service/history/workflow/update/util.go +++ b/service/history/workflow/update/util.go @@ -29,7 +29,6 @@ import ( "go.opentelemetry.io/otel/trace" "go.temporal.io/api/serviceerror" - updatepb "go.temporal.io/api/update/v1" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" @@ -43,11 +42,6 @@ type ( metrics metrics.Handler tracer trace.Tracer } - - outcomeOrErr struct { - outcome *updatepb.Outcome - err error - } ) var (