Skip to content

Commit

Permalink
Revert "Bring PollWorkflowExecutionUpdate to HistoryEngine (#4298)"
Browse files Browse the repository at this point in the history
This reverts commit 5af117e.
  • Loading branch information
wxing1292 committed May 11, 2023
1 parent 6cf952f commit 45fada1
Show file tree
Hide file tree
Showing 16 changed files with 328 additions and 1,050 deletions.
994 changes: 241 additions & 753 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

205 changes: 82 additions & 123 deletions api/historyservice/v1/service.pb.go

Large diffs are not rendered by default.

35 changes: 0 additions & 35 deletions api/historyservicemock/v1/service.pb.mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 0 additions & 24 deletions client/history/client_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 0 additions & 14 deletions client/history/metric_client_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 0 additions & 15 deletions client/history/retryable_client_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cmd/tools/rpcwrappers/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func makeGetHistoryClient(reqType reflect.Type) string {
if path := pathToField(t, "ShardId", "request", 1); path != "" {
return fmt.Sprintf("client, err := c.getClientForShardID(%s)", path)
}
if path := pathToField(t, "WorkflowId", "request", 4); path != "" {
if path := pathToField(t, "WorkflowId", "request", 3); path != "" {
return fmt.Sprintf("client, err := c.getClientForWorkflowID(request.NamespaceId, %s)", path)
}
if path := pathToField(t, "TaskToken", "request", 2); path != "" {
Expand Down
2 changes: 0 additions & 2 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,8 +377,6 @@ const (
HistoryClientTerminateWorkflowExecutionScope = "HistoryClientTerminateWorkflowExecution"
// HistoryClientUpdateWorkflowExecutionScope tracks RPC calls to history service
HistoryClientUpdateWorkflowExecutionScope = "HistoryClientUpdateWorkflowExecution"
// HistoryClientPollWorkflowExecutionUpdateScope tracks RPC calls to history service
HistoryClientPollWorkflowExecutionUpdateScope = "HistoryClientPollPollWorkflowExecutionUpdate"
// HistoryClientDeleteWorkflowExecutionScope tracks RPC calls to history service
HistoryClientDeleteWorkflowExecutionScope = "HistoryClientDeleteWorkflowExecution"
// HistoryClientResetWorkflowExecutionScope tracks RPC calls to history service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,13 +648,4 @@ message StreamWorkflowReplicationMessagesResponse {
oneof attributes {
temporal.server.api.replication.v1.WorkflowReplicationMessages messages = 1;
}
}

message PollWorkflowExecutionUpdateRequest {
string namespace_id = 1;
temporal.api.workflowservice.v1.PollWorkflowExecutionUpdateRequest request = 2;
}

message PollWorkflowExecutionUpdateResponse {
temporal.api.workflowservice.v1.PollWorkflowExecutionUpdateResponse response = 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,6 @@ service HistoryService {
rpc UpdateWorkflowExecution(UpdateWorkflowExecutionRequest) returns (UpdateWorkflowExecutionResponse) {
}

// (-- api-linter: core::0134=disabled
// aip.dev/not-precedent: This service does not follow the update method API --)
rpc PollWorkflowExecutionUpdate(PollWorkflowExecutionUpdateRequest) returns (PollWorkflowExecutionUpdateResponse){
}

rpc StreamWorkflowReplicationMessages(stream StreamWorkflowReplicationMessagesRequest) returns (stream StreamWorkflowReplicationMessagesResponse) {
}
}
18 changes: 2 additions & 16 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ import (
"go.temporal.io/server/api/matchingservice/v1"
schedspb "go.temporal.io/server/api/schedule/v1"
tokenspb "go.temporal.io/server/api/token/v1"
"go.temporal.io/server/client/frontend"
"go.temporal.io/server/common"
"go.temporal.io/server/common/archiver"
"go.temporal.io/server/common/archiver/provider"
Expand Down Expand Up @@ -3485,7 +3484,7 @@ func (wh *WorkflowHandler) PollWorkflowExecutionUpdate(
}
enums.SetDefaultUpdateWorkflowExecutionLifecycleStage(&request.GetWaitPolicy().LifecycleStage)

nsID, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace()))
_, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace()))
if err != nil {
return nil, err
}
Expand All @@ -3494,20 +3493,7 @@ func (wh *WorkflowHandler) PollWorkflowExecutionUpdate(
return nil, errUpdateWorkflowExecutionAPINotAllowed
}

ctx, cancel := context.WithTimeout(ctx, frontend.DefaultLongPollTimeout)
defer cancel()

histResp, err := wh.historyClient.PollWorkflowExecutionUpdate(
ctx,
&historyservice.PollWorkflowExecutionUpdateRequest{
NamespaceId: nsID.String(),
Request: request,
},
)
if err != nil {
return nil, err
}
return histResp.GetResponse(), nil
return nil, serviceerror.NewUnimplemented("PollWorkflowExecutionUpdate is not implemented")
}

func (wh *WorkflowHandler) UpdateWorkerBuildIdCompatibility(ctx context.Context, request *workflowservice.UpdateWorkerBuildIdCompatibilityRequest) (_ *workflowservice.UpdateWorkerBuildIdCompatibilityResponse, retError error) {
Expand Down
1 change: 0 additions & 1 deletion service/history/configs/quotas.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ var (
"GetReplicationStatus": 0,
"DeleteWorkflowVisibilityRecord": 0,
"UpdateWorkflowExecution": 0,
"PollWorkflowExecutionUpdate": 0,
"StreamWorkflowReplicationMessages": 0,
}

Expand Down
27 changes: 0 additions & 27 deletions service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1854,33 +1854,6 @@ func (h *Handler) UpdateWorkflowExecution(
return engine.UpdateWorkflowExecution(ctx, request)
}

func (h *Handler) PollWorkflowExecutionUpdate(
ctx context.Context,
request *historyservice.PollWorkflowExecutionUpdateRequest,
) (_ *historyservice.PollWorkflowExecutionUpdateResponse, retErr error) {
defer log.CapturePanic(h.logger, &retErr)
h.startWG.Wait()

if h.isStopped() {
return nil, errShuttingDown
}

shardContext, err := h.controller.GetShardByNamespaceWorkflow(
namespace.ID(request.GetNamespaceId()),
request.GetRequest().GetUpdateRef().GetWorkflowExecution().GetWorkflowId(),
)
if err != nil {
return nil, h.convertError(err)
}

engine, err := shardContext.GetEngine(ctx)
if err != nil {
return nil, h.convertError(err)
}

return engine.PollWorkflowExecutionUpdate(ctx, request)
}

func (h *Handler) StreamWorkflowReplicationMessages(
server historyservice.HistoryService_StreamWorkflowReplicationMessagesServer,
) (retError error) {
Expand Down
9 changes: 1 addition & 8 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"go.opentelemetry.io/otel/trace"
commonpb "go.temporal.io/api/common/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"

"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/api/matchingservice/v1"
Expand Down Expand Up @@ -542,14 +541,8 @@ func (e *historyEngineImpl) UpdateWorkflowExecution(
ctx context.Context,
req *historyservice.UpdateWorkflowExecutionRequest,
) (*historyservice.UpdateWorkflowExecutionResponse, error) {
return updateworkflow.Invoke(ctx, req, e.shard, e.workflowConsistencyChecker, e.matchingClient)
}

func (e *historyEngineImpl) PollWorkflowExecutionUpdate(
ctx context.Context,
req *historyservice.PollWorkflowExecutionUpdateRequest,
) (*historyservice.PollWorkflowExecutionUpdateResponse, error) {
return nil, serviceerror.NewUnimplemented("PollWorkflowExecutionUpdate not implemented")
return updateworkflow.Invoke(ctx, req, e.shard, e.workflowConsistencyChecker, e.matchingClient)
}

// RemoveSignalMutableState remove the signal request id in signal_requested for deduplicate
Expand Down
1 change: 0 additions & 1 deletion service/history/shard/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ type (
GenerateLastHistoryReplicationTasks(ctx context.Context, request *historyservice.GenerateLastHistoryReplicationTasksRequest) (*historyservice.GenerateLastHistoryReplicationTasksResponse, error)
GetReplicationStatus(ctx context.Context, request *historyservice.GetReplicationStatusRequest) (*historyservice.ShardReplicationStatus, error)
UpdateWorkflowExecution(ctx context.Context, request *historyservice.UpdateWorkflowExecutionRequest) (*historyservice.UpdateWorkflowExecutionResponse, error)
PollWorkflowExecutionUpdate(ctx context.Context, request *historyservice.PollWorkflowExecutionUpdateRequest) (*historyservice.PollWorkflowExecutionUpdateResponse, error)

NotifyNewHistoryEvent(event *events.Notification)
NotifyNewTasks(tasks map[tasks.Category][]tasks.Task)
Expand Down
15 changes: 0 additions & 15 deletions service/history/shard/engine_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 45fada1

Please sign in to comment.