diff --git a/service/history/handler.go b/service/history/handler.go index 689bf6e0eee..d8e0e293ffb 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -310,34 +310,34 @@ func (h *handlerImpl) RecordActivityTaskHeartbeat( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "") + return nil, h.error(errDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "") + return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") } heartbeatRequest := wrappedRequest.HeartbeatRequest token, err0 := h.tokenSerializer.Deserialize(heartbeatRequest.TaskToken) if err0 != nil { err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)} - return nil, h.error(err0, scope, domainID, "") + return nil, h.error(err0, scope, domainID, "", "") } err0 = validateTaskToken(token) if err0 != nil { - return nil, h.error(err0, scope, domainID, "") + return nil, h.error(err0, scope, domainID, "", "") } workflowID := token.WorkflowID engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return nil, h.error(err1, scope, domainID, workflowID) + return nil, h.error(err1, scope, domainID, workflowID, "") } response, err2 := engine.RecordActivityTaskHeartbeat(ctx, wrappedRequest) if err2 != nil { - return nil, h.error(err2, scope, domainID, workflowID) + return nil, h.error(err2, scope, domainID, workflowID, "") } return response, nil @@ -369,21 +369,21 @@ func (h *handlerImpl) RecordActivityTaskStarted( ) if recordRequest.GetDomainUUID() == "" { - return nil, h.error(errDomainNotSet, scope, domainID, workflowID) + return nil, h.error(errDomainNotSet, scope, domainID, workflowID, "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, workflowID) + return nil, h.error(errHistoryHostThrottle, scope, domainID, 
workflowID, "") } engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return nil, h.error(err1, scope, domainID, workflowID) + return nil, h.error(err1, scope, domainID, workflowID, "") } response, err2 := engine.RecordActivityTaskStarted(ctx, recordRequest) if err2 != nil { - return nil, h.error(err2, scope, domainID, workflowID) + return nil, h.error(err2, scope, domainID, workflowID, "") } return response, nil @@ -404,6 +404,7 @@ func (h *handlerImpl) RecordDecisionTaskStarted( domainID := recordRequest.GetDomainUUID() workflowExecution := recordRequest.WorkflowExecution workflowID := workflowExecution.GetWorkflowID() + runID := workflowExecution.GetRunID() h.emitInfoOrDebugLog( domainID, @@ -415,15 +416,15 @@ func (h *handlerImpl) RecordDecisionTaskStarted( ) if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, workflowID) + return nil, h.error(errDomainNotSet, scope, domainID, workflowID, runID) } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, workflowID) + return nil, h.error(errHistoryHostThrottle, scope, domainID, workflowID, runID) } if recordRequest.PollRequest == nil || recordRequest.PollRequest.TaskList.GetName() == "" { - return nil, h.error(errTaskListNotSet, scope, domainID, workflowID) + return nil, h.error(errTaskListNotSet, scope, domainID, workflowID, runID) } engine, err1 := h.controller.GetEngine(workflowID) @@ -431,15 +432,16 @@ func (h *handlerImpl) RecordDecisionTaskStarted( h.GetLogger().Error("RecordDecisionTaskStarted failed.", tag.Error(err1), tag.WorkflowID(recordRequest.WorkflowExecution.GetWorkflowID()), + tag.WorkflowRunID(runID), tag.WorkflowRunID(recordRequest.WorkflowExecution.GetRunID()), tag.WorkflowScheduleID(recordRequest.GetScheduleID()), ) - return nil, h.error(err1, scope, domainID, workflowID) + return nil, h.error(err1, scope, domainID, workflowID, runID) } response, err2 := engine.RecordDecisionTaskStarted(ctx, recordRequest) 
if err2 != nil { - return nil, h.error(err2, scope, domainID, workflowID) + return nil, h.error(err2, scope, domainID, workflowID, runID) } return response, nil @@ -459,34 +461,35 @@ func (h *handlerImpl) RespondActivityTaskCompleted( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "") + return h.error(errDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "") + return h.error(errHistoryHostThrottle, scope, domainID, "", "") } completeRequest := wrappedRequest.CompleteRequest token, err0 := h.tokenSerializer.Deserialize(completeRequest.TaskToken) if err0 != nil { err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)} - return h.error(err0, scope, domainID, "") + return h.error(err0, scope, domainID, "", "") } err0 = validateTaskToken(token) if err0 != nil { - return h.error(err0, scope, domainID, "") + return h.error(err0, scope, domainID, "", "") } workflowID := token.WorkflowID + runID := token.RunID engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return h.error(err1, scope, domainID, workflowID) + return h.error(err1, scope, domainID, workflowID, runID) } err2 := engine.RespondActivityTaskCompleted(ctx, wrappedRequest) if err2 != nil { - return h.error(err2, scope, domainID, workflowID) + return h.error(err2, scope, domainID, workflowID, runID) } return nil @@ -506,34 +509,35 @@ func (h *handlerImpl) RespondActivityTaskFailed( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "") + return h.error(errDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "") + return h.error(errHistoryHostThrottle, scope, domainID, "", "") } failRequest := wrappedRequest.FailedRequest token, err0 := 
h.tokenSerializer.Deserialize(failRequest.TaskToken) if err0 != nil { err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)} - return h.error(err0, scope, domainID, "") + return h.error(err0, scope, domainID, "", "") } err0 = validateTaskToken(token) if err0 != nil { - return h.error(err0, scope, domainID, "") + return h.error(err0, scope, domainID, "", "") } workflowID := token.WorkflowID + runID := token.RunID engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return h.error(err1, scope, domainID, workflowID) + return h.error(err1, scope, domainID, workflowID, runID) } err2 := engine.RespondActivityTaskFailed(ctx, wrappedRequest) if err2 != nil { - return h.error(err2, scope, domainID, workflowID) + return h.error(err2, scope, domainID, workflowID, runID) } return nil @@ -553,34 +557,35 @@ func (h *handlerImpl) RespondActivityTaskCanceled( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "") + return h.error(errDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "") + return h.error(errHistoryHostThrottle, scope, domainID, "", "") } cancelRequest := wrappedRequest.CancelRequest token, err0 := h.tokenSerializer.Deserialize(cancelRequest.TaskToken) if err0 != nil { err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. 
Error: %v", err0)} - return h.error(err0, scope, domainID, "") + return h.error(err0, scope, domainID, "", "") } err0 = validateTaskToken(token) if err0 != nil { - return h.error(err0, scope, domainID, "") + return h.error(err0, scope, domainID, "", "") } workflowID := token.WorkflowID + runID := token.RunID engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return h.error(err1, scope, domainID, workflowID) + return h.error(err1, scope, domainID, workflowID, runID) } err2 := engine.RespondActivityTaskCanceled(ctx, wrappedRequest) if err2 != nil { - return h.error(err2, scope, domainID, workflowID) + return h.error(err2, scope, domainID, workflowID, runID) } return nil @@ -600,11 +605,11 @@ func (h *handlerImpl) RespondDecisionTaskCompleted( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "") + return nil, h.error(errDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "") + return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") } completeRequest := wrappedRequest.CompleteRequest @@ -614,7 +619,7 @@ func (h *handlerImpl) RespondDecisionTaskCompleted( token, err0 := h.tokenSerializer.Deserialize(completeRequest.TaskToken) if err0 != nil { err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)} - return nil, h.error(err0, scope, domainID, "") + return nil, h.error(err0, scope, domainID, "", "") } h.GetLogger().Debug(fmt.Sprintf("RespondDecisionTaskCompleted. 
DomainID: %v, WorkflowID: %v, RunID: %v, ScheduleID: %v", @@ -625,18 +630,19 @@ func (h *handlerImpl) RespondDecisionTaskCompleted( err0 = validateTaskToken(token) if err0 != nil { - return nil, h.error(err0, scope, domainID, "") + return nil, h.error(err0, scope, domainID, "", "") } workflowID := token.WorkflowID + runID := token.RunID engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return nil, h.error(err1, scope, domainID, workflowID) + return nil, h.error(err1, scope, domainID, workflowID, runID) } response, err2 := engine.RespondDecisionTaskCompleted(ctx, wrappedRequest) if err2 != nil { - return nil, h.error(err2, scope, domainID, workflowID) + return nil, h.error(err2, scope, domainID, workflowID, runID) } return response, nil @@ -656,18 +662,18 @@ func (h *handlerImpl) RespondDecisionTaskFailed( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "") + return h.error(errDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "") + return h.error(errHistoryHostThrottle, scope, domainID, "", "") } failedRequest := wrappedRequest.FailedRequest token, err0 := h.tokenSerializer.Deserialize(failedRequest.TaskToken) if err0 != nil { err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)} - return h.error(err0, scope, domainID, "") + return h.error(err0, scope, domainID, "", "") } h.GetLogger().Debug(fmt.Sprintf("RespondDecisionTaskFailed. 
DomainID: %v, WorkflowID: %v, RunID: %v, ScheduleID: %v", @@ -691,18 +697,19 @@ func (h *handlerImpl) RespondDecisionTaskFailed( } err0 = validateTaskToken(token) if err0 != nil { - return h.error(err0, scope, domainID, "") + return h.error(err0, scope, domainID, "", "") } workflowID := token.WorkflowID + runID := token.RunID engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return h.error(err1, scope, domainID, workflowID) + return h.error(err1, scope, domainID, workflowID, runID) } err2 := engine.RespondDecisionTaskFailed(ctx, wrappedRequest) if err2 != nil { - return h.error(err2, scope, domainID, workflowID) + return h.error(err2, scope, domainID, workflowID, runID) } return nil @@ -722,23 +729,24 @@ func (h *handlerImpl) StartWorkflowExecution( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "") + return nil, h.error(errDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "") + return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") } startRequest := wrappedRequest.StartRequest workflowID := startRequest.GetWorkflowID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return nil, h.error(err1, scope, domainID, workflowID) + return nil, h.error(err1, scope, domainID, workflowID, "") } response, err2 := engine.StartWorkflowExecution(ctx, wrappedRequest) + runID := response.GetRunID() if err2 != nil { - return nil, h.error(err2, scope, domainID, workflowID) + return nil, h.error(err2, scope, domainID, workflowID, runID) } return response, nil @@ -834,7 +842,7 @@ func (h *handlerImpl) ResetQueue( engine, err := h.controller.GetEngineForShard(int(request.GetShardID())) if err != nil { - return h.error(err, scope, "", "") + return h.error(err, scope, "", "", "") } switch taskType := common.TaskType(request.GetType()); taskType { @@ -849,7 +857,7 @@ func (h 
*handlerImpl) ResetQueue( } if err != nil { - return h.error(err, scope, "", "") + return h.error(err, scope, "", "", "") } return nil } @@ -868,7 +876,7 @@ func (h *handlerImpl) DescribeQueue( engine, err := h.controller.GetEngineForShard(int(request.GetShardID())) if err != nil { - return nil, h.error(err, scope, "", "") + return nil, h.error(err, scope, "", "", "") } switch taskType := common.TaskType(request.GetType()); taskType { @@ -883,7 +891,7 @@ func (h *handlerImpl) DescribeQueue( } if err != nil { - return nil, h.error(err, scope, "", "") + return nil, h.error(err, scope, "", "", "") } return resp, nil } @@ -902,19 +910,20 @@ func (h *handlerImpl) DescribeMutableState( domainID := request.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "") + return nil, h.error(errDomainNotSet, scope, domainID, "", "") } workflowExecution := request.Execution workflowID := workflowExecution.GetWorkflowID() + runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return nil, h.error(err1, scope, domainID, workflowID) + return nil, h.error(err1, scope, domainID, workflowID, runID) } resp, err2 := engine.DescribeMutableState(ctx, request) if err2 != nil { - return nil, h.error(err2, scope, domainID, workflowID) + return nil, h.error(err2, scope, domainID, workflowID, runID) } return resp, nil } @@ -933,23 +942,24 @@ func (h *handlerImpl) GetMutableState( domainID := getRequest.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "") + return nil, h.error(errDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "") + return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") } workflowExecution := getRequest.Execution workflowID := workflowExecution.GetWorkflowID() + runID := workflowExecution.GetRunID() engine, err1 :=
h.controller.GetEngine(workflowID) if err1 != nil { - return nil, h.error(err1, scope, domainID, workflowID) + return nil, h.error(err1, scope, domainID, workflowID, runID) } resp, err2 := engine.GetMutableState(ctx, getRequest) if err2 != nil { - return nil, h.error(err2, scope, domainID, workflowID) + return nil, h.error(err2, scope, domainID, workflowID, runID) } return resp, nil } @@ -968,23 +978,24 @@ func (h *handlerImpl) PollMutableState( domainID := getRequest.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "") + return nil, h.error(errDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "") + return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") } workflowExecution := getRequest.Execution workflowID := workflowExecution.GetWorkflowID() + runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return nil, h.error(err1, scope, domainID, workflowID) + return nil, h.error(err1, scope, domainID, workflowID, runID) } resp, err2 := engine.PollMutableState(ctx, getRequest) if err2 != nil { - return nil, h.error(err2, scope, domainID, workflowID) + return nil, h.error(err2, scope, domainID, workflowID, runID) } return resp, nil } @@ -1003,23 +1014,24 @@ func (h *handlerImpl) DescribeWorkflowExecution( domainID := request.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "") + return nil, h.error(errDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "") + return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") } workflowExecution := request.Request.Execution workflowID := workflowExecution.GetWorkflowID() + runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return nil, 
h.error(err1, scope, domainID, workflowID) + return nil, h.error(err1, scope, domainID, workflowID, runID) } resp, err2 := engine.DescribeWorkflowExecution(ctx, request) if err2 != nil { - return nil, h.error(err2, scope, domainID, workflowID) + return nil, h.error(err2, scope, domainID, workflowID, runID) } return resp, nil } @@ -1042,11 +1054,11 @@ func (h *handlerImpl) RequestCancelWorkflowExecution( domainID := request.GetDomainUUID() if domainID == "" || request.CancelRequest.GetDomain() == "" { - return h.error(errDomainNotSet, scope, domainID, "") + return h.error(errDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "") + return h.error(errHistoryHostThrottle, scope, domainID, "", "") } cancelRequest := request.CancelRequest @@ -1057,14 +1069,15 @@ func (h *handlerImpl) RequestCancelWorkflowExecution( cancelRequest.WorkflowExecution.GetRunID())) workflowID := cancelRequest.WorkflowExecution.GetWorkflowID() + runID := cancelRequest.WorkflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return h.error(err1, scope, domainID, workflowID) + return h.error(err1, scope, domainID, workflowID, runID) } err2 := engine.RequestCancelWorkflowExecution(ctx, request) if err2 != nil { - return h.error(err2, scope, domainID, workflowID) + return h.error(err2, scope, domainID, workflowID, runID) } return nil @@ -1089,23 +1102,24 @@ func (h *handlerImpl) SignalWorkflowExecution( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "") + return h.error(errDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "") + return h.error(errHistoryHostThrottle, scope, domainID, "", "") } workflowExecution := wrappedRequest.SignalRequest.WorkflowExecution workflowID := workflowExecution.GetWorkflowID() + runID := 
workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return h.error(err1, scope, domainID, workflowID) + return h.error(err1, scope, domainID, workflowID, runID) } err2 := engine.SignalWorkflowExecution(ctx, wrappedRequest) if err2 != nil { - return h.error(err2, scope, domainID, workflowID) + return h.error(err2, scope, domainID, workflowID, runID) } return nil @@ -1133,18 +1147,18 @@ func (h *handlerImpl) SignalWithStartWorkflowExecution( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "") + return nil, h.error(errDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "") + return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") } signalWithStartRequest := wrappedRequest.SignalWithStartRequest workflowID := signalWithStartRequest.GetWorkflowID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return nil, h.error(err1, scope, domainID, workflowID) + return nil, h.error(err1, scope, domainID, workflowID, "") } resp, err2 := engine.SignalWithStartWorkflowExecution(ctx, wrappedRequest) @@ -1161,12 +1175,12 @@ func (h *handlerImpl) SignalWithStartWorkflowExecution( var e1 *persistence.WorkflowExecutionAlreadyStartedError var e2 *persistence.CurrentWorkflowConditionFailedError if !errors.As(err2, &e1) && !errors.As(err2, &e2) { - return nil, h.error(err2, scope, domainID, workflowID) + return nil, h.error(err2, scope, domainID, workflowID, resp.GetRunID()) } resp, err2 = engine.SignalWithStartWorkflowExecution(ctx, wrappedRequest) if err2 != nil { - return nil, h.error(err2, scope, domainID, workflowID) + return nil, h.error(err2, scope, domainID, workflowID, resp.GetRunID()) } return resp, nil } @@ -1190,23 +1204,24 @@ func (h *handlerImpl) RemoveSignalMutableState( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - 
return h.error(errDomainNotSet, scope, domainID, "") + return h.error(errDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "") + return h.error(errHistoryHostThrottle, scope, domainID, "", "") } workflowExecution := wrappedRequest.WorkflowExecution workflowID := workflowExecution.GetWorkflowID() + runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return h.error(err1, scope, domainID, workflowID) + return h.error(err1, scope, domainID, workflowID, runID) } err2 := engine.RemoveSignalMutableState(ctx, wrappedRequest) if err2 != nil { - return h.error(err2, scope, domainID, workflowID) + return h.error(err2, scope, domainID, workflowID, runID) } return nil @@ -1231,23 +1246,24 @@ func (h *handlerImpl) TerminateWorkflowExecution( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "") + return h.error(errDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "") + return h.error(errHistoryHostThrottle, scope, domainID, "", "") } workflowExecution := wrappedRequest.TerminateRequest.WorkflowExecution workflowID := workflowExecution.GetWorkflowID() + runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return h.error(err1, scope, domainID, workflowID) + return h.error(err1, scope, domainID, workflowID, runID) } err2 := engine.TerminateWorkflowExecution(ctx, wrappedRequest) if err2 != nil { - return h.error(err2, scope, domainID, workflowID) + return h.error(err2, scope, domainID, workflowID, runID) } return nil @@ -1272,23 +1288,24 @@ func (h *handlerImpl) ResetWorkflowExecution( domainID := wrappedRequest.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "") + return nil, 
h.error(errDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "") + return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") } workflowExecution := wrappedRequest.ResetRequest.WorkflowExecution workflowID := workflowExecution.GetWorkflowID() + runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return nil, h.error(err1, scope, domainID, workflowID) + return nil, h.error(err1, scope, domainID, workflowID, runID) } resp, err2 := engine.ResetWorkflowExecution(ctx, wrappedRequest) if err2 != nil { - return nil, h.error(err2, scope, domainID, workflowID) + return nil, h.error(err2, scope, domainID, workflowID, runID) } return resp, nil @@ -1311,22 +1328,23 @@ func (h *handlerImpl) QueryWorkflow( domainID := request.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "") + return nil, h.error(errDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "") + return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") } workflowID := request.GetRequest().GetExecution().GetWorkflowID() + runID := request.GetRequest().GetExecution().GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return nil, h.error(err1, scope, domainID, workflowID) + return nil, h.error(err1, scope, domainID, workflowID, runID) } resp, err2 := engine.QueryWorkflow(ctx, request) if err2 != nil { - return nil, h.error(err2, scope, domainID, workflowID) + return nil, h.error(err2, scope, domainID, workflowID, runID) } return resp, nil @@ -1353,27 +1371,28 @@ func (h *handlerImpl) ScheduleDecisionTask( domainID := request.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "") + return h.error(errDomainNotSet, scope, domainID, "", "") } if ok := 
h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "") + return h.error(errHistoryHostThrottle, scope, domainID, "", "") } if request.WorkflowExecution == nil { - return h.error(errWorkflowExecutionNotSet, scope, domainID, "") + return h.error(errWorkflowExecutionNotSet, scope, domainID, "", "") } workflowExecution := request.WorkflowExecution workflowID := workflowExecution.GetWorkflowID() + runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return h.error(err1, scope, domainID, workflowID) + return h.error(err1, scope, domainID, workflowID, runID) } err2 := engine.ScheduleDecisionTask(ctx, request) if err2 != nil { - return h.error(err2, scope, domainID, workflowID) + return h.error(err2, scope, domainID, workflowID, runID) } return nil @@ -1398,27 +1417,28 @@ func (h *handlerImpl) RecordChildExecutionCompleted( domainID := request.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "") + return h.error(errDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "") + return h.error(errHistoryHostThrottle, scope, domainID, "", "") } if request.WorkflowExecution == nil { - return h.error(errWorkflowExecutionNotSet, scope, domainID, "") + return h.error(errWorkflowExecutionNotSet, scope, domainID, "", "") } workflowExecution := request.WorkflowExecution workflowID := workflowExecution.GetWorkflowID() + runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return h.error(err1, scope, domainID, workflowID) + return h.error(err1, scope, domainID, workflowID, runID) } err2 := engine.RecordChildExecutionCompleted(ctx, request) if err2 != nil { - return h.error(err2, scope, domainID, workflowID) + return h.error(err2, scope, domainID, workflowID, runID) } return nil @@ -1448,22 +1468,23 @@ func (h *handlerImpl) 
ResetStickyTaskList( domainID := resetRequest.GetDomainUUID() if domainID == "" { - return nil, h.error(errDomainNotSet, scope, domainID, "") + return nil, h.error(errDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return nil, h.error(errHistoryHostThrottle, scope, domainID, "") + return nil, h.error(errHistoryHostThrottle, scope, domainID, "", "") } workflowID := resetRequest.Execution.GetWorkflowID() + runID := resetRequest.Execution.GetRunID() engine, err := h.controller.GetEngine(workflowID) if err != nil { - return nil, h.error(err, scope, domainID, workflowID) + return nil, h.error(err, scope, domainID, workflowID, runID) } resp, err = engine.ResetStickyTaskList(ctx, resetRequest) if err != nil { - return nil, h.error(err, scope, domainID, workflowID) + return nil, h.error(err, scope, domainID, workflowID, runID) } return resp, nil @@ -1487,23 +1508,24 @@ func (h *handlerImpl) ReplicateEventsV2( domainID := replicateRequest.GetDomainUUID() if domainID == "" { - return h.error(errDomainNotSet, scope, domainID, "") + return h.error(errDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "") + return h.error(errHistoryHostThrottle, scope, domainID, "", "") } workflowExecution := replicateRequest.WorkflowExecution workflowID := workflowExecution.GetWorkflowID() + runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { - return h.error(err1, scope, domainID, workflowID) + return h.error(err1, scope, domainID, workflowID, runID) } err2 := engine.ReplicateEventsV2(ctx, replicateRequest) if err2 != nil { - return h.error(err2, scope, domainID, workflowID) + return h.error(err2, scope, domainID, workflowID, runID) } return nil @@ -1526,26 +1548,26 @@ func (h *handlerImpl) SyncShardStatus( } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, "", "") + return 
h.error(errHistoryHostThrottle, scope, "", "", "") } if syncShardStatusRequest.SourceCluster == "" { - return h.error(errSourceClusterNotSet, scope, "", "") + return h.error(errSourceClusterNotSet, scope, "", "", "") } if syncShardStatusRequest.Timestamp == nil { - return h.error(errTimestampNotSet, scope, "", "") + return h.error(errTimestampNotSet, scope, "", "", "") } // shard ID is already provided in the request engine, err := h.controller.GetEngineForShard(int(syncShardStatusRequest.GetShardID())) if err != nil { - return h.error(err, scope, "", "") + return h.error(err, scope, "", "", "") } err = engine.SyncShardStatus(ctx, syncShardStatusRequest) if err != nil { - return h.error(err, scope, "", "") + return h.error(err, scope, "", "", "") } return nil @@ -1569,30 +1591,31 @@ func (h *handlerImpl) SyncActivity( domainID := syncActivityRequest.GetDomainID() if syncActivityRequest.DomainID == "" || uuid.Parse(syncActivityRequest.GetDomainID()) == nil { - return h.error(errDomainNotSet, scope, domainID, "") + return h.error(errDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { - return h.error(errHistoryHostThrottle, scope, domainID, "") + return h.error(errHistoryHostThrottle, scope, domainID, "", "") } if syncActivityRequest.WorkflowID == "" { - return h.error(errWorkflowIDNotSet, scope, domainID, "") + return h.error(errWorkflowIDNotSet, scope, domainID, "", "") } if syncActivityRequest.RunID == "" || uuid.Parse(syncActivityRequest.GetRunID()) == nil { - return h.error(errRunIDNotValid, scope, domainID, "") + return h.error(errRunIDNotValid, scope, domainID, "", "") } workflowID := syncActivityRequest.GetWorkflowID() + runID := syncActivityRequest.GetRunID() engine, err := h.controller.GetEngine(workflowID) if err != nil { - return h.error(err, scope, domainID, workflowID) + return h.error(err, scope, domainID, workflowID, runID) } err = engine.SyncActivity(ctx, syncActivityRequest) if err != nil { - return h.error(err, scope, 
domainID, workflowID) + return h.error(err, scope, domainID, workflowID, runID) } return nil @@ -1767,9 +1790,10 @@ func (h *handlerImpl) ReapplyEvents( domainID := request.GetDomainUUID() workflowID := request.GetRequest().GetWorkflowExecution().GetWorkflowID() + runID := request.GetRequest().GetWorkflowExecution().GetRunID() engine, err := h.controller.GetEngine(workflowID) if err != nil { - return h.error(err, scope, domainID, workflowID) + return h.error(err, scope, domainID, workflowID, runID) } // deserialize history event object historyEvents, err := h.GetPayloadSerializer().DeserializeBatchEvents(&persistence.DataBlob{ @@ -1777,7 +1801,7 @@ func (h *handlerImpl) ReapplyEvents( Data: request.GetRequest().GetEvents().GetData(), }) if err != nil { - return h.error(err, scope, domainID, workflowID) + return h.error(err, scope, domainID, workflowID, runID) } execution := request.GetRequest().GetWorkflowExecution() @@ -1788,7 +1812,7 @@ func (h *handlerImpl) ReapplyEvents( execution.GetRunID(), historyEvents, ); err != nil { - return h.error(err, scope, domainID, workflowID) + return h.error(err, scope, domainID, workflowID, runID) } return nil } @@ -1836,7 +1860,7 @@ func (h *handlerImpl) CountDLQMessages( } err := g.Wait() - return &types.HistoryCountDLQMessagesResponse{Entries: entries}, h.error(err, scope, "", "") + return &types.HistoryCountDLQMessagesResponse{Entries: entries}, h.error(err, scope, "", "", "") } // ReadDLQMessages reads replication DLQ messages @@ -1857,7 +1881,7 @@ func (h *handlerImpl) ReadDLQMessages( engine, err := h.controller.GetEngineForShard(int(request.GetShardID())) if err != nil { - return nil, h.error(err, scope, "", "") + return nil, h.error(err, scope, "", "", "") } return engine.ReadDLQMessages(ctx, request) @@ -1881,7 +1905,7 @@ func (h *handlerImpl) PurgeDLQMessages( engine, err := h.controller.GetEngineForShard(int(request.GetShardID())) if err != nil { - return h.error(err, scope, "", "") + return h.error(err, scope, "", 
"", "") } return engine.PurgeDLQMessages(ctx, request) @@ -1905,7 +1929,7 @@ func (h *handlerImpl) MergeDLQMessages( engine, err := h.controller.GetEngineForShard(int(request.GetShardID())) if err != nil { - return nil, h.error(err, scope, "", "") + return nil, h.error(err, scope, "", "", "") } return engine.MergeDLQMessages(ctx, request) @@ -1926,9 +1950,10 @@ func (h *handlerImpl) RefreshWorkflowTasks( domainID := request.DomainUIID execution := request.GetRequest().GetExecution() workflowID := execution.GetWorkflowID() + runID := execution.GetRunID() engine, err := h.controller.GetEngine(workflowID) if err != nil { - return h.error(err, scope, domainID, workflowID) + return h.error(err, scope, domainID, workflowID, runID) } err = engine.RefreshWorkflowTasks( @@ -1941,7 +1966,7 @@ func (h *handlerImpl) RefreshWorkflowTasks( ) if err != nil { - return h.error(err, scope, domainID, workflowID) + return h.error(err, scope, domainID, workflowID, runID) } return nil @@ -2043,12 +2068,12 @@ func (h *handlerImpl) RespondCrossClusterTasksCompleted( engine, err := h.controller.GetEngineForShard(int(request.GetShardID())) if err != nil { - return nil, h.error(err, scope, "", "") + return nil, h.error(err, scope, "", "", "") } err = engine.RespondCrossClusterTasksCompleted(ctx, request.TargetCluster, request.TaskResponses) if err != nil { - return nil, h.error(err, scope, "", "") + return nil, h.error(err, scope, "", "", "") } response := &types.RespondCrossClusterTasksCompletedResponse{} @@ -2058,7 +2083,7 @@ func (h *handlerImpl) RespondCrossClusterTasksCompleted( response.Tasks, err = engine.GetCrossClusterTasks(fetchTaskCtx, request.TargetCluster) if err != nil { - return nil, h.error(err, scope, "", "") + return nil, h.error(err, scope, "", "", "") } } return response, nil @@ -2080,7 +2105,7 @@ func (h *handlerImpl) GetFailoverInfo( resp, err := h.failoverCoordinator.GetFailoverInfo(request.GetDomainID()) if err != nil { - return nil, h.error(err, scope, 
request.GetDomainID(), "") + return nil, h.error(err, scope, request.GetDomainID(), "", "") } return resp, nil } @@ -2112,51 +2137,74 @@ func (h *handlerImpl) updateErrorMetric( scope metrics.Scope, domainID string, workflowID string, + runID string, err error, ) { - if err == context.DeadlineExceeded || err == context.Canceled { + var yarpcE *yarpcerrors.Status + // Typed errors are matched with errors.As on a pointer target: errors.Is against a zero-value struct never equals a *types.XxxError. + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { scope.IncCounter(metrics.CadenceErrContextTimeoutCounter) return } - switch err := err.(type) { - case *types.ShardOwnershipLostError: + if errors.As(err, new(*types.ShardOwnershipLostError)) { scope.IncCounter(metrics.CadenceErrShardOwnershipLostCounter) - case *types.EventAlreadyStartedError: + + } else if errors.As(err, new(*types.EventAlreadyStartedError)) { scope.IncCounter(metrics.CadenceErrEventAlreadyStartedCounter) - case *types.BadRequestError: + + } else if errors.As(err, new(*types.BadRequestError)) { scope.IncCounter(metrics.CadenceErrBadRequestCounter) - case *types.DomainNotActiveError: + + } else if errors.As(err, new(*types.DomainNotActiveError)) { scope.IncCounter(metrics.CadenceErrBadRequestCounter) - case *types.WorkflowExecutionAlreadyStartedError: + + } else if errors.As(err, new(*types.WorkflowExecutionAlreadyStartedError)) { scope.IncCounter(metrics.CadenceErrExecutionAlreadyStartedCounter) - case *types.EntityNotExistsError: + + } else if errors.As(err, new(*types.EntityNotExistsError)) { scope.IncCounter(metrics.CadenceErrEntityNotExistsCounter) - case *types.WorkflowExecutionAlreadyCompletedError: + + } else if errors.As(err, new(*types.WorkflowExecutionAlreadyCompletedError)) { scope.IncCounter(metrics.CadenceErrWorkflowExecutionAlreadyCompletedCounter) - case *types.CancellationAlreadyRequestedError: + + } else if errors.As(err, new(*types.CancellationAlreadyRequestedError)) { scope.IncCounter(metrics.CadenceErrCancellationAlreadyRequestedCounter) - case *types.LimitExceededError: + + } else if errors.As(err, 
new(*types.LimitExceededError)) { scope.IncCounter(metrics.CadenceErrLimitExceededCounter) - case *types.RetryTaskV2Error: + + } else if errors.As(err, new(*types.RetryTaskV2Error)) { scope.IncCounter(metrics.CadenceErrRetryTaskCounter) - case *types.ServiceBusyError: + + } else if errors.As(err, new(*types.ServiceBusyError)) { scope.IncCounter(metrics.CadenceErrServiceBusyCounter) - case *yarpcerrors.Status: - if err.Code() == yarpcerrors.CodeDeadlineExceeded { + + } else if errors.As(err, &yarpcE) { + + if yarpcE.Code() == yarpcerrors.CodeDeadlineExceeded { scope.IncCounter(metrics.CadenceErrContextTimeoutCounter) } scope.IncCounter(metrics.CadenceFailures) - case *types.InternalServiceError: + + } else if errors.As(err, new(*types.InternalServiceError)) { scope.IncCounter(metrics.CadenceFailures) + h.GetLogger().Error("Internal service error", tag.Error(err), tag.WorkflowID(workflowID), + tag.WorkflowRunID(runID), tag.WorkflowDomainID(domainID)) - default: + + } else { + // Default / unknown error fallback scope.IncCounter(metrics.CadenceFailures) - h.getLoggerWithTags(domainID, workflowID).Error("Uncategorized error", tag.Error(err)) + h.GetLogger().Error("Uncategorized error", + tag.Error(err), + tag.WorkflowID(workflowID), + tag.WorkflowRunID(runID), + tag.WorkflowDomainID(domainID)) } } @@ -2165,10 +2213,11 @@ func (h *handlerImpl) error( scope metrics.Scope, domainID string, workflowID string, + runID string, ) error { - err = h.convertError(err) - h.updateErrorMetric(scope, domainID, workflowID, err) + + h.updateErrorMetric(scope, domainID, workflowID, runID, err) if errors.Is(err, workflow.ErrMaxAttemptsExceeded) { // Calling the dummy Workflow Check from task Validator. This is an ongoing project where we plan to do some validations on // the following workflow. Based on the validations (is the workflow stale? does the workflow come from a deprecated domain?) 
@@ -2180,23 +2229,6 @@ func (h *handlerImpl) error( return err } -func (h *handlerImpl) getLoggerWithTags( - domainID string, - workflowID string, -) log.Logger { - - logger := h.GetLogger() - if domainID != "" { - logger = logger.WithTags(tag.WorkflowDomainID(domainID)) - } - - if workflowID != "" { - logger = logger.WithTags(tag.WorkflowID(workflowID)) - } - - return logger -} - func (h *handlerImpl) emitInfoOrDebugLog( domainID string, msg string,