diff --git a/service/history/api/pollupdate/api.go b/service/history/api/pollupdate/api.go new file mode 100644 index 00000000000..78f2b53636d --- /dev/null +++ b/service/history/api/pollupdate/api.go @@ -0,0 +1,82 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package pollupdate + +import ( + "context" + "fmt" + + "go.temporal.io/api/serviceerror" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/server/api/historyservice/v1" + "go.temporal.io/server/common/definition" + "go.temporal.io/server/service/history/api" + "go.temporal.io/server/service/history/workflow" + "go.temporal.io/server/service/history/workflow/update" +) + +func Invoke( + ctx context.Context, + req *historyservice.PollWorkflowExecutionUpdateRequest, + ctxLookup api.WorkflowConsistencyChecker, +) (*historyservice.PollWorkflowExecutionUpdateResponse, error) { + updateRef := req.GetRequest().GetUpdateRef() + wfexec := updateRef.GetWorkflowExecution() + upd, ok, err := func() (*update.Update, bool, error) { + wfctx, err := ctxLookup.GetWorkflowContext( + ctx, + nil, + api.BypassMutableStateConsistencyPredicate, + definition.NewWorkflowKey( + req.GetNamespaceId(), + wfexec.GetWorkflowId(), + wfexec.GetRunId(), + ), + workflow.LockPriorityHigh, + ) + if err != nil { + return nil, false, err + } + release := wfctx.GetReleaseFn() + defer release(nil) + upd, found := wfctx.GetUpdateRegistry(ctx).Find(ctx, updateRef.UpdateId) + return upd, found, nil + }() + if err != nil { + return nil, err + } + if !ok { + return nil, serviceerror.NewNotFound(fmt.Sprintf("update %q not found", updateRef.GetUpdateId())) + } + outcome, err := upd.WaitOutcome(ctx) + if err != nil { + return nil, err + } + return &historyservice.PollWorkflowExecutionUpdateResponse{ + Response: &workflowservice.PollWorkflowExecutionUpdateResponse{ + Outcome: outcome, + }, + }, nil +} diff --git a/service/history/api/pollupdate/api_test.go b/service/history/api/pollupdate/api_test.go new file mode 100644 index 00000000000..7d48ebf903f --- /dev/null +++ b/service/history/api/pollupdate/api_test.go @@ -0,0 +1,184 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package pollupdate_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + commonpb "go.temporal.io/api/common/v1" + failurepb "go.temporal.io/api/failure/v1" + "go.temporal.io/api/serviceerror" + updatepb "go.temporal.io/api/update/v1" + "go.temporal.io/api/workflowservice/v1" + clockspb "go.temporal.io/server/api/clock/v1" + "go.temporal.io/server/api/historyservice/v1" + "go.temporal.io/server/common/definition" + "go.temporal.io/server/service/history/api" + "go.temporal.io/server/service/history/api/pollupdate" + "go.temporal.io/server/service/history/workflow" + wcache "go.temporal.io/server/service/history/workflow/cache" + "go.temporal.io/server/service/history/workflow/update" +) + +type ( + mockWFConsistencyChecker struct { + api.WorkflowConsistencyChecker + GetWorkflowContextFunc func( + ctx context.Context, + reqClock *clockspb.VectorClock, + consistencyPredicate api.MutableStateConsistencyPredicate, + workflowKey definition.WorkflowKey, + lockPriority workflow.LockPriority, + ) (api.WorkflowContext, error) + } + + mockAPICtx struct { + api.WorkflowContext + GetUpdateRegistryFunc func(context.Context) update.Registry + GetReleaseFnFunc func() wcache.ReleaseCacheFunc + } + + mockReg struct { + update.Registry + FindFunc func(context.Context, string) (*update.Update, bool) + } + + mockUpdateEventStore struct { + update.EventStore + } +) + +func (mockUpdateEventStore) OnAfterCommit(f func(context.Context)) { f(context.TODO()) } +func (mockUpdateEventStore) OnAfterRollback(f func(context.Context)) {} + +func (m mockWFConsistencyChecker) GetWorkflowContext( + ctx context.Context, + clock *clockspb.VectorClock, + pred api.MutableStateConsistencyPredicate, + wfKey definition.WorkflowKey, + prio workflow.LockPriority, +) (api.WorkflowContext, error) { + return m.GetWorkflowContextFunc(ctx, clock, pred, wfKey, prio) +} + +func (m mockAPICtx) GetReleaseFn() wcache.ReleaseCacheFunc { + return m.GetReleaseFnFunc() +} + +func (m mockAPICtx) GetUpdateRegistry(ctx context.Context) update.Registry { + return m.GetUpdateRegistryFunc(ctx) +} + +func (m mockReg) Find(ctx context.Context, updateID string) (*update.Update, bool) { + return m.FindFunc(ctx, updateID) +} + +func TestPollOutcome(t *testing.T) { + reg := mockReg{} + apiCtx := mockAPICtx{ + GetReleaseFnFunc: func() wcache.ReleaseCacheFunc { return func(error) {} }, + GetUpdateRegistryFunc: func(context.Context) update.Registry { + return reg + }, + } + wfcc := mockWFConsistencyChecker{ + GetWorkflowContextFunc: func( + ctx context.Context, + reqClock *clockspb.VectorClock, + consistencyPredicate api.MutableStateConsistencyPredicate, + workflowKey definition.WorkflowKey, + lockPriority workflow.LockPriority, + ) (api.WorkflowContext, error) { + return apiCtx, nil + }, + } + + updateID := t.Name() + "-update-id" + req := historyservice.PollWorkflowExecutionUpdateRequest{ + Request: &workflowservice.PollWorkflowExecutionUpdateRequest{ + UpdateRef: &updatepb.UpdateRef{ + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: t.Name() + "-workflow-id", + RunId: t.Name() + "-run-id", + }, + UpdateId: updateID, + }, + }, + } + + t.Run("update not found", func(t *testing.T) { + reg.FindFunc = func(ctx context.Context, updateID string) (*update.Update, bool) { + return nil, false + } + _, err := pollupdate.Invoke(context.TODO(), &req, wfcc) + var notfound *serviceerror.NotFound + require.ErrorAs(t, err, ¬found) + }) + t.Run("future timeout", func(t *testing.T) { + reg.FindFunc = func(ctx context.Context, updateID string) (*update.Update, bool) { + return update.New(updateID, func() {}), true + } + ctx, cncl := context.WithTimeout(context.Background(), 5*time.Millisecond) + defer cncl() + _, err := pollupdate.Invoke(ctx, &req, wfcc) + require.Error(t, err) + }) + t.Run("get an outcome", func(t *testing.T) { + upd := update.New(updateID, func() {}) + reg.FindFunc = func(ctx context.Context, updateID string) (*update.Update, bool) { + return upd, true + } + reqMsg := updatepb.Request{ + Meta: &updatepb.Meta{UpdateId: updateID}, + Input: &updatepb.Input{Name: "not_empty"}, + } + fail := failurepb.Failure{Message: "intentional failure in " + t.Name()} + wantOutcome := updatepb.Outcome{Value: &updatepb.Outcome_Failure{Failure: &fail}} + rejMsg := updatepb.Rejection{ + RejectedRequestMessageId: updateID + "/request", + RejectedRequest: &reqMsg, + Failure: &fail, + } + + errCh := make(chan error, 1) + respCh := make(chan *historyservice.PollWorkflowExecutionUpdateResponse, 1) + go func() { + resp, err := pollupdate.Invoke(context.TODO(), &req, wfcc) + errCh <- err + respCh <- resp + }() + + evStore := mockUpdateEventStore{} + require.NoError(t, upd.OnMessage(context.TODO(), &reqMsg, evStore)) + require.NoError(t, upd.OnMessage(context.TODO(), &rejMsg, evStore)) + + require.NoError(t, <-errCh) + resp := <-respCh + require.Equal(t, &wantOutcome, resp.GetResponse().Outcome) + }) +} diff --git a/service/history/api/updateworkflow/api.go b/service/history/api/updateworkflow/api.go index 47c3d77a29c..6ff920b50fb 100644 --- a/service/history/api/updateworkflow/api.go +++ b/service/history/api/updateworkflow/api.go @@ -26,12 +26,15 @@ package updateworkflow import ( "context" + "fmt" "time" commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/api/serviceerror" updatepb "go.temporal.io/api/update/v1" "go.temporal.io/api/workflowservice/v1" + enumspb "go.temporal.io/api/enums/v1" enumsspb "go.temporal.io/server/api/enums/v1" "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/api/matchingservice/v1" @@ -42,6 +45,7 @@ import ( "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/workflow" + "go.temporal.io/server/service/history/workflow/update" ) func Invoke( @@ -52,6 +56,28 @@ func Invoke( matchingClient matchingservice.MatchingServiceClient, ) (_ *historyservice.UpdateWorkflowExecutionResponse, retErr error) { + var waitLifecycleStage func(ctx context.Context, u *update.Update) (*updatepb.Outcome, error) + waitStage := req.GetRequest().GetWaitPolicy().GetLifecycleStage() + switch waitStage { + case enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED: + waitLifecycleStage = func( + ctx context.Context, + u *update.Update, + ) (*updatepb.Outcome, error) { + return u.WaitAccepted(ctx) + } + case enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED: + waitLifecycleStage = func( + ctx context.Context, + u *update.Update, + ) (*updatepb.Outcome, error) { + return u.WaitOutcome(ctx) + } + default: + return nil, serviceerror.NewUnimplemented( + fmt.Sprintf("%v is not implemented", waitStage)) + } + weCtx, err := workflowConsistencyChecker.GetWorkflowContext( ctx, nil, @@ -114,7 +140,7 @@ func Invoke( weCtx.GetReleaseFn()(nil) } - updOutcome, err := upd.WaitOutcome(ctx) + updOutcome, err := waitLifecycleStage(ctx, upd) if err != nil { return nil, err } diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 7fcfa0f8ca3..5d80b193d0f 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -32,7 +32,6 @@ import ( "go.opentelemetry.io/otel/trace" commonpb "go.temporal.io/api/common/v1" historypb "go.temporal.io/api/history/v1" - "go.temporal.io/api/serviceerror" "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/api/matchingservice/v1" @@ -58,6 +57,7 @@ import ( "go.temporal.io/server/service/history/api/deleteworkflow" "go.temporal.io/server/service/history/api/describemutablestate" "go.temporal.io/server/service/history/api/describeworkflow" + "go.temporal.io/server/service/history/api/pollupdate" "go.temporal.io/server/service/history/api/queryworkflow" "go.temporal.io/server/service/history/api/reapplyevents" "go.temporal.io/server/service/history/api/recordactivitytaskheartbeat" @@ -549,7 +549,7 @@ func (e *historyEngineImpl) PollWorkflowExecutionUpdate( ctx context.Context, req *historyservice.PollWorkflowExecutionUpdateRequest, ) (*historyservice.PollWorkflowExecutionUpdateResponse, error) { - return nil, serviceerror.NewUnimplemented("PollWorkflowExecutionUpdate not implemented") + return pollupdate.Invoke(ctx, req, e.workflowConsistencyChecker) } // RemoveSignalMutableState remove the signal request id in signal_requested for deduplicate diff --git a/service/history/workflow/update/registry.go b/service/history/workflow/update/registry.go index 268110335ac..540aee3d62a 100644 --- a/service/history/workflow/update/registry.go +++ b/service/history/workflow/update/registry.go @@ -36,6 +36,7 @@ import ( "go.temporal.io/api/serviceerror" updatepb "go.temporal.io/api/update/v1" persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common/future" "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" ) @@ -245,9 +246,12 @@ func (r *RegistryImpl) findLocked(ctx context.Context, id string) (*Update, bool if info.GetCompletedPointer() != nil { // Completed, create the Update object but do not add to registry. this // should not happen often. - return newCompleted(id, func(ctx context.Context) (*updatepb.Outcome, error) { - return r.store.GetUpdateOutcome(ctx, id) - }, withInstrumentation(&r.instrumentation)), true + fut := future.NewReadyFuture(r.store.GetUpdateOutcome(ctx, id)) + return newCompleted( + id, + fut, + withInstrumentation(&r.instrumentation), + ), true } } return nil, false diff --git a/service/history/workflow/update/registry_test.go b/service/history/workflow/update/registry_test.go index 9066011e09f..c63270f60a7 100644 --- a/service/history/workflow/update/registry_test.go +++ b/service/history/workflow/update/registry_test.go @@ -26,6 +26,7 @@ package update_test import ( "context" + "fmt" "testing" "github.com/gogo/protobuf/proto" @@ -376,6 +377,43 @@ func TestInFlightLimit(t *testing.T) { }) } +func TestStorageErrorWhenLookingUpCompletedOutcome(t *testing.T) { + t.Parallel() + completedUpdateID := t.Name() + "-completed-update-id" + expectError := fmt.Errorf("expected error in %s", t.Name()) + regStore := mockUpdateStore{ + GetAcceptedWorkflowExecutionUpdateIDsFunc: func( + context.Context, + ) []string { + return nil + }, + GetUpdateInfoFunc: func( + ctx context.Context, + updateID string, + ) (*persistencespb.UpdateInfo, bool) { + if updateID == completedUpdateID { + return &persistencespb.UpdateInfo{ + Value: &persistencespb.UpdateInfo_CompletedPointer{ + CompletedPointer: &historyspb.HistoryEventPointer{EventId: 123}, + }, + }, true + } + return nil, false + }, + GetUpdateOutcomeFunc: func( + ctx context.Context, + updateID string, + ) (*updatepb.Outcome, error) { + return nil, expectError + }, + } + reg := update.NewRegistry(regStore) + upd, found := reg.Find(context.TODO(), completedUpdateID) + require.True(t, found) + _, err := upd.WaitOutcome(context.TODO()) + require.ErrorIs(t, expectError, err) +} + func mustMarshalAny(t *testing.T, pb proto.Message) *types.Any { t.Helper() a, err := types.MarshalAny(pb) diff --git a/service/history/workflow/update/update.go b/service/history/workflow/update/update.go index 1ce2a857625..d0616dd7eae 100644 --- a/service/history/workflow/update/update.go +++ b/service/history/workflow/update/update.go @@ -126,7 +126,7 @@ func newAccepted(id string, onComplete func(), opts ...updateOpt) *Update { func newCompleted( id string, - fetchOutcome func(ctx context.Context) (*updatepb.Outcome, error), + outcomeFuture *future.ReadyFutureImpl[*updatepb.Outcome], opts ...updateOpt, ) *Update { upd := &Update{ @@ -134,7 +134,7 @@ func newCompleted( state: stateCompleted, instrumentation: &noopInstrumentation, accepted: future.NewReadyFuture[*failurepb.Failure](nil, nil), - outcome: lazyOutcome(fetchOutcome), + outcome: outcomeFuture, } for _, opt := range opts { opt(upd) diff --git a/service/history/workflow/update/update_test.go b/service/history/workflow/update/update_test.go index 1d081ea44d7..da4b154b88f 100644 --- a/service/history/workflow/update/update_test.go +++ b/service/history/workflow/update/update_test.go @@ -378,27 +378,6 @@ func TestAcceptanceAndResponseInSameMessageBatch(t *testing.T) { require.True(t, completed) } -func TestCompletedLazyOutcomeRead(t *testing.T) { - t.Parallel() - ctx := context.Background() - updateID := t.Name() + "-update-id" - outcome := successOutcome(t, "success!") - fetched := false - upd := update.NewCompleted(updateID, func(context.Context) (*updatepb.Outcome, error) { - fetched = true - return outcome, nil - }) - require.False(t, fetched) - acceptedOutcome, err := upd.WaitAccepted(ctx) - require.NoError(t, err) - require.Equal(t, outcome, acceptedOutcome) - - got, err := upd.WaitOutcome(ctx) - require.True(t, fetched) - require.NoError(t, err) - require.Equal(t, outcome, got) -} - func TestDuplicateRequestNoError(t *testing.T) { t.Parallel() ctx := context.Background() diff --git a/service/history/workflow/update/util.go b/service/history/workflow/update/util.go index babeb8fdcfd..c5d2297f56c 100644 --- a/service/history/workflow/update/util.go +++ b/service/history/workflow/update/util.go @@ -25,12 +25,10 @@ package update import ( - "context" "fmt" "go.opentelemetry.io/otel/trace" "go.temporal.io/api/serviceerror" - updatepb "go.temporal.io/api/update/v1" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" @@ -39,9 +37,6 @@ import ( const libraryName = "go.temporal.io/service/history/workflow/update" type ( - // lazyOutcome adapts a func to the future.Future[*updatepb.Outcome] interface - lazyOutcome func(context.Context) (*updatepb.Outcome, error) - instrumentation struct { log log.Logger metrics metrics.Handler @@ -57,14 +52,6 @@ var ( } ) -func (lo lazyOutcome) Get(ctx context.Context) (*updatepb.Outcome, error) { - return lo(ctx) -} - -func (lazyOutcome) Ready() bool { - return true -} - func invalidArgf(tmpl string, args ...any) error { return serviceerror.NewInvalidArgument(fmt.Sprintf(tmpl, args...)) }