Skip to content

Commit

Permalink
Allow context disable frontend redirection (#4547)
Browse files Browse the repository at this point in the history
* Add RPC header xdc-redirection, allowing disable DC redirection, default to redirect
  • Loading branch information
wxing1292 committed Jul 14, 2023
1 parent 4c39a7e commit 077cc57
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 1 deletion.
30 changes: 29 additions & 1 deletion service/frontend/redirection_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ package frontend

import (
"context"
"strconv"
"time"

"go.temporal.io/api/workflowservice/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"go.temporal.io/server/client"
"go.temporal.io/server/common/clock"
Expand All @@ -42,9 +44,13 @@ import (
"go.temporal.io/server/common/rpc/interceptor"
)

var (
const (
dcRedirectionContextHeaderName = "xdc-redirection"

dcRedirectionMetricsPrefix = "DCRedirection"
)

var (
localAPIResponses = map[string]responseConstructorFn{
// Namespace APIs, namespace APIs does not require redirection
"DeprecateNamespace": func() any { return &workflowservice.DeprecateNamespaceResponse{} },
Expand Down Expand Up @@ -173,6 +179,9 @@ func (i *RedirectionInterceptor) Intercept(
if _, isWorkflowHandler := info.Server.(*WorkflowHandler); !isWorkflowHandler {
return handler(ctx, req)
}
if !i.redirectionAllowed(ctx) {
return handler(ctx, req)
}

_, methodName := interceptor.SplitMethodName(info.FullMethod)
if _, ok := localAPIResponses[methodName]; ok {
Expand Down Expand Up @@ -262,3 +271,22 @@ func (i *RedirectionInterceptor) afterCall(
metricsHandler.Counter(metrics.ClientRedirectionFailures.GetMetricName()).Record(1)
}
}

func (i *RedirectionInterceptor) redirectionAllowed(
ctx context.Context,
) bool {
// default to allow dc redirection
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return true
}
values := md.Get(dcRedirectionContextHeaderName)
if len(values) == 0 {
return true
}
allowed, err := strconv.ParseBool(values[0])
if err != nil {
return true
}
return allowed
}
31 changes: 31 additions & 0 deletions service/frontend/redirection_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/client"
Expand Down Expand Up @@ -328,6 +329,36 @@ func (s *redirectionInterceptorSuite) TestHandleGlobalAPIInvocation_NamespaceNot
s.IsType(&serviceerror.NamespaceNotFound{}, err)
}

func (s *redirectionInterceptorSuite) TestRedirectionAllowed_Empty() {
ctx := metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{}))
allowed := s.redirector.redirectionAllowed(ctx)
s.True(allowed)
}

func (s *redirectionInterceptorSuite) TestRedirectionAllowed_Error() {
ctx := metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{
dcRedirectionContextHeaderName: "?",
}))
allowed := s.redirector.redirectionAllowed(ctx)
s.True(allowed)
}

func (s *redirectionInterceptorSuite) TestRedirectionAllowed_True() {
ctx := metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{
dcRedirectionContextHeaderName: "t",
}))
allowed := s.redirector.redirectionAllowed(ctx)
s.True(allowed)
}

func (s *redirectionInterceptorSuite) TestRedirectionAllowed_False() {
ctx := metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{
dcRedirectionContextHeaderName: "f",
}))
allowed := s.redirector.redirectionAllowed(ctx)
s.False(allowed)
}

type (
mockClientConnInterface struct {
*suite.Suite
Expand Down

0 comments on commit 077cc57

Please sign in to comment.