Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean namespace handover #3692

Merged
merged 8 commits into from
Dec 21, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1430,6 +1430,7 @@ var (
TaskWorkflowBusyCounter = NewCounterDef("task_errors_workflow_busy")
TaskNotActiveCounter = NewCounterDef("task_errors_not_active_counter")
TaskLimitExceededCounter = NewCounterDef("task_errors_limit_exceeded_counter")
TaskNamespaceHandoverCounter = NewCounterDef("task_errors_namespace_handover")
TaskScheduleToStartLatency = NewTimerDef("task_schedule_to_start_latency")
TransferTaskMissingEventCounter = NewCounterDef("transfer_task_missing_event_counter")
TaskBatchCompleteCounter = NewCounterDef("task_batch_complete_counter")
Expand Down
14 changes: 6 additions & 8 deletions common/rpc/interceptor/namespace_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ package interceptor

import (
"context"
"fmt"

enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
Expand All @@ -49,9 +48,8 @@ type (
)

var (
ErrNamespaceNotSet = serviceerror.NewInvalidArgument("Namespace not set on request.")
errNamespaceNotSet = serviceerror.NewInvalidArgument("Namespace not set on request.")
errNamespaceTooLong = serviceerror.NewInvalidArgument("Namespace length exceeds limit.")
errNamespaceHandover = serviceerror.NewUnavailable(fmt.Sprintf("Namespace replication in %s state.", enumspb.REPLICATION_STATE_HANDOVER.String()))
errTaskTokenNotSet = serviceerror.NewInvalidArgument("Task token not set on request.")
errTaskTokenNamespaceMismatch = serviceerror.NewInvalidArgument("Operation requested with a token from a different namespace.")

Expand Down Expand Up @@ -170,20 +168,20 @@ func (ni *NamespaceValidatorInterceptor) extractNamespaceFromRequest(req interfa
// Special case for DescribeNamespace API which should read namespace directly from database.
// Therefore, it must bypass namespace registry and validator.
if request.GetId() == "" && namespaceName.IsEmpty() {
return nil, ErrNamespaceNotSet
return nil, errNamespaceNotSet
}
return nil, nil
case *workflowservice.RegisterNamespaceRequest:
// Special case for RegisterNamespace API. `namespaceName` is name of namespace that about to be registered.
// There is no namespace entry for it, therefore, it must bypass namespace registry and validator.
if namespaceName.IsEmpty() {
return nil, ErrNamespaceNotSet
return nil, errNamespaceNotSet
}
return nil, nil
default:
// All other APIs.
if namespaceName.IsEmpty() {
return nil, ErrNamespaceNotSet
return nil, errNamespaceNotSet
}
return ni.namespaceRegistry.GetNamespace(namespaceName)
}
Expand Down Expand Up @@ -215,7 +213,7 @@ func (ni *NamespaceValidatorInterceptor) extractNamespaceFromTaskToken(req inter
}

if namespaceID.IsEmpty() {
return nil, ErrNamespaceNotSet
return nil, errNamespaceNotSet
}
return ni.namespaceRegistry.GetNamespaceByID(namespaceID)
}
Expand Down Expand Up @@ -266,5 +264,5 @@ func (ni *NamespaceValidatorInterceptor) checkReplicationState(namespaceEntry *n
return nil
}

return errNamespaceHandover
return common.ErrNamespaceHandover
}
6 changes: 3 additions & 3 deletions common/rpc/interceptor/namespace_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,14 @@ func (s *namespaceValidatorSuite) Test_StateValidationIntercept_StatusFromNamesp
{
state: enumspb.NAMESPACE_STATE_REGISTERED,
replicationState: enumspb.REPLICATION_STATE_HANDOVER,
expectedErr: errNamespaceHandover,
expectedErr: common.ErrNamespaceHandover,
method: "/temporal/StartWorkflowExecution",
req: &workflowservice.StartWorkflowExecutionRequest{Namespace: "test-namespace"},
},
// DescribeNamespace
{
state: enumspb.NAMESPACE_STATE_UNSPECIFIED,
expectedErr: ErrNamespaceNotSet,
expectedErr: errNamespaceNotSet,
method: "/temporal/DescribeNamespace",
req: &workflowservice.DescribeNamespaceRequest{},
},
Expand All @@ -232,7 +232,7 @@ func (s *namespaceValidatorSuite) Test_StateValidationIntercept_StatusFromNamesp
},
{
state: enumspb.NAMESPACE_STATE_UNSPECIFIED,
expectedErr: ErrNamespaceNotSet,
expectedErr: errNamespaceNotSet,
method: "/temporal/RegisterNamespace",
req: &workflowservice.RegisterNamespaceRequest{},
},
Expand Down
9 changes: 9 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ var (
ErrContextTimeoutNotSet = serviceerror.NewInvalidArgument("Context timeout is not set.")
)

var (
// ErrNamespaceHandover is error indicating namespace is in handover state and cannot process request.
ErrNamespaceHandover = serviceerror.NewUnavailable(fmt.Sprintf("Namespace replication in %s state.", enumspb.REPLICATION_STATE_HANDOVER.String()))
)

// AwaitWaitGroup calls Wait on the given wait
// Returns true if the Wait() call succeeded before the timeout
// Returns false if the Wait() did not return before the timeout
Expand Down Expand Up @@ -336,6 +341,10 @@ func IsServiceClientTransientError(err error) bool {
}

func IsServiceHandlerRetryableError(err error) bool {
if err.Error() == ErrNamespaceHandover.Error() {
return false
}

switch err.(type) {
case *serviceerror.Internal,
*serviceerror.Unavailable:
Expand Down
2 changes: 2 additions & 0 deletions service/history/consts/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ var (
ErrWorkflowNotReady = serviceerror.NewWorkflowNotReady("Workflow state is not ready to handle the request.")
// ErrWorkflowTaskNotScheduled is error indicating workflow task is not scheduled yet.
ErrWorkflowTaskNotScheduled = serviceerror.NewWorkflowNotReady("Workflow task is not scheduled yet.")
// ErrNamespaceHandover is error indicating namespace is in handover state and cannot process request.
ErrNamespaceHandover = common.ErrNamespaceHandover

// FailedWorkflowStatuses is a set of failed workflow close states, used for start workflow policy
// for start workflow execution API
Expand Down
3 changes: 1 addition & 2 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,7 @@ func (e *historyEngineImpl) registerNamespaceFailoverCallback() {
}

if e.shard.GetClusterMetadata().IsGlobalNamespaceEnabled() {
maxTaskID, _ := e.replicationAckMgr.GetMaxTaskInfo()
e.shard.UpdateHandoverNamespaces(nextNamespaces, maxTaskID)
e.shard.UpdateHandoverNamespaces(nextNamespaces)
}

newNotificationVersion := nextNamespaces[len(nextNamespaces)-1].NotificationVersion() + 1
Expand Down
13 changes: 13 additions & 0 deletions service/history/nDCTaskUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package history
import (
"context"

enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"

enumsspb "go.temporal.io/server/api/enums/v1"
Expand Down Expand Up @@ -210,3 +211,15 @@ func getNamespaceTagByID(

return metrics.NamespaceTag(namespaceName.String())
}

func getNamespaceTagAndReplicationStateByID(
registry namespace.Registry,
namespaceID string,
) (metrics.Tag, enumspb.ReplicationState) {
namespace, err := registry.GetNamespaceByID(namespace.ID(namespaceID))
if err != nil {
return metrics.NamespaceUnknownTag(), enumspb.REPLICATION_STATE_UNSPECIFIED
}

return metrics.NamespaceTag(namespace.Name().String()), namespace.ReplicationState()
}
23 changes: 18 additions & 5 deletions service/history/queues/executable.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,12 @@ func (e *executableImpl) HandleErr(err error) (retErr error) {
return nil
}

if err.Error() == consts.ErrNamespaceHandover.Error() {
e.taggedMetricsHandler.Counter(metrics.TaskNamespaceHandoverCounter.GetMetricName()).Record(1)
err = consts.ErrNamespaceHandover
return err
}

if _, ok := err.(*serviceerror.NamespaceNotActive); ok {
// TODO remove this error check special case after multi-cursor is enabled by default,
// since the new task life cycle will not give up until task processed / verified
Expand Down Expand Up @@ -325,7 +331,10 @@ func (e *executableImpl) IsRetryableError(err error) bool {
// ErrTaskRetry means mutable state is not ready for standby task processing
// there's no point for retrying the task immediately which will hold the worker corouinte
// TODO: change ErrTaskRetry to a better name
return err != consts.ErrTaskRetry && err != consts.ErrWorkflowBusy && err != consts.ErrDependencyTaskNotCompleted
return err != consts.ErrTaskRetry &&
err != consts.ErrWorkflowBusy &&
err != consts.ErrDependencyTaskNotCompleted &&
err != consts.ErrNamespaceHandover
}

func (e *executableImpl) RetryPolicy() backoff.RetryPolicy {
Expand Down Expand Up @@ -449,7 +458,9 @@ func (e *executableImpl) shouldResubmitOnNack(attempt int, err error) bool {
return false
}

return err != consts.ErrTaskRetry && err != consts.ErrDependencyTaskNotCompleted
return err != consts.ErrTaskRetry &&
err != consts.ErrDependencyTaskNotCompleted &&
err != consts.ErrNamespaceHandover
}

func (e *executableImpl) rescheduleTime(
Expand All @@ -459,12 +470,14 @@ func (e *executableImpl) rescheduleTime(
// elapsedTime (the first parameter in ComputeNextDelay) is not relevant here
// since reschedule policy has no expiration interval.

if err == consts.ErrTaskRetry {
if err == consts.ErrTaskRetry || err == consts.ErrNamespaceHandover {
// using a different reschedule policy to slow down retry
// as the error means mutable state is not ready to handle the task,
// as the error means mutable state or namespace is not ready to handle the task,
// need to wait for replication.
return e.timeSource.Now().Add(taskNotReadyReschedulePolicy.ComputeNextDelay(0, attempt))
} else if err == consts.ErrDependencyTaskNotCompleted {
}

if err == consts.ErrDependencyTaskNotCompleted {
return e.timeSource.Now().Add(dependencyTaskNotCompletedReschedulePolicy.ComputeNextDelay(0, attempt))
}

Expand Down
2 changes: 1 addition & 1 deletion service/history/shard/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ type (

GetNamespaceNotificationVersion() int64
UpdateNamespaceNotificationVersion(namespaceNotificationVersion int64) error
UpdateHandoverNamespaces(newNamespaces []*namespace.Namespace, maxRepTaskID int64)
UpdateHandoverNamespaces(newNamespaces []*namespace.Namespace)

AppendHistoryEvents(ctx context.Context, request *persistence.AppendHistoryNodesRequest, namespaceID namespace.ID, execution commonpb.WorkflowExecution) (int, error)

Expand Down
Loading