Skip to content

Commit

Permalink
Support custom yarpc peer chooser for p2p connections (#6345)
Browse files Browse the repository at this point in the history
  • Loading branch information
taylanisikdemir authored Oct 16, 2024
1 parent 100562f commit d4b375d
Show file tree
Hide file tree
Showing 26 changed files with 1,013 additions and 207 deletions.
5 changes: 2 additions & 3 deletions client/clientfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"github.com/uber/cadence/client/wrappers/metered"
"github.com/uber/cadence/client/wrappers/thrift"
timeoutwrapper "github.com/uber/cadence/client/wrappers/timeout"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/membership"
Expand All @@ -68,7 +67,7 @@ type (
DomainIDToNameFunc func(string) (string, error)

rpcClientFactory struct {
rpcFactory common.RPCFactory
rpcFactory rpc.Factory
resolver membership.Resolver
metricsClient metrics.Client
dynConfig *dynamicconfig.Collection
Expand All @@ -79,7 +78,7 @@ type (

// NewRPCClientFactory creates an instance of client factory that knows how to dispatch RPC calls.
func NewRPCClientFactory(
rpcFactory common.RPCFactory,
rpcFactory rpc.Factory,
resolver membership.Resolver,
metricsClient metrics.Client,
dc *dynamicconfig.Collection,
Expand Down
4 changes: 2 additions & 2 deletions cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (s *server) startService() common.Daemon {

params.MetricScope = svcCfg.Metrics.NewScope(params.Logger, params.Name)

rpcParams, err := rpc.NewParams(params.Name, s.cfg, dc)
rpcParams, err := rpc.NewParams(params.Name, s.cfg, dc, params.Logger)
if err != nil {
log.Fatalf("error creating rpc factory params: %v", err)
}
Expand All @@ -170,7 +170,7 @@ func (s *server) startService() common.Daemon {
peerProvider, err := ringpopprovider.New(
params.Name,
&s.cfg.Ringpop,
rpcFactory.GetChannel(),
rpcFactory.GetTChannel(),
membership.PortMap{
membership.PortGRPC: svcCfg.RPC.GRPCPort,
membership.PortTchannel: svcCfg.RPC.Port,
Expand Down
11 changes: 11 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1588,6 +1588,12 @@ const (
// Default value: false
// Allowed filters: N/A
EnableSQLAsyncTransaction
// EnableConnectionRetainingDirectChooser is the key for enabling connection retaining direct yarpc chooser
// KeyName: system.enableConnectionRetainingDirectChooser
// Value type: Bool
// Default value: false
// Allowed filters: N/A
EnableConnectionRetainingDirectChooser

// key for frontend

Expand Down Expand Up @@ -3950,6 +3956,11 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "EnableSQLAsyncTransaction is the key for enabling async transaction",
DefaultValue: false,
},
EnableConnectionRetainingDirectChooser: {
KeyName: "system.enableConnectionRetainingDirectChooser",
Description: "EnableConnectionRetainingDirectChooser is the key for enabling connection retaining direct chooser",
DefaultValue: false,
},
EnableClientVersionCheck: {
KeyName: "frontend.enableClientVersionCheck",
Description: "EnableClientVersionCheck is enables client version check for frontend",
Expand Down
14 changes: 0 additions & 14 deletions common/rpc.go → common/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,8 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination rpc_mock.go -self_package github.com/uber/cadence/common

package common

import (
"go.uber.org/yarpc"
)

const (
// LibraryVersionHeaderName refers to the name of the
// tchannel / http header that contains the client
Expand Down Expand Up @@ -61,11 +55,3 @@ const (
// ClientIsolationGroupHeaderName refers to the name of the header that contains the isolation group which the client request is from
ClientIsolationGroupHeaderName = "cadence-client-isolation-group"
)

type (
// RPCFactory Creates a dispatcher that knows how to transport requests.
RPCFactory interface {
GetDispatcher() *yarpc.Dispatcher
GetMaxMessageSize() int
}
)
1 change: 1 addition & 0 deletions common/log/tag/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ var (
ComponentMapQ = component("mapq")
ComponentMapQTree = component("mapq-tree")
ComponentMapQTreeNode = component("mapq-tree-node")
ComponentRPCFactory = component("rpc-factory")
)

// Pre-defined values for TagSysLifecycle
Expand Down
2 changes: 1 addition & 1 deletion common/membership/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (r *ring) notifySubscribers(msg ChangedEvent) {
select {
case ch <- &msg:
default:
r.logger.Error("subscriber notification failed", tag.Name(name))
r.logger.Warn("subscriber notification failed", tag.Name(name))
}
}
}
Expand Down
1 change: 1 addition & 0 deletions common/membership/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type (
// Resolver provides membership information for all cadence services.
Resolver interface {
common.Daemon

// WhoAmI returns self host details.
// To be consistent with peer provider, it is advised to use peer provider
// to return this information
Expand Down
3 changes: 2 additions & 1 deletion common/resource/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/partition"
"github.com/uber/cadence/common/pinot"
"github.com/uber/cadence/common/rpc"
)

type (
Expand All @@ -58,7 +59,7 @@ type (

MetricScope tally.Scope
MembershipResolver membership.Resolver
RPCFactory common.RPCFactory
RPCFactory rpc.Factory
PProfInitializer common.PProfInitializer
PersistenceConfig config.Persistence
ClusterMetadata cluster.Metadata
Expand Down
4 changes: 2 additions & 2 deletions common/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import (
"github.com/uber/cadence/common/partition"
"github.com/uber/cadence/common/persistence"
persistenceClient "github.com/uber/cadence/common/persistence/client"
"github.com/uber/cadence/common/quotas/global/rpc"
qrpc "github.com/uber/cadence/common/quotas/global/rpc"
"github.com/uber/cadence/common/service"
)

Expand Down Expand Up @@ -96,7 +96,7 @@ type (
GetMatchingClient() matching.Client
GetHistoryRawClient() history.Client
GetHistoryClient() history.Client
GetRatelimiterAggregatorsClient() rpc.Client
GetRatelimiterAggregatorsClient() qrpc.Client
GetRemoteAdminClient(cluster string) admin.Client
GetRemoteFrontendClient(cluster string) frontend.Client
GetClientBean() client.Bean
Expand Down
19 changes: 12 additions & 7 deletions common/resource/resourceImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ import (
"github.com/uber/cadence/common/partition"
"github.com/uber/cadence/common/persistence"
persistenceClient "github.com/uber/cadence/common/persistence/client"
"github.com/uber/cadence/common/quotas/global/rpc"
qrpc "github.com/uber/cadence/common/quotas/global/rpc"
"github.com/uber/cadence/common/quotas/permember"
"github.com/uber/cadence/common/rpc"
"github.com/uber/cadence/common/service"
)

Expand Down Expand Up @@ -141,15 +142,15 @@ type (

pprofInitializer common.PProfInitializer
runtimeMetricsReporter *metrics.RuntimeMetricsReporter
rpcFactory common.RPCFactory
rpcFactory rpc.Factory

isolationGroups isolationgroup.State
isolationGroupConfigStore configstore.Client
partitioner partition.Partitioner

asyncWorkflowQueueProvider queue.Provider

ratelimiterAggregatorClient rpc.Client
ratelimiterAggregatorClient qrpc.Client
}
)

Expand Down Expand Up @@ -304,7 +305,7 @@ func New(
}
partitioner := ensurePartitionerOrDefault(params, isolationGroupState)

ratelimiterAggs := rpc.New(
ratelimiterAggs := qrpc.New(
historyRawClient, // no retries, will retry internally if needed
clientBean.GetHistoryPeers(),
logger,
Expand Down Expand Up @@ -384,7 +385,6 @@ func New(

// Start all resources
func (h *Impl) Start() {

if !atomic.CompareAndSwapInt32(
&h.status,
common.DaemonStatusInitialized,
Expand All @@ -399,6 +399,9 @@ func (h *Impl) Start() {
if err := h.pprofInitializer.Start(); err != nil {
h.logger.WithTags(tag.Error(err)).Fatal("fail to start PProf")
}

h.rpcFactory.Start(h.membershipResolver)

if err := h.dispatcher.Start(); err != nil {
h.logger.WithTags(tag.Error(err)).Fatal("fail to start dispatcher")
}
Expand All @@ -423,7 +426,6 @@ func (h *Impl) Start() {

// Stop stops all resources
func (h *Impl) Stop() {

if !atomic.CompareAndSwapInt32(
&h.status,
common.DaemonStatusStarted,
Expand All @@ -435,9 +437,12 @@ func (h *Impl) Stop() {
h.domainCache.Stop()
h.domainMetricsScopeCache.Stop()
h.membershipResolver.Stop()

if err := h.dispatcher.Stop(); err != nil {
h.logger.WithTags(tag.Error(err)).Error("failed to stop dispatcher")
}
h.rpcFactory.Stop()

h.runtimeMetricsReporter.Stop()
h.persistenceBean.Close()
if h.isolationGroupConfigStore != nil {
Expand Down Expand Up @@ -555,7 +560,7 @@ func (h *Impl) GetHistoryClient() history.Client {
return h.historyClient
}

func (h *Impl) GetRatelimiterAggregatorsClient() rpc.Client {
func (h *Impl) GetRatelimiterAggregatorsClient() qrpc.Client {
return h.ratelimiterAggregatorClient
}

Expand Down
144 changes: 144 additions & 0 deletions common/rpc/direct_peer_chooser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package rpc

import (
"context"
"sync"

"go.uber.org/yarpc/api/peer"
"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/peer/direct"
"go.uber.org/yarpc/yarpcerrors"

"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/membership"
)

// directPeerChooser is a peer.Chooser used for direct (peer-to-peer) calls.
//
// While connection retain mode is disabled it forwards every request to a
// lazily created yarpc "direct" chooser (the legacy behaviour). The connection
// retaining mode itself is not implemented yet; see Choose.
type directPeerChooser struct {
	// serviceName is the name handed to newDirectChooser.
	serviceName string
	// logger receives debug and error messages emitted by the chooser.
	logger log.Logger
	// t is the transport that is passed to direct.New when the legacy chooser
	// is created.
	t peer.Transport
	// enableConnRetainMode is the dynamic config switch consulted on every
	// Choose call.
	enableConnRetainMode dynamicconfig.BoolPropertyFn

	// mu is taken by getLegacyChooser around accesses to the two fields below.
	mu sync.RWMutex
	// legacyChooser is the lazily created yarpc direct chooser.
	legacyChooser peer.Chooser
	// legacyChooserErr remembers a failed attempt to create legacyChooser.
	legacyChooserErr error
}

// newDirectChooser builds a directPeerChooser for the given service.
// The legacy yarpc direct chooser is not created here; it is created on first
// use by getLegacyChooser.
func newDirectChooser(serviceName string, t peer.Transport, logger log.Logger, enableConnRetainMode dynamicconfig.BoolPropertyFn) *directPeerChooser {
	chooser := new(directPeerChooser)
	chooser.serviceName = serviceName
	chooser.t = t
	chooser.logger = logger
	chooser.enableConnRetainMode = enableConnRetainMode
	return chooser
}

// Start satisfies the peer.Chooser interface.
// It starts the legacy direct chooser when one is available (creating it on
// first use) and is a no-op otherwise.
func (g *directPeerChooser) Start() error {
	legacy, available := g.getLegacyChooser()
	if !available {
		// nothing to start
		return nil
	}

	return legacy.Start()
}

// Stop satisfies the peer.Chooser interface.
// It stops the legacy direct chooser when one is available and is a no-op
// otherwise.
func (g *directPeerChooser) Stop() error {
	legacy, available := g.getLegacyChooser()
	if !available {
		// nothing to stop
		return nil
	}

	return legacy.Stop()
}

// IsRunning satisfies the peer.Chooser interface.
// It reports the running state of the legacy direct chooser when one is
// available; when it is not, the chooser is always reported as running.
func (g *directPeerChooser) IsRunning() bool {
	legacy, available := g.getLegacyChooser()
	if !available {
		return true
	}

	return legacy.IsRunning()
}

// Choose satisfies the peer.Chooser interface.
//
// When the connection retain mode flag is set and evaluates to false, the
// request is delegated to the legacy direct chooser. Otherwise the request
// must carry a non-empty ShardKey (InvalidArgument error if it does not) and
// the call currently ends with an Unimplemented error because connection
// retain mode has not been written yet.
//
// NOTE(review): a nil enableConnRetainMode is treated the same as "enabled",
// so such a chooser can only return errors today - confirm this is intended.
func (g *directPeerChooser) Choose(ctx context.Context, req *transport.Request) (peer peer.Peer, onFinish func(error), err error) {
	useLegacy := g.enableConnRetainMode != nil && !g.enableConnRetainMode()

	switch {
	case useLegacy:
		return g.chooseFromLegacyDirectPeerChooser(ctx, req)
	case req.ShardKey == "":
		return nil, nil, yarpcerrors.InvalidArgumentErrorf("chooser requires ShardKey to be non-empty")
	default:
		// TODO: implement connection retain mode
		return nil, nil, yarpcerrors.UnimplementedErrorf("direct peer chooser conn retain mode unimplemented")
	}
}

// UpdatePeers is called with the current membership list.
// For now it only logs the number of members it was given at debug level.
func (g *directPeerChooser) UpdatePeers(members []membership.HostInfo) {
	// TODO: implement
	memberCount := len(members)
	g.logger.Debug("direct peer chooser got a membership update", tag.Counter(memberCount))
}

// chooseFromLegacyDirectPeerChooser forwards the request to the legacy yarpc
// direct chooser. It returns an Internal error when that chooser could not be
// obtained.
func (g *directPeerChooser) chooseFromLegacyDirectPeerChooser(ctx context.Context, req *transport.Request) (peer.Peer, func(error), error) {
	if legacy, available := g.getLegacyChooser(); available {
		return legacy.Choose(ctx, req)
	}

	return nil, nil, yarpcerrors.InternalErrorf("failed to get legacy direct peer chooser")
}

// getLegacyChooser returns the yarpc direct chooser, creating it on first use.
//
// The boolean result is false when the chooser could not be created. A
// creation failure is remembered, so later calls return (nil, false) without
// retrying.
//
// The method is safe for concurrent use: the common case only takes the read
// lock, and creation happens under the write lock after re-checking the state
// so that direct.New runs at most once and every caller observes the same
// chooser instance.
func (g *directPeerChooser) getLegacyChooser() (peer.Chooser, bool) {
	// Fast path: snapshot the state under the read lock.
	g.mu.RLock()
	chooser, err := g.legacyChooser, g.legacyChooserErr
	g.mu.RUnlock()

	if chooser != nil {
		// Legacy chooser already created, return it
		return chooser, true
	}
	if err != nil {
		// There was an error creating the legacy chooser earlier
		return nil, false
	}

	g.mu.Lock()
	defer g.mu.Unlock()

	// Another goroutine may have finished the initialization between the
	// RUnlock above and acquiring the write lock, so look again.
	if g.legacyChooser != nil {
		return g.legacyChooser, true
	}
	if g.legacyChooserErr != nil {
		return nil, false
	}

	// Keep the result in locals and store the chooser only on success.
	// Assigning a nil concrete pointer to the interface-typed field would make
	// the field compare non-nil and a broken chooser would be handed out.
	created, createErr := direct.New(direct.Configuration{}, g.t)
	if createErr != nil {
		g.legacyChooserErr = createErr
		g.logger.Error("failed to create legacy direct peer chooser", tag.Error(createErr))
		return nil, false
	}

	g.legacyChooser = created
	return created, true
}
Loading

0 comments on commit d4b375d

Please sign in to comment.