diff --git a/cmd/server/cadence/server.go b/cmd/server/cadence/server.go index 1e087f7b04f..f6ece43c16e 100644 --- a/cmd/server/cadence/server.go +++ b/cmd/server/cadence/server.go @@ -155,8 +155,9 @@ func (s *server) startService() common.Daemon { ) params.MetricScope = svcCfg.Metrics.NewScope(params.Logger, params.Name) + params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, params.Logger)) - rpcParams, err := rpc.NewParams(params.Name, s.cfg, dc, params.Logger) + rpcParams, err := rpc.NewParams(params.Name, s.cfg, dc, params.Logger, params.MetricsClient) if err != nil { log.Fatalf("error creating rpc factory params: %v", err) } @@ -182,8 +183,6 @@ func (s *server) startService() common.Daemon { log.Fatalf("ringpop provider failed: %v", err) } - params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, params.Logger)) - params.MembershipResolver, err = membership.NewResolver( peerProvider, params.Logger, diff --git a/common/log/tag/tags.go b/common/log/tag/tags.go index 69c1ae37944..46a4fc6d35e 100644 --- a/common/log/tag/tags.go +++ b/common/log/tag/tags.go @@ -408,6 +408,11 @@ func Service(sv string) Tag { return newStringTag("service", sv) } +// DestService returns tag for destination service +func DestService(sv string) Tag { + return newStringTag("dest-service", sv) +} + // Addresses returns tag for Addresses func Addresses(ads []string) Tag { return newObjectTag("addresses", ads) diff --git a/common/metrics/defs.go b/common/metrics/defs.go index e83959ba3f5..d943b2670df 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -846,6 +846,9 @@ const ( // GlobalRatelimiterAggregator is the metrics scope for aggregator-side common/quotas/global behavior GlobalRatelimiterAggregator + // P2PRPCPeerChooserScope is the metrics scope for P2P RPC peer chooser + P2PRPCPeerChooserScope + NumCommonScopes ) @@ -1741,6 +1744,8 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ // currently used by both frontend and history, but may grow to other limiting-host-services. 
GlobalRatelimiter: {operation: "GlobalRatelimiter"}, GlobalRatelimiterAggregator: {operation: "GlobalRatelimiterAggregator"}, + + P2PRPCPeerChooserScope: {operation: "P2PRPCPeerChooser"}, }, // Frontend Scope Names Frontend: { @@ -2243,6 +2248,11 @@ const ( GlobalRatelimiterRemovedLimits GlobalRatelimiterRemovedHostLimits + // p2p rpc metrics + P2PPeersCount + P2PPeerAdded + P2PPeerRemoved + NumCommonMetrics // Needs to be last on this list for iota numbering ) @@ -2936,6 +2946,10 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ GlobalRatelimiterHostLimitsQueried: {metricName: "global_ratelimiter_host_limits_queried", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram}, GlobalRatelimiterRemovedLimits: {metricName: "global_ratelimiter_removed_limits", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram}, GlobalRatelimiterRemovedHostLimits: {metricName: "global_ratelimiter_removed_host_limits", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram}, + + P2PPeersCount: {metricName: "peers_count", metricType: Gauge}, + P2PPeerAdded: {metricName: "peer_added", metricType: Counter}, + P2PPeerRemoved: {metricName: "peer_removed", metricType: Counter}, }, History: { TaskRequests: {metricName: "task_requests", metricType: Counter}, diff --git a/common/metrics/tags.go b/common/metrics/tags.go index f2a8616337a..8607b3bef89 100644 --- a/common/metrics/tags.go +++ b/common/metrics/tags.go @@ -53,6 +53,7 @@ const ( transport = "transport" caller = "caller" service = "service" + destService = "dest_service" signalName = "signalName" workflowVersion = "workflow_version" shardID = "shard_id" @@ -219,11 +220,16 @@ func CallerTag(value string) Tag { return simpleMetric{key: caller, value: value} } -// CallerTag returns a new RPC Caller type tag. +// ServiceTag returns a new service tag. func ServiceTag(value string) Tag { return simpleMetric{key: service, value: value} } +// DestServiceTag returns a new destination service tag. 
+func DestServiceTag(value string) Tag { + return simpleMetric{key: destService, value: value} +} + // Hosttag emits the host identifier func HostTag(value string) Tag { return metricWithUnknown(host, value) diff --git a/common/resource/resource_impl.go b/common/resource/resource_impl.go index 1eb6ba7dfcb..7622554175d 100644 --- a/common/resource/resource_impl.go +++ b/common/resource/resource_impl.go @@ -395,7 +395,9 @@ func (h *Impl) Start() { h.logger.WithTags(tag.Error(err)).Fatal("fail to start PProf") } - h.rpcFactory.Start(h.membershipResolver) + if err := h.rpcFactory.Start(h.membershipResolver); err != nil { + h.logger.WithTags(tag.Error(err)).Fatal("fail to start RPC factory") + } if err := h.dispatcher.Start(); err != nil { h.logger.WithTags(tag.Error(err)).Fatal("fail to start dispatcher") diff --git a/common/resource/resource_impl_test.go b/common/resource/resource_impl_test.go index 93e9ecfc3d6..bf8f198d9ca 100644 --- a/common/resource/resource_impl_test.go +++ b/common/resource/resource_impl_test.go @@ -82,7 +82,7 @@ func TestStartStop(t *testing.T) { "primary-cluster": {InitialFailoverVersion: 1, Enabled: true, RPCTransport: "tchannel", RPCAddress: "localhost:0"}, "secondary-cluster": {InitialFailoverVersion: 1, Enabled: true, RPCTransport: "tchannel", RPCAddress: "localhost:0"}, }, nil, metricsCl, logger) - directOutboundPCF := rpc.NewDirectPeerChooserFactory(serviceName, logger) + directOutboundPCF := rpc.NewDirectPeerChooserFactory(serviceName, logger, metricsCl) directConnRetainFn := func(opts ...dynamicconfig.FilterOption) bool { return false } pcf := rpc.NewMockPeerChooserFactory(ctrl) peerChooser := rpc.NewMockPeerChooser(ctrl) diff --git a/common/rpc/direct_peer_chooser.go b/common/rpc/direct_peer_chooser.go index 1e515e4ebc4..ae6219eafb2 100644 --- a/common/rpc/direct_peer_chooser.go +++ b/common/rpc/direct_peer_chooser.go @@ -22,73 +22,132 @@ package rpc import ( "context" + "fmt" "sync" + "sync/atomic" "go.uber.org/yarpc/api/peer" "go.uber.org/yarpc/api/transport" "go.uber.org/yarpc/peer/direct" + "go.uber.org/yarpc/peer/hostport" "go.uber.org/yarpc/yarpcerrors" + "github.com/uber/cadence/common" "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/membership" + "github.com/uber/cadence/common/metrics" +) + +var ( + noOpSubscriberInstance = &noOpSubscriber{} ) // directPeerChooser is a peer.Chooser that chooses a peer based on the shard key. // Peers are managed by the peerList and peers are reused across multiple requests. 
type directPeerChooser struct { + status int32 serviceName string logger log.Logger + scope metrics.Scope t peer.Transport enableConnRetainMode dynamicconfig.BoolPropertyFn legacyChooser peer.Chooser legacyChooserErr error mu sync.RWMutex + peers map[string]peer.Peer } -func newDirectChooser(serviceName string, t peer.Transport, logger log.Logger, enableConnRetainMode dynamicconfig.BoolPropertyFn) *directPeerChooser { - return &directPeerChooser{ +func newDirectChooser( + serviceName string, + t peer.Transport, + logger log.Logger, + metricsCl metrics.Client, + enableConnRetainMode dynamicconfig.BoolPropertyFn, +) *directPeerChooser { + dpc := &directPeerChooser{ serviceName: serviceName, - logger: logger, + logger: logger.WithTags(tag.DestService(serviceName)), + scope: metricsCl.Scope(metrics.P2PRPCPeerChooserScope).Tagged(metrics.DestServiceTag(serviceName)), t: t, enableConnRetainMode: enableConnRetainMode, + peers: make(map[string]peer.Peer), + } + + if dpc.enableConnRetainMode == nil { + dpc.enableConnRetainMode = func(opts ...dynamicconfig.FilterOption) bool { return false } } + + return dpc } // Start statisfies the peer.Chooser interface. -func (g *directPeerChooser) Start() error { - c, ok := g.getLegacyChooser() - if ok { - return c.Start() +func (g *directPeerChooser) Start() (err error) { + if !atomic.CompareAndSwapInt32(&g.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) { + return nil + } + + defer func() { + if err != nil { + g.logger.Error("direct peer chooser failed to start", tag.Error(err)) + return + } + g.logger.Info("direct peer chooser started") + }() + + if !g.enableConnRetainMode() { + c, ok := g.getLegacyChooser() + if ok { + return c.Start() + } + + return fmt.Errorf("failed to start direct peer chooser because direct peer chooser initialization failed, err: %v", g.legacyChooserErr) } - return nil // no-op + return nil } // Stop statisfies the peer.Chooser interface. func (g *directPeerChooser) Stop() error { - c, ok := g.getLegacyChooser() - if ok { - return c.Stop() + if !atomic.CompareAndSwapInt32(&g.status, common.DaemonStatusStarted, common.DaemonStatusStopped) { + return nil + } + + var err error + // Stop legacy chooser if it was initialized + if g.legacyChooser != nil { + err = g.legacyChooser.Stop() } - return nil // no-op + // Release all peers if there's any + g.updatePeersInternal(nil) + + g.logger.Info("direct peer chooser stopped", tag.Error(err)) + return err } // IsRunning statisfies the peer.Chooser interface. func (g *directPeerChooser) IsRunning() bool { - c, ok := g.getLegacyChooser() - if ok { - return c.IsRunning() + if atomic.LoadInt32(&g.status) != common.DaemonStatusStarted { + return false + } + + if !g.enableConnRetainMode() { + c, ok := g.getLegacyChooser() + if ok { + return c.IsRunning() + } + return false } return true // no-op } // Choose returns an existing peer for the shard key. +// ShardKey is {host}:{port} of the peer. It could be tchannel or grpc address. 
 func (g *directPeerChooser) Choose(ctx context.Context, req *transport.Request) (peer peer.Peer, onFinish func(error), err error) {
-	if g.enableConnRetainMode != nil && !g.enableConnRetainMode() {
+	if !g.enableConnRetainMode() {
 		return g.chooseFromLegacyDirectPeerChooser(ctx, req)
 	}
@@ -96,19 +155,111 @@ func (g *directPeerChooser) Choose(ctx context.Context, req *transport.Request)
 		return nil, nil, yarpcerrors.InvalidArgumentErrorf("chooser requires ShardKey to be non-empty")
 	}
 
-	// TODO: implement connection retain mode
-	return nil, nil, yarpcerrors.UnimplementedErrorf("direct peer chooser conn retain mode unimplemented")
+	g.mu.RLock()
+	p, ok := g.peers[req.ShardKey]
+	if ok {
+		g.mu.RUnlock()
+		return p, func(error) {}, nil
+	}
+	g.mu.RUnlock()
+
+	// peer is not cached, add a new peer
+	p, err = g.addPeer(req.ShardKey)
+	if err != nil {
+		return nil, nil, yarpcerrors.InternalErrorf("failed to add peer for shard key %v, err: %v", req.ShardKey, err)
+	}
+
+	return p, func(error) {}, nil
 }
 
-func (g *directPeerChooser) UpdatePeers(members []membership.HostInfo) {
-	// TODO: implement
+// UpdatePeers removes peers that are not in the members list.
+// It does not create actual yarpc peers for the members; they are created lazily when a request comes in (Choose is called).
+func (g *directPeerChooser) UpdatePeers(serviceName string, members []membership.HostInfo) {
+	if g.serviceName != serviceName {
+		g.logger.Debug("Ignoring membership update for a service this chooser was not created for", tag.Dynamic("members-service", serviceName))
+		return
+	}
+	g.logger.Debug("direct peer chooser got a membership update", tag.Counter(len(members)))
+
+	// If the chooser is not started, do not act on membership changes.
+	// If membership updates arrive after chooser is stopped, ignore them.
+	if atomic.LoadInt32(&g.status) != common.DaemonStatusStarted {
+		return
+	}
+
+	g.updatePeersInternal(members)
+}
+
+func (g *directPeerChooser) updatePeersInternal(members []membership.HostInfo) {
+	// Create a map of valid peer addresses given the members list.
+	validPeerAddresses := make(map[string]bool)
+	for _, member := range members {
+		for _, portName := range []string{membership.PortTchannel, membership.PortGRPC} {
+			addr, err := member.GetNamedAddress(portName)
+			if err != nil {
+				g.logger.Error(fmt.Sprintf("failed to get %s address of member", portName), tag.Error(err), tag.Address(member.GetAddress()))
+				continue
+			}
+			validPeerAddresses[addr] = true
+		}
+	}
+
+	// Take a copy of the current peers to avoid holding the write lock while removing peers.
+ peers := make(map[string]bool) + g.mu.RLock() + for addr := range g.peers { + peers[addr] = true + } + g.mu.RUnlock() + + g.logger.Debugf("valid peers: %v, current peers: %v", validPeerAddresses, peers) + + for addr := range peers { + if !validPeerAddresses[addr] { + g.removePeer(addr) + } + } +} + +func (g *directPeerChooser) removePeer(addr string) { + g.mu.RLock() + if err := g.t.ReleasePeer(g.peers[addr], noOpSubscriberInstance); err != nil { + g.logger.Error("failed to release peer", tag.Error(err), tag.Address(addr)) + } + g.mu.RUnlock() + + g.mu.Lock() + defer g.mu.Unlock() + + delete(g.peers, addr) + g.logger.Info("removed peer from direct peer chooser", tag.Address(addr)) + g.scope.IncCounter(metrics.P2PPeerRemoved) + g.scope.UpdateGauge(metrics.P2PPeersCount, float64(len(g.peers))) +} + +func (g *directPeerChooser) addPeer(addr string) (peer.Peer, error) { + g.mu.Lock() + defer g.mu.Unlock() + if p, ok := g.peers[addr]; ok { + return p, nil + } + + p, err := g.t.RetainPeer(hostport.Identify(addr), noOpSubscriberInstance) + if err != nil { + return nil, err + } + g.peers[addr] = p + g.logger.Info("added peer to direct peer chooser", tag.Address(addr)) + g.scope.IncCounter(metrics.P2PPeerAdded) + g.scope.UpdateGauge(metrics.P2PPeersCount, float64(len(g.peers))) + return p, nil } func (g *directPeerChooser) chooseFromLegacyDirectPeerChooser(ctx context.Context, req *transport.Request) (peer.Peer, func(error), error) { c, ok := g.getLegacyChooser() if !ok { - return nil, nil, yarpcerrors.InternalErrorf("failed to get legacy direct peer chooser") + return nil, nil, yarpcerrors.InternalErrorf("failed to get legacy direct peer chooser, err: %v", g.legacyChooserErr) } return c.Choose(ctx, req) @@ -140,5 +291,18 @@ func (g *directPeerChooser) getLegacyChooser() (peer.Chooser, bool) { return nil, false } + if atomic.LoadInt32(&g.status) == common.DaemonStatusStarted { + // Start the legacy chooser if the current chooser is already started + if err := g.legacyChooser.Start(); err != nil { + g.logger.Error("failed to start legacy direct peer chooser", tag.Error(err)) + return nil, false + } + } + return g.legacyChooser, true } + +// noOpSubscriber is a no-op implementation of peer.Subscriber +type noOpSubscriber struct{} + +func (*noOpSubscriber) NotifyStatusChanged(peer.Identifier) {} diff --git a/common/rpc/direct_peer_chooser_test.go b/common/rpc/direct_peer_chooser_test.go index 722f773872e..99f354eead2 100644 --- a/common/rpc/direct_peer_chooser_test.go +++ b/common/rpc/direct_peer_chooser_test.go @@ -24,6 +24,7 @@ import ( "context" "testing" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" "go.uber.org/goleak" "go.uber.org/yarpc/api/transport" @@ -31,32 +32,147 @@ import ( "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/log/testlogger" + "github.com/uber/cadence/common/membership" + "github.com/uber/cadence/common/metrics" ) -func TestDirectChooser(t *testing.T) { - req := &transport.Request{ - Caller: "caller", - Service: "service", - ShardKey: "shard1", +func TestDirectChooser_PeerUpdates(t *testing.T) { + logger := testlogger.New(t) + metricCl := metrics.NewNoopMetricsClient() + serviceName := "service" + directConnRetainFn := func(opts ...dynamicconfig.FilterOption) bool { return true } + grpcTransport := grpc.NewTransport() + chooser := newDirectChooser(serviceName, grpcTransport, logger, metricCl, directConnRetainFn) + + choosePeers := func(peers ...string) { + t.Helper() + // Calling Choose() will create peers and they will 
be cached + for _, p := range peers { + _, onFinish, err := chooser.Choose(context.Background(), &transport.Request{ + Caller: "caller", + Service: "service", + ShardKey: p, + }) + assert.NoError(t, err, "Choose() failed") + onFinish(nil) + } + } + + currentPeersMap := func() map[string]bool { + t.Helper() + chooser.mu.RLock() + defer chooser.mu.RUnlock() + peers := make(map[string]bool, len(chooser.peers)) + for p := range chooser.peers { + peers[p] = true + } + return peers + } + + newHost := func(peer string) membership.HostInfo { + return membership.NewDetailedHostInfo(peer+":80", peer, membership.PortMap{ + membership.PortGRPC: 80, + }) + } + + t.Run("chooser not started so should discard membership updates", func(t *testing.T) { + choosePeers("peer1:80", "peer2:80") + chooser.UpdatePeers(serviceName, nil) + wantPeers := map[string]bool{"peer1:80": true, "peer2:80": true} + gotPeers := currentPeersMap() + if diff := cmp.Diff(wantPeers, gotPeers); diff != "" { + t.Fatalf("Peers mismatch (-want +got):\n%s", diff) + } + }) + + // Start chooser and do more validations + if err := chooser.Start(); err != nil { + t.Fatalf("failed to start direct peer chooser: %v", err) + } + + defer chooser.Stop() + + t.Run("peer1 and peer2 are chosen, peer2 is removed from members list", func(t *testing.T) { + choosePeers("peer1:80", "peer2:80") + chooser.UpdatePeers(serviceName, []membership.HostInfo{ + newHost("peer1"), + }) + wantPeers := map[string]bool{"peer1:80": true} + gotPeers := currentPeersMap() + if diff := cmp.Diff(wantPeers, gotPeers); diff != "" { + t.Fatalf("Peers mismatch (-want +got):\n%s", diff) + } + }) + + t.Run("peer3 and peer4 are also chosen, membership list has peer1 and peer4", func(t *testing.T) { + choosePeers("peer3:80", "peer4:80") + chooser.UpdatePeers(serviceName, []membership.HostInfo{ + newHost("peer1"), + newHost("peer4"), + }) + wantPeers := map[string]bool{"peer1:80": true, "peer4:80": true} + gotPeers := currentPeersMap() + if diff := cmp.Diff(wantPeers, gotPeers); diff != "" { + t.Fatalf("Peers mismatch (-want +got):\n%s", diff) + } + }) + + t.Run("membership list update for another service is ignored, should still keep peer1 and peer4", func(t *testing.T) { + chooser.UpdatePeers("another-service", []membership.HostInfo{ + newHost("peer50"), + }) + wantPeers := map[string]bool{"peer1:80": true, "peer4:80": true} + gotPeers := currentPeersMap() + if diff := cmp.Diff(wantPeers, gotPeers); diff != "" { + t.Fatalf("Peers mismatch (-want +got):\n%s", diff) + } + }) +} + +func TestDirectChooser_StartStop(t *testing.T) { + newReq := func(shardKey string) *transport.Request { + return &transport.Request{ + Caller: "caller", + Service: "service", + ShardKey: shardKey, + } } tests := []struct { - desc string - retainConn bool - req *transport.Request - wantChooseErr bool + desc string + retainConn bool + req *transport.Request + multipleChoose bool + wantChooseErr bool }{ { - desc: "don't retain connection", + desc: "legacy chooser", retainConn: false, - req: req, + req: newReq("key"), + }, + { + desc: "legacy chooser - empty shard key", + retainConn: false, + req: newReq(""), + wantChooseErr: true, + }, + { + desc: "connection retain mode", + retainConn: true, + req: newReq("key"), }, { - desc: "retain connection", + desc: "connection retain mode - empty shard key", retainConn: true, - req: req, + req: newReq(""), wantChooseErr: true, }, + { + desc: "connection retain mode - multiple choose should return chooser from cache", + retainConn: true, + req: newReq("key"), + 
multipleChoose: true, + }, } for _, tc := range tests { @@ -64,15 +180,21 @@ func TestDirectChooser(t *testing.T) { defer goleak.VerifyNone(t) logger := testlogger.New(t) + metricCl := metrics.NewNoopMetricsClient() serviceName := "service" directConnRetainFn := func(opts ...dynamicconfig.FilterOption) bool { return tc.retainConn } grpcTransport := grpc.NewTransport() - chooser := newDirectChooser(serviceName, grpcTransport, logger, directConnRetainFn) + chooser := newDirectChooser(serviceName, grpcTransport, logger, metricCl, directConnRetainFn) + + assert.False(t, chooser.IsRunning(), "expected IsRunning()=false before Start()") + if err := chooser.Start(); err != nil { t.Fatalf("failed to start direct peer chooser: %v", err) } + assert.NoError(t, chooser.Start(), "starting again should be no-op") + assert.True(t, chooser.IsRunning()) peer, onFinish, err := chooser.Choose(context.Background(), tc.req) @@ -84,13 +206,23 @@ func TestDirectChooser(t *testing.T) { assert.NotNil(t, peer) assert.NotNil(t, onFinish) - // call onFinish to release the peer + // call onFinish will release the peer for legacy chooser onFinish(nil) } + if tc.multipleChoose { + peer2, onFinish2, err2 := chooser.Choose(context.Background(), tc.req) + assert.NoError(t, err2) + assert.NotNil(t, onFinish2) + assert.Equal(t, peer, peer2) + onFinish2(nil) + } + if err := chooser.Stop(); err != nil { t.Fatalf("failed to stop direct peer chooser: %v", err) } + + assert.NoError(t, chooser.Stop(), "stopping again should be no-op") }) } } diff --git a/common/rpc/factory.go b/common/rpc/factory.go index cd1a0b76ed0..90d182cd234 100644 --- a/common/rpc/factory.go +++ b/common/rpc/factory.go @@ -37,6 +37,7 @@ import ( "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/membership" + "github.com/uber/cadence/common/service" ) const ( @@ -44,6 +45,11 @@ const ( factoryComponentName = "rpc-factory" ) +var ( + // P2P outbounds are only needed for history and matching services + servicesToTalkP2P = []string{service.History, service.Matching} +) + // Factory is an implementation of rpc.Factory interface type FactoryImpl struct { maxMessageSize int @@ -176,22 +182,27 @@ func (d *FactoryImpl) GetMaxMessageSize() int { } func (d *FactoryImpl) Start(peerLister PeerLister) error { - // subscribe to membership changes and notify outbounds builder for peer updates d.peerLister = peerLister - ch := make(chan *membership.ChangedEvent, 1) - if err := d.peerLister.Subscribe(d.serviceName, factoryComponentName, ch); err != nil { - return fmt.Errorf("rpc factory failed to subscribe to membership updates: %v", err) + // subscribe to membership changes for history and matching. 
This is needed to update the peers for rpc + for _, svc := range servicesToTalkP2P { + ch := make(chan *membership.ChangedEvent, 1) + if err := d.peerLister.Subscribe(svc, factoryComponentName, ch); err != nil { + return fmt.Errorf("rpc factory failed to subscribe to membership updates for svc: %v, err: %v", svc, err) + } + d.wg.Add(1) + go d.listenMembershipChanges(svc, ch) } - d.wg.Add(1) - go d.listenMembershipChanges(ch) return nil } func (d *FactoryImpl) Stop() error { d.logger.Info("stopping rpc factory") - if err := d.peerLister.Unsubscribe(d.serviceName, factoryComponentName); err != nil { - d.logger.Error("rpc factory failed to unsubscribe from membership updates", tag.Error(err)) + + for _, svc := range servicesToTalkP2P { + if err := d.peerLister.Unsubscribe(svc, factoryComponentName); err != nil { + d.logger.Error("rpc factory failed to unsubscribe from membership updates", tag.Error(err), tag.Service(svc)) + } } d.cancelFn() @@ -201,22 +212,22 @@ func (d *FactoryImpl) Stop() error { return nil } -func (d *FactoryImpl) listenMembershipChanges(ch chan *membership.ChangedEvent) { +func (d *FactoryImpl) listenMembershipChanges(svc string, ch chan *membership.ChangedEvent) { defer d.wg.Done() for { select { case <-ch: - d.logger.Debug("rpc factory received membership changed event") - members, err := d.peerLister.Members(d.serviceName) + d.logger.Debug("rpc factory received membership changed event", tag.Service(svc)) + members, err := d.peerLister.Members(svc) if err != nil { - d.logger.Error("rpc factory failed to get members from membership resolver", tag.Error(err)) + d.logger.Error("rpc factory failed to get members from membership resolver", tag.Error(err), tag.Service(svc)) continue } - d.outbounds.UpdatePeers(members) + d.outbounds.UpdatePeers(svc, members) case <-d.ctx.Done(): - d.logger.Info("rpc factory stopped so listenMembershipChanges returning") + d.logger.Info("rpc factory stopped so listenMembershipChanges returning", tag.Service(svc)) return } } diff --git a/common/rpc/factory_test.go b/common/rpc/factory_test.go index 6b039cc8116..7b64a0661b7 100644 --- a/common/rpc/factory_test.go +++ b/common/rpc/factory_test.go @@ -21,6 +21,7 @@ package rpc import ( + "errors" "sync" "testing" "time" @@ -31,6 +32,7 @@ import ( "github.com/uber/cadence/common/log/testlogger" "github.com/uber/cadence/common/membership" + "github.com/uber/cadence/common/service" ) func TestNewFactory(t *testing.T) { @@ -61,59 +63,124 @@ func TestNewFactory(t *testing.T) { } func TestStartStop(t *testing.T) { - defer goleak.VerifyNone(t) - - ctrl := gomock.NewController(t) - logger := testlogger.New(t) - serviceName := "service" - ob := NewMockOutboundsBuilder(ctrl) - var mu sync.Mutex - var gotMembers []membership.HostInfo - outbounds := &Outbounds{ - onUpdatePeers: func(members []membership.HostInfo) { - mu.Lock() - defer mu.Unlock() - gotMembers = members + membersBySvc := map[string][]membership.HostInfo{ + service.Matching: { + membership.NewHostInfo("localhost:9191"), + membership.NewHostInfo("localhost:9192"), + }, + service.History: { + membership.NewHostInfo("localhost:8585"), }, } - ob.EXPECT().Build(gomock.Any(), gomock.Any()).Return(outbounds, nil).Times(1) - grpcMsgSize := 4 * 1024 * 1024 - f := NewFactory(logger, Params{ - ServiceName: serviceName, - TChannelAddress: "localhost:0", - GRPCMaxMsgSize: grpcMsgSize, - GRPCAddress: "localhost:0", - HTTP: &httpParams{ - Address: "localhost:0", + + tests := []struct { + desc string + wantMembersBySvc map[string][]membership.HostInfo + 
		mockFn           func(*membership.MockResolver)
+		wantStartErr     bool
+	}{
+		{
+			desc:             "success",
+			wantMembersBySvc: membersBySvc,
+			mockFn: func(peerLister *membership.MockResolver) {
+				for _, svc := range servicesToTalkP2P {
+					peerLister.EXPECT().Subscribe(svc, factoryComponentName, gomock.Any()).
+						DoAndReturn(func(service, name string, notifyChannel chan<- *membership.ChangedEvent) error {
+							// Notify the channel once to validate listening logic is working
+							notifyChannel <- &membership.ChangedEvent{}
+							return nil
+						}).Times(1)
+
+					peerLister.EXPECT().Members(svc).Return(membersBySvc[svc], nil).Times(1)
+					peerLister.EXPECT().Unsubscribe(svc, factoryComponentName).Return(nil).Times(1)
+				}
+			},
 		},
-		OutboundsBuilder: ob,
-	})
+		{
+			desc:         "subscription to membership updates fails",
+			wantStartErr: true,
+			mockFn: func(peerLister *membership.MockResolver) {
+				for i, svc := range servicesToTalkP2P {
+					if i == 0 {
+						// Subscribe will only be called for the first service; after it fails, it should not be called for the remaining services
+						peerLister.EXPECT().Subscribe(svc, factoryComponentName, gomock.Any()).Return(errors.New("failed")).Times(1)
+					}
 
-	members := []membership.HostInfo{
-		membership.NewHostInfo("localhost:9191"),
-		membership.NewHostInfo("localhost:9192"),
-	}
-	peerLister := membership.NewMockResolver(ctrl)
-	peerLister.EXPECT().Subscribe(serviceName, factoryComponentName, gomock.Any()).
-		DoAndReturn(func(service, name string, notifyChannel chan<- *membership.ChangedEvent) error {
-			// Notify the channel once to validate listening logic is working
-			notifyChannel <- &membership.ChangedEvent{}
-			return nil
-		}).Times(1)
-	peerLister.EXPECT().Unsubscribe(serviceName, factoryComponentName).Return(nil).Times(1)
-	peerLister.EXPECT().Members(serviceName).Return(members, nil).Times(1)
-
-	if err := f.Start(peerLister); err != nil {
-		t.Fatalf("Factory.Start() returned error: %v", err)
+					// unsubscribe will be called for all services during stop
+					peerLister.EXPECT().Unsubscribe(svc, factoryComponentName).Return(nil).Times(1)
+				}
+			},
+		},
+		{
+			desc:             "unsubscription from membership updates fails",
+			wantMembersBySvc: membersBySvc,
+			mockFn: func(peerLister *membership.MockResolver) {
+				for _, svc := range servicesToTalkP2P {
+					peerLister.EXPECT().Subscribe(svc, factoryComponentName, gomock.Any()).
+ DoAndReturn(func(service, name string, notifyChannel chan<- *membership.ChangedEvent) error { + // Notify the channel once to validate listening logic is working + notifyChannel <- &membership.ChangedEvent{} + return nil + }).Times(1) + peerLister.EXPECT().Members(svc).Return(membersBySvc[svc], nil).Times(1) + peerLister.EXPECT().Unsubscribe(svc, factoryComponentName).Return(errors.New("failed")).Times(1) + } + }, + }, } - // Wait for membership changes to be processed - time.Sleep(100 * time.Millisecond) - mu.Lock() - assert.Equal(t, members, gotMembers, "UpdatePeers not called with expected members") - mu.Unlock() + for _, tc := range tests { + t.Run(tc.desc, func(t *testing.T) { + ctrl := gomock.NewController(t) + logger := testlogger.New(t) + serviceName := "service" + ob := NewMockOutboundsBuilder(ctrl) + var mu sync.Mutex + gotMembers := make(map[string][]membership.HostInfo) + outbounds := &Outbounds{ + onUpdatePeers: func(svc string, members []membership.HostInfo) { + mu.Lock() + defer mu.Unlock() + gotMembers[svc] = members + }, + } + ob.EXPECT().Build(gomock.Any(), gomock.Any()).Return(outbounds, nil).Times(1) + grpcMsgSize := 4 * 1024 * 1024 + f := NewFactory(logger, Params{ + ServiceName: serviceName, + TChannelAddress: "localhost:0", + GRPCMaxMsgSize: grpcMsgSize, + GRPCAddress: "localhost:0", + HTTP: &httpParams{ + Address: "localhost:0", + }, + OutboundsBuilder: ob, + }) + + peerLister := membership.NewMockResolver(ctrl) + tc.mockFn(peerLister) + + if err := f.Start(peerLister); err != nil { + if !tc.wantStartErr { + t.Fatalf("Factory.Start() returned error: %v", err) + } + + // start failed expectedly. do not proceed with rest of the validations + f.Stop() + return + } + + // Wait for membership changes to be processed + time.Sleep(100 * time.Millisecond) + mu.Lock() + assert.Equal(t, tc.wantMembersBySvc, gotMembers, "UpdatePeers not called with expected members") + mu.Unlock() + + if err := f.Stop(); err != nil { + t.Fatalf("Factory.Stop() returned error: %v", err) + } - if err := f.Stop(); err != nil { - t.Fatalf("Factory.Stop() returned error: %v", err) + goleak.VerifyNone(t) + }) } } diff --git a/common/rpc/outbounds.go b/common/rpc/outbounds.go index 8c17122744b..ccd02a6d17a 100644 --- a/common/rpc/outbounds.go +++ b/common/rpc/outbounds.go @@ -55,12 +55,12 @@ type OutboundsBuilder interface { type Outbounds struct { yarpc.Outbounds - onUpdatePeers func([]membership.HostInfo) + onUpdatePeers func(serviceName string, members []membership.HostInfo) } -func (o *Outbounds) UpdatePeers(peers []membership.HostInfo) { +func (o *Outbounds) UpdatePeers(serviceName string, peers []membership.HostInfo) { if o.onUpdatePeers != nil { - o.onUpdatePeers(peers) + o.onUpdatePeers(serviceName, peers) } } @@ -76,7 +76,7 @@ func CombineOutbounds(builders ...OutboundsBuilder) OutboundsBuilder { func (b multiOutboundsBuilder) Build(grpc *grpc.Transport, tchannel *tchannel.Transport) (*Outbounds, error) { outbounds := yarpc.Outbounds{} var errs error - var callbacks []func([]membership.HostInfo) + var callbacks []func(string, []membership.HostInfo) for _, builder := range b.builders { builderOutbounds, err := builder.Build(grpc, tchannel) if err != nil { @@ -99,9 +99,9 @@ func (b multiOutboundsBuilder) Build(grpc *grpc.Transport, tchannel *tchannel.Tr return &Outbounds{ Outbounds: outbounds, - onUpdatePeers: func(peers []membership.HostInfo) { + onUpdatePeers: func(serviceName string, members []membership.HostInfo) { for _, callback := range callbacks { - callback(peers) + 
callback(serviceName, members) } }, }, errs diff --git a/common/rpc/outbounds_test.go b/common/rpc/outbounds_test.go index fb83c4c5fcd..e20a26a5d16 100644 --- a/common/rpc/outbounds_test.go +++ b/common/rpc/outbounds_test.go @@ -37,6 +37,7 @@ import ( "github.com/uber/cadence/common/config" "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/log/testlogger" + "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/service" ) @@ -163,15 +164,16 @@ func TestDirectOutbound(t *testing.T) { grpc := &grpc.Transport{} tchannel := &tchannel.Transport{} logger := testlogger.New(t) + metricCl := metrics.NewNoopMetricsClient() falseFn := func(opts ...dynamicconfig.FilterOption) bool { return false } - o, err := NewDirectOutboundBuilder("cadence-history", false, nil, NewDirectPeerChooserFactory("cadence-history", logger), falseFn).Build(grpc, tchannel) + o, err := NewDirectOutboundBuilder("cadence-history", false, nil, NewDirectPeerChooserFactory("cadence-history", logger, metricCl), falseFn).Build(grpc, tchannel) assert.NoError(t, err) outbounds := o.Outbounds assert.Equal(t, "cadence-history", outbounds["cadence-history"].ServiceName) assert.NotNil(t, outbounds["cadence-history"].Unary) - o, err = NewDirectOutboundBuilder("cadence-history", true, nil, NewDirectPeerChooserFactory("cadence-history", logger), falseFn).Build(grpc, tchannel) + o, err = NewDirectOutboundBuilder("cadence-history", true, nil, NewDirectPeerChooserFactory("cadence-history", logger, metricCl), falseFn).Build(grpc, tchannel) assert.NoError(t, err) outbounds = o.Outbounds assert.Equal(t, "cadence-history", outbounds["cadence-history"].ServiceName) diff --git a/common/rpc/params.go b/common/rpc/params.go index 10b58faad6b..bcf4b835b4f 100644 --- a/common/rpc/params.go +++ b/common/rpc/params.go @@ -34,6 +34,7 @@ import ( "github.com/uber/cadence/common/config" "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/service" ) @@ -62,7 +63,7 @@ type httpParams struct { } // NewParams creates parameters for rpc.Factory from the given config -func NewParams(serviceName string, config *config.Config, dc *dynamicconfig.Collection, logger log.Logger) (Params, error) { +func NewParams(serviceName string, config *config.Config, dc *dynamicconfig.Collection, logger log.Logger, metricsCl metrics.Client) (Params, error) { serviceConfig, err := config.GetServiceConfig(serviceName) if err != nil { return Params{}, err @@ -143,14 +144,14 @@ func NewParams(serviceName string, config *config.Config, dc *dynamicconfig.Coll service.History, enableGRPCOutbound, outboundTLS[service.History], - NewDirectPeerChooserFactory(service.History, logger), + NewDirectPeerChooserFactory(service.History, logger, metricsCl), dc.GetBoolProperty(dynamicconfig.EnableConnectionRetainingDirectChooser), ), NewDirectOutboundBuilder( service.Matching, enableGRPCOutbound, outboundTLS[service.Matching], - NewDirectPeerChooserFactory(service.Matching, logger), + NewDirectPeerChooserFactory(service.Matching, logger, metricsCl), dc.GetBoolProperty(dynamicconfig.EnableConnectionRetainingDirectChooser), ), publicClientOutbound, diff --git a/common/rpc/params_test.go b/common/rpc/params_test.go index cdfe9b6f38f..30ceef69235 100644 --- a/common/rpc/params_test.go +++ b/common/rpc/params_test.go @@ -29,6 +29,7 @@ import ( "github.com/uber/cadence/common/config" "github.com/uber/cadence/common/dynamicconfig" 
"github.com/uber/cadence/common/log/testlogger" + "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/service" ) @@ -41,47 +42,55 @@ func TestNewParams(t *testing.T) { Services: map[string]config.Service{"frontend": svc}} } logger := testlogger.New(t) + metricsCl := metrics.NewNoopMetricsClient() - _, err := NewParams(serviceName, &config.Config{}, dc, logger) + _, err := NewParams(serviceName, &config.Config{}, dc, logger, metricsCl) assert.EqualError(t, err, "no config section for service: frontend") - _, err = NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnLocalHost: true, BindOnIP: "1.2.3.4"}}), dc, logger) + _, err = NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnLocalHost: true, BindOnIP: "1.2.3.4"}}), dc, logger, metricsCl) assert.EqualError(t, err, "get listen IP: bindOnLocalHost and bindOnIP are mutually exclusive") - _, err = NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnIP: "invalidIP"}}), dc, logger) + _, err = NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnIP: "invalidIP"}}), dc, logger, metricsCl) assert.EqualError(t, err, "get listen IP: unable to parse bindOnIP value or it is not an IPv4 or IPv6 address: invalidIP") - _, err = NewParams(serviceName, &config.Config{Services: map[string]config.Service{"frontend": {}}}, dc, logger) + _, err = NewParams(serviceName, &config.Config{Services: map[string]config.Service{"frontend": {}}}, dc, logger, metricsCl) assert.EqualError(t, err, "public client outbound: need to provide an endpoint config for PublicClient") - _, err = NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnLocalHost: true, TLS: config.TLS{Enabled: true, CertFile: "invalid", KeyFile: "invalid"}}}), dc, logger) + cfg := makeConfig(config.Service{RPC: config.RPC{BindOnLocalHost: true, TLS: config.TLS{Enabled: true, CertFile: "invalid", KeyFile: "invalid"}}}) + _, err = NewParams(serviceName, cfg, dc, logger, metricsCl) assert.EqualError(t, err, "inbound TLS config: open invalid: no such file or directory") - _, err = NewParams(serviceName, &config.Config{Services: map[string]config.Service{ + cfg = &config.Config{Services: map[string]config.Service{ "frontend": {RPC: config.RPC{BindOnLocalHost: true}}, "history": {RPC: config.RPC{TLS: config.TLS{Enabled: true, CaFile: "invalid"}}}, - }}, dc, logger) + }} + _, err = NewParams(serviceName, cfg, dc, logger, metricsCl) assert.EqualError(t, err, "outbound cadence-history TLS config: open invalid: no such file or directory") - params, err := NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnLocalHost: true, Port: 1111, GRPCPort: 2222, GRPCMaxMsgSize: 3333}}), dc, logger) + cfg = makeConfig(config.Service{RPC: config.RPC{BindOnLocalHost: true, Port: 1111, GRPCPort: 2222, GRPCMaxMsgSize: 3333}}) + params, err := NewParams(serviceName, cfg, dc, logger, metricsCl) assert.NoError(t, err) assert.Equal(t, "127.0.0.1:1111", params.TChannelAddress) assert.Equal(t, "127.0.0.1:2222", params.GRPCAddress) assert.Equal(t, 3333, params.GRPCMaxMsgSize) assert.Nil(t, params.InboundTLS) - params, err = NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnLocalHost: true, HTTP: &config.HTTP{Port: 8800}}}), dc, logger) + cfg = makeConfig(config.Service{RPC: config.RPC{BindOnLocalHost: true, HTTP: &config.HTTP{Port: 8800}}}) + params, err = NewParams(serviceName, cfg, dc, logger, metricsCl) assert.NoError(t, err) assert.Equal(t, "127.0.0.1:8800", params.HTTP.Address) - 
params, err = NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnLocalHost: true, HTTP: &config.HTTP{}}}), dc, logger) + cfg = makeConfig(config.Service{RPC: config.RPC{BindOnLocalHost: true, HTTP: &config.HTTP{}}}) + params, err = NewParams(serviceName, cfg, dc, logger, metricsCl) assert.Error(t, err) - params, err = NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnIP: "1.2.3.4", GRPCPort: 2222}}), dc, logger) + cfg = makeConfig(config.Service{RPC: config.RPC{BindOnIP: "1.2.3.4", GRPCPort: 2222}}) + params, err = NewParams(serviceName, cfg, dc, logger, metricsCl) assert.NoError(t, err) assert.Equal(t, "1.2.3.4:2222", params.GRPCAddress) - params, err = NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{GRPCPort: 2222, TLS: config.TLS{Enabled: true}}}), dc, logger) + cfg = makeConfig(config.Service{RPC: config.RPC{GRPCPort: 2222, TLS: config.TLS{Enabled: true}}}) + params, err = NewParams(serviceName, cfg, dc, logger, metricsCl) assert.NoError(t, err) ip, port, err := net.SplitHostPort(params.GRPCAddress) assert.NoError(t, err) diff --git a/common/rpc/peer_chooser.go b/common/rpc/peer_chooser.go index e1da84033f9..5584660582f 100644 --- a/common/rpc/peer_chooser.go +++ b/common/rpc/peer_chooser.go @@ -31,6 +31,7 @@ import ( "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/membership" + "github.com/uber/cadence/common/metrics" ) const defaultDNSRefreshInterval = time.Second * 10 @@ -56,7 +57,7 @@ type ( peer.Chooser // UpdatePeers updates the list of peers if needed. - UpdatePeers([]membership.HostInfo) + UpdatePeers(serviceName string, members []membership.HostInfo) } dnsPeerChooserFactory struct { @@ -67,6 +68,7 @@ type ( directPeerChooserFactory struct { serviceName string logger log.Logger + metricsCl metrics.Client choosers []*directPeerChooser } ) @@ -76,7 +78,7 @@ type defaultPeerChooser struct { } // UpdatePeers is a no-op for defaultPeerChooser. It is added to satisfy the PeerChooser interface. -func (d *defaultPeerChooser) UpdatePeers(peers []membership.HostInfo) {} +func (d *defaultPeerChooser) UpdatePeers(string, []membership.HostInfo) {} func NewDNSPeerChooserFactory(interval time.Duration, logger log.Logger) PeerChooserFactory { if interval <= 0 { @@ -96,14 +98,16 @@ func (f *dnsPeerChooserFactory) CreatePeerChooser(transport peer.Transport, opts return &defaultPeerChooser{Chooser: peerList}, nil } -func NewDirectPeerChooserFactory(serviceName string, logger log.Logger) PeerChooserFactory { +func NewDirectPeerChooserFactory(serviceName string, logger log.Logger, metricsCl metrics.Client) PeerChooserFactory { return &directPeerChooserFactory{ - logger: logger, + serviceName: serviceName, + logger: logger, + metricsCl: metricsCl, } } func (f *directPeerChooserFactory) CreatePeerChooser(transport peer.Transport, opts PeerChooserOptions) (PeerChooser, error) { - c := newDirectChooser(f.serviceName, transport, f.logger, opts.EnableConnectionRetainingDirectChooser) + c := newDirectChooser(f.serviceName, transport, f.logger, f.metricsCl, opts.EnableConnectionRetainingDirectChooser) f.choosers = append(f.choosers, c) return c, nil } diff --git a/common/rpc/peer_chooser_mock.go b/common/rpc/peer_chooser_mock.go index 935debc3980..c234e562e85 100644 --- a/common/rpc/peer_chooser_mock.go +++ b/common/rpc/peer_chooser_mock.go @@ -157,13 +157,13 @@ func (mr *MockPeerChooserMockRecorder) Stop() *gomock.Call { } // UpdatePeers mocks base method. 
-func (m *MockPeerChooser) UpdatePeers(arg0 []membership.HostInfo) { +func (m *MockPeerChooser) UpdatePeers(serviceName string, members []membership.HostInfo) { m.ctrl.T.Helper() - m.ctrl.Call(m, "UpdatePeers", arg0) + m.ctrl.Call(m, "UpdatePeers", serviceName, members) } // UpdatePeers indicates an expected call of UpdatePeers. -func (mr *MockPeerChooserMockRecorder) UpdatePeers(arg0 interface{}) *gomock.Call { +func (mr *MockPeerChooserMockRecorder) UpdatePeers(serviceName, members interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdatePeers", reflect.TypeOf((*MockPeerChooser)(nil).UpdatePeers), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdatePeers", reflect.TypeOf((*MockPeerChooser)(nil).UpdatePeers), serviceName, members) } diff --git a/common/rpc/peer_chooser_test.go b/common/rpc/peer_chooser_test.go index efd24db99bb..06a3a6561ed 100644 --- a/common/rpc/peer_chooser_test.go +++ b/common/rpc/peer_chooser_test.go @@ -34,6 +34,7 @@ import ( "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/testlogger" + "github.com/uber/cadence/common/metrics" ) type ( @@ -82,8 +83,9 @@ func TestDNSPeerChooserFactory(t *testing.T) { func TestDirectPeerChooserFactory(t *testing.T) { logger := testlogger.New(t) + metricCl := metrics.NewNoopMetricsClient() serviceName := "service" - pcf := NewDirectPeerChooserFactory(serviceName, logger) + pcf := NewDirectPeerChooserFactory(serviceName, logger, metricCl) directConnRetainFn := func(opts ...dynamicconfig.FilterOption) bool { return false } grpcTransport := grpc.NewTransport() chooser, err := pcf.CreatePeerChooser(grpcTransport, PeerChooserOptions{ diff --git a/host/onebox.go b/host/onebox.go index 876397629c9..dabc40ac3eb 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -562,12 +562,12 @@ func (c *cadenceImpl) startFrontend(hosts map[string][]membership.HostInfo, star params.ThrottledLogger = c.logger params.TimeSource = c.timeSource params.PProfInitializer = newPProfInitializerImpl(c.logger, c.FrontendPProfPort()) - params.RPCFactory = c.newRPCFactory(service.Frontend, c.FrontendHost()) params.MetricScope = tally.NewTestScope(service.Frontend, make(map[string]string)) + params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger)) + params.RPCFactory = c.newRPCFactory(service.Frontend, c.FrontendHost(), params.MetricsClient) params.MembershipResolver = newMembershipResolver(params.Name, hosts, c.FrontendHost()) params.ClusterMetadata = c.clusterMetadata params.MessagingClient = c.messagingClient - params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger)) params.DynamicConfig = newIntegrationConfigClient(dynamicconfig.NewNopClient(), c.frontendDynCfgOverrides) params.ArchivalMetadata = c.archiverMetadata params.ArchiverProvider = c.archiverProvider @@ -644,12 +644,12 @@ func (c *cadenceImpl) startHistory(hosts map[string][]membership.HostInfo, start params.ThrottledLogger = c.logger params.TimeSource = c.timeSource params.PProfInitializer = newPProfInitializerImpl(c.logger, pprofPorts[i]) - params.RPCFactory = c.newRPCFactory(service.History, hostport) params.MetricScope = tally.NewTestScope(service.History, make(map[string]string)) + params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger)) + params.RPCFactory = c.newRPCFactory(service.History, 
hostport, params.MetricsClient) params.MembershipResolver = newMembershipResolver(params.Name, hosts, hostport) params.ClusterMetadata = c.clusterMetadata params.MessagingClient = c.messagingClient - params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger)) integrationClient := newIntegrationConfigClient(dynamicconfig.NewNopClient(), c.historyDynCfgOverrides) c.overrideHistoryDynamicConfig(integrationClient) params.DynamicConfig = integrationClient @@ -723,11 +723,11 @@ func (c *cadenceImpl) startMatching(hosts map[string][]membership.HostInfo, star params.ThrottledLogger = c.logger params.TimeSource = c.timeSource params.PProfInitializer = newPProfInitializerImpl(c.logger, pprofPorts[i]) - params.RPCFactory = c.newRPCFactory(service.Matching, hostport) params.MetricScope = tally.NewTestScope(service.Matching, map[string]string{"matching-host": matchingHost}) + params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger)) + params.RPCFactory = c.newRPCFactory(service.Matching, hostport, params.MetricsClient) params.MembershipResolver = newMembershipResolver(params.Name, hosts, hostport) params.ClusterMetadata = c.clusterMetadata - params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger)) params.DynamicConfig = newIntegrationConfigClient(dynamicconfig.NewNopClient(), c.matchingDynCfgOverrides) params.ArchivalMetadata = c.archiverMetadata params.ArchiverProvider = c.archiverProvider @@ -783,11 +783,11 @@ func (c *cadenceImpl) startWorker(hosts map[string][]membership.HostInfo, startW params.ThrottledLogger = c.logger params.TimeSource = c.timeSource params.PProfInitializer = newPProfInitializerImpl(c.logger, c.WorkerPProfPort()) - params.RPCFactory = c.newRPCFactory(service.Worker, c.WorkerServiceHost()) params.MetricScope = tally.NewTestScope(service.Worker, make(map[string]string)) + params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger)) + params.RPCFactory = c.newRPCFactory(service.Worker, c.WorkerServiceHost(), params.MetricsClient) params.MembershipResolver = newMembershipResolver(params.Name, hosts, c.WorkerServiceHost()) params.ClusterMetadata = c.clusterMetadata - params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger)) params.DynamicConfig = newIntegrationConfigClient(dynamicconfig.NewNopClient(), c.workerDynCfgOverrides) params.ArchivalMetadata = c.archiverMetadata params.ArchiverProvider = c.archiverProvider @@ -1024,7 +1024,7 @@ func newPublicClient(dispatcher *yarpc.Dispatcher) cwsc.Interface { ) } -func (c *cadenceImpl) newRPCFactory(serviceName string, host membership.HostInfo) rpc.Factory { +func (c *cadenceImpl) newRPCFactory(serviceName string, host membership.HostInfo, metricsCl metrics.Client) rpc.Factory { tchannelAddress, err := host.GetNamedAddress(membership.PortTchannel) if err != nil { c.logger.Fatal("failed to get PortTchannel port from host", tag.Value(host), tag.Error(err)) @@ -1040,7 +1040,7 @@ func (c *cadenceImpl) newRPCFactory(serviceName string, host membership.HostInfo c.logger.Fatal("failed to get frontend PortGRPC", tag.Value(c.FrontendHost()), tag.Error(err)) } - directOutboundPCF := rpc.NewDirectPeerChooserFactory(serviceName, c.logger) + directOutboundPCF := rpc.NewDirectPeerChooserFactory(serviceName, c.logger, metricsCl) directConnRetainFn := func(opts 
...dynamicconfig.FilterOption) bool { return false } return rpc.NewFactory(c.logger, rpc.Params{