Skip to content

Commit

Permalink
[shard-distributor] Added logic to return the shard owner based on th…
Browse files Browse the repository at this point in the history
…e namesplace (#6564)

What changed?

Implemented the current sharding strategy in the shard-distributor

Added a namespace not found error

Why?
This is the implementation of the current strategy, so we can start
centralizing the shard management in this service

How did you test it?
Unit test

Potential risks

Release notes

Documentation Changes
  • Loading branch information
jakobht authored Dec 18, 2024
1 parent 2bd1e5d commit f4e219a
Show file tree
Hide file tree
Showing 10 changed files with 451 additions and 26 deletions.
200 changes: 191 additions & 9 deletions .gen/proto/sharddistributor/v1/service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 17 additions & 16 deletions .gen/proto/sharddistributor/v1/service.pb.yarpc.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions common/types/mapper/proto/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"go.uber.org/yarpc/encoding/protobuf"
"go.uber.org/yarpc/yarpcerrors"

sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
sharedv1 "github.com/uber/cadence/.gen/proto/shared/v1"
cadence_errors "github.com/uber/cadence/common/errors"
"github.com/uber/cadence/common/types"
Expand Down Expand Up @@ -86,6 +87,8 @@ func FromError(err error) error {
return typedErr
} else if ok, typedErr = errorutils.ConvertError(err, fromStickyWorkerUnavailableErr); ok {
return typedErr
} else if ok, typedErr = errorutils.ConvertError(err, fromNamespaceNotFoundErr); ok {
return typedErr
}

return protobuf.NewError(yarpcerrors.CodeUnknown, err.Error())
Expand Down Expand Up @@ -118,6 +121,10 @@ func ToError(err error) error {
return &types.WorkflowExecutionAlreadyCompletedError{
Message: status.Message(),
}
case *sharddistributorv1.NamespaceNotFoundError:
return &types.NamespaceNotFoundError{
Namespace: details.Namespace,
}
}
case yarpcerrors.CodeInvalidArgument:
switch getErrorDetails(err).(type) {
Expand Down Expand Up @@ -363,3 +370,9 @@ func fromRemoteSyncMatchedErr(e *types.RemoteSyncMatchedError) error {
func fromStickyWorkerUnavailableErr(e *types.StickyWorkerUnavailableError) error {
return protobuf.NewError(yarpcerrors.CodeUnavailable, e.Message, protobuf.WithErrorDetails(&apiv1.StickyWorkerUnavailableError{}))
}

func fromNamespaceNotFoundErr(e *types.NamespaceNotFoundError) error {
return protobuf.NewError(yarpcerrors.CodeNotFound, e.Error(), protobuf.WithErrorDetails(&sharddistributorv1.NamespaceNotFoundError{
Namespace: e.Namespace,
}))
}
13 changes: 13 additions & 0 deletions common/types/sharddistributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

package types

import "fmt"

type GetShardOwnerRequest struct {
ShardKey string
Namespace string
Expand Down Expand Up @@ -59,3 +61,14 @@ func (v *GetShardOwnerResponse) GetNamespace() (o string) {
}
return
}

type NamespaceNotFoundError struct {
Namespace string
}

func (n *NamespaceNotFoundError) Error() (o string) {
if n != nil {
return fmt.Sprintf("namespace not found %v", n.Namespace)
}
return
}
1 change: 1 addition & 0 deletions common/types/testdata/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const (
ClientLibraryVersion = "ClientLibraryVersion"
SupportedVersions = "SupportedVersions"
FeatureFlag = "FeatureFlag"
Namespace = "Namespace"

Attempt = 2
ScheduleID = 5
Expand Down
4 changes: 4 additions & 0 deletions common/types/testdata/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ var (
MyIdentity: HostName2,
TasklistName: TaskListName,
}
NamespaceNotFoundError = types.NamespaceNotFoundError{
Namespace: Namespace,
}
)

var Errors = []error{
Expand All @@ -143,4 +146,5 @@ var Errors = []error{
&WorkflowExecutionAlreadyStartedError,
&StickyWorkerUnavailableError,
&TaskListNotOwnedByHostError,
&NamespaceNotFoundError,
}
4 changes: 4 additions & 0 deletions proto/internal/uber/cadence/sharddistributor/v1/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,7 @@ message GetShardOwnerResponse {
string owner = 1;
string namespace = 2;
}

message NamespaceNotFoundError {
string namespace = 1;
}
Loading

0 comments on commit f4e219a

Please sign in to comment.