Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emits a counter value for every unique view of the hashring #5672

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ func (s *server) startService() common.Daemon {
log.Fatalf("ringpop provider failed: %v", err)
}

params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, params.Logger))

params.MembershipResolver, err = membership.NewResolver(
peerProvider,
params.Logger,
Expand All @@ -192,8 +194,6 @@ func (s *server) startService() common.Daemon {

params.ClusterRedirectionPolicy = s.cfg.ClusterGroupMetadata.ClusterRedirectionPolicy

params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, params.Logger))

params.ClusterMetadata = cluster.NewMetadata(
clusterGroupMetadata.FailoverVersionIncrement,
clusterGroupMetadata.PrimaryClusterName,
Expand Down
52 changes: 47 additions & 5 deletions common/membership/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ package membership

import (
"fmt"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -33,6 +35,7 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/types"
)

Expand Down Expand Up @@ -64,6 +67,7 @@ type ring struct {
refreshChan chan *ChangedEvent
shutdownCh chan struct{}
shutdownWG sync.WaitGroup
scope metrics.Scope
logger log.Logger

value atomic.Value // this stores the current hashring
Expand All @@ -84,21 +88,23 @@ func newHashring(
service string,
provider PeerProvider,
logger log.Logger,
scope metrics.Scope,
) *ring {
hashring := &ring{
ring := &ring{
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The linter was complaining about a naming collision with the package

status: common.DaemonStatusInitialized,
service: service,
peerProvider: provider,
shutdownCh: make(chan struct{}),
logger: logger,
refreshChan: make(chan *ChangedEvent),
scope: scope,
}

hashring.members.keys = make(map[string]HostInfo)
hashring.subscribers.keys = make(map[string]chan<- *ChangedEvent)
ring.members.keys = make(map[string]HostInfo)
ring.subscribers.keys = make(map[string]chan<- *ChangedEvent)

hashring.value.Store(emptyHashring())
return hashring
ring.value.Store(emptyHashring())
return ring
}

func emptyHashring() *hashring.HashRing {
Expand Down Expand Up @@ -264,6 +270,7 @@ func (r *ring) refreshRingWorker() {
r.logger.Error("refreshing ring", tag.Error(err))
}
case <-refreshTicker.C: // periodically refresh membership
r.emitHashIdentifier()
if err := r.refresh(); err != nil {
r.logger.Error("periodically refreshing ring", tag.Error(err))
}
Expand All @@ -275,6 +282,41 @@ func (r *ring) ring() *hashring.HashRing {
return r.value.Load().(*hashring.HashRing)
}

func (r *ring) emitHashIdentifier() float64 {
members, err := r.peerProvider.GetMembers(r.service)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does GetMembers return full list including self? If not we should add it

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know to be honest, but also I think a view of the hashring that excludes the current machine is a valid view (if it's drained, for example), and you still want to ensure it's consistent across both drained and undrained zones.

I would not want to mutate the membership list to add this because it would make a drain case look like a split when it wasn't in this case

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If each host sees a list of members that doesn't include themselves then the hashed value will never match. Am I missing something?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I'm missing something?

If I'm on host 10.0.0.1 and I have a view of the hashring as equivalent to the hashed version of

10.0.0.2
10.0.0.3

then, this should be the same as the view for 10.0.0.2 and 3 respectively? The fact that it's not actually on the active list is not a problem?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only reason I fetched self was to add it as a dimension for the gauge, to prevent interleaving of the metric

if err != nil {
r.logger.Error("Observed a problem getting peer members while emitting hash identifier metrics", tag.Error(err))
return -1
}
self, err := r.peerProvider.WhoAmI()
if err != nil {
r.logger.Error("Observed a problem looking up self from the membership provider while emitting hash identifier metrics", tag.Error(err))
self = HostInfo{
identity: "unknown",
}
}

sort.Slice(members, func(i int, j int) bool {
return members[i].addr > members[j].addr
})
var sb strings.Builder
for i := range members {
sb.WriteString(members[i].addr)
sb.WriteString("\n")
}
hashedView := farm.Hash32([]byte(sb.String()))
// Trimming the metric because collisions are unlikely and I didn't want to use the full Float64
// in-case it overflowed something. The number itself is meaningless, so additional precision
// doesn't really give any advantage, besides reducing the risk of collision
trimmedForMetric := float64(hashedView % 1000)
r.logger.Debug("Hashring view", tag.Dynamic("hashring-view", sb.String()), tag.Dynamic("trimmed-hash-id", trimmedForMetric), tag.Service(r.service))
r.scope.Tagged(
metrics.ServiceTag(r.service),
metrics.HostTag(self.identity),
).UpdateGauge(metrics.HashringViewIdentifier, trimmedForMetric)
return trimmedForMetric
}

func (r *ring) compareMembers(members []HostInfo) (map[string]HostInfo, bool) {
changed := false
newMembersMap := make(map[string]HostInfo, len(members))
Expand Down
80 changes: 71 additions & 9 deletions common/membership/hashring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/metrics"
)

var letters = []rune("abcdefghijklmnopqrstuvwxyz")
Expand Down Expand Up @@ -103,7 +104,7 @@ func TestFailedLookupWillAskProvider(t *testing.T) {
pp.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1)
pp.EXPECT().GetMembers("test-service").Times(1)

hr := newHashring("test-service", pp, log.NewNoop())
hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))
hr.Start()
_, err := hr.Lookup("a")

Expand All @@ -117,7 +118,7 @@ func TestRefreshUpdatesRingOnlyWhenRingHasChanged(t *testing.T) {
pp.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1)
pp.EXPECT().GetMembers("test-service").Times(3)

hr := newHashring("test-service", pp, log.NewNoop())
hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))
hr.Start()

hr.refresh()
Expand All @@ -132,7 +133,7 @@ func TestSubscribeIgnoresDuplicates(t *testing.T) {
ctrl := gomock.NewController(t)
pp := NewMockPeerProvider(ctrl)

hr := newHashring("test-service", pp, log.NewNoop())
hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))

assert.NoError(t, hr.Subscribe("test-service", changeCh))
assert.Error(t, hr.Subscribe("test-service", changeCh))
Expand All @@ -143,7 +144,7 @@ func TestUnsubcribeIgnoresDeletionOnEmpty(t *testing.T) {
ctrl := gomock.NewController(t)
pp := NewMockPeerProvider(ctrl)

hr := newHashring("test-service", pp, log.NewNoop())
hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))
assert.Equal(t, 0, len(hr.subscribers.keys))
assert.NoError(t, hr.Unsubscribe("test-service"))
assert.NoError(t, hr.Unsubscribe("test-service"))
Expand All @@ -155,7 +156,7 @@ func TestUnsubcribeDeletes(t *testing.T) {
pp := NewMockPeerProvider(ctrl)
var changeCh = make(chan *ChangedEvent)

hr := newHashring("test-service", pp, log.NewNoop())
hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))

assert.Equal(t, 0, len(hr.subscribers.keys))
assert.NoError(t, hr.Subscribe("testservice1", changeCh))
Expand All @@ -171,7 +172,7 @@ func TestMemberCountReturnsNumber(t *testing.T) {
ctrl := gomock.NewController(t)
pp := NewMockPeerProvider(ctrl)

hr := newHashring("test-service", pp, log.NewNoop())
hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))
assert.Equal(t, 0, hr.MemberCount())

ring := emptyHashring()
Expand All @@ -188,7 +189,7 @@ func TestErrorIsPropagatedWhenProviderFails(t *testing.T) {
pp := NewMockPeerProvider(ctrl)
pp.EXPECT().GetMembers(gomock.Any()).Return(nil, errors.New("error"))

hr := newHashring("test-service", pp, log.NewNoop())
hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))
assert.Error(t, hr.refresh())
}

Expand All @@ -198,7 +199,7 @@ func TestStopWillStopProvider(t *testing.T) {

pp.EXPECT().Stop().Times(1)

hr := newHashring("test-service", pp, log.NewNoop())
hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))
hr.status = common.DaemonStatusStarted
hr.Stop()

Expand All @@ -213,7 +214,7 @@ func TestLookupAndRefreshRaceCondition(t *testing.T) {
pp.EXPECT().GetMembers("test-service").AnyTimes().DoAndReturn(func(service string) ([]HostInfo, error) {
return randomHostInfo(5), nil
})
hr := newHashring("test-service", pp, log.NewNoop())
hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))
hr.Start()
wg.Add(2)
go func() {
Expand All @@ -233,3 +234,64 @@ func TestLookupAndRefreshRaceCondition(t *testing.T) {

wg.Wait()
}

// TestEmitHashringView checks the value returned by emitHashIdentifier for a
// table of membership views. It covers ordering-independence of the member
// list, empty and nil member lists, and both error paths: a failed member
// lookup (which returns -1 before WhoAmI is consulted) and a failed self
// lookup (which still hashes and returns the view).
func TestEmitHashringView(t *testing.T) {
	tests := map[string]struct {
		hosts          []HostInfo
		lookuperr      error
		selfInfo       HostInfo
		selfErr        error
		expectedResult float64
	}{
		"example one - sorted set 1 - the output should be some random hashed value": {
			hosts: []HostInfo{
				{addr: "10.0.0.1:1234", ip: "10.0.0.1", identity: "host1", portMap: nil},
				{addr: "10.0.0.2:1234", ip: "10.0.0.2", identity: "host2", portMap: nil},
				{addr: "10.0.0.3:1234", ip: "10.0.0.3", identity: "host3", portMap: nil},
			},
			selfInfo:       HostInfo{identity: "host123"},
			expectedResult: 835.0, // the number here is meaningless
		},
		"example one - unsorted set 1 - the order of the hosts should not matter": {
			hosts: []HostInfo{
				{addr: "10.0.0.1:1234", ip: "10.0.0.1", identity: "host1", portMap: nil},
				{addr: "10.0.0.3:1234", ip: "10.0.0.3", identity: "host3", portMap: nil},
				{addr: "10.0.0.2:1234", ip: "10.0.0.2", identity: "host2", portMap: nil},
			},
			selfInfo:       HostInfo{identity: "host123"},
			expectedResult: 835.0, // the test here is that it's the same as test 1
		},
		"example 2 - empty set": {
			hosts:          []HostInfo{},
			selfInfo:       HostInfo{identity: "host123"},
			expectedResult: 242.0, // meaningless hash value
		},
		"example 3 - nil set": {
			hosts:          nil,
			selfInfo:       HostInfo{identity: "host123"},
			expectedResult: 242.0, // meaningless hash value
		},
		"example 4 - member lookup fails - the sentinel -1 is returned": {
			lookuperr:      errors.New("member lookup failed"),
			selfInfo:       HostInfo{identity: "host123"},
			expectedResult: -1,
		},
		"example 5 - self lookup fails - the view is still hashed": {
			hosts: []HostInfo{
				{addr: "10.0.0.1:1234", ip: "10.0.0.1", identity: "host1", portMap: nil},
				{addr: "10.0.0.2:1234", ip: "10.0.0.2", identity: "host2", portMap: nil},
				{addr: "10.0.0.3:1234", ip: "10.0.0.3", identity: "host3", portMap: nil},
			},
			selfErr:        errors.New("self lookup failed"),
			expectedResult: 835.0, // same members as example one, so same hash
		},
	}

	for name, td := range tests {
		td := td // pin the case for the closures below
		t.Run(name, func(t *testing.T) {
			ctrl := gomock.NewController(t)
			pp := NewMockPeerProvider(ctrl)

			pp.EXPECT().GetMembers("test-service").DoAndReturn(func(service string) ([]HostInfo, error) {
				return td.hosts, td.lookuperr
			})

			// emitHashIdentifier returns before calling WhoAmI when the member
			// lookup fails, so only expect the call on the success path.
			if td.lookuperr == nil {
				pp.EXPECT().WhoAmI().DoAndReturn(func() (HostInfo, error) {
					return td.selfInfo, td.selfErr
				})
			}

			hr := newHashring("test-service", pp, log.NewNoop(), metrics.NoopScope(0))

			assert.Equal(t, td.expectedResult, hr.emitHashIdentifier())
		})
	}
}
6 changes: 3 additions & 3 deletions common/membership/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,17 +103,17 @@ func NewMultiringResolver(
services []string,
provider PeerProvider,
logger log.Logger,
metrics metrics.Client,
metricsClient metrics.Client,
) *MultiringResolver {
rpo := &MultiringResolver{
status: common.DaemonStatusInitialized,
provider: provider,
rings: make(map[string]*ring),
metrics: metrics,
metrics: metricsClient,
}

for _, s := range services {
rpo.rings[s] = newHashring(s, provider, logger)
rpo.rings[s] = newHashring(s, provider, logger, metricsClient.Scope(metrics.HashringScope))
}
return rpo
}
Expand Down
6 changes: 6 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ const (

// ResolverHostNotFoundScope is a simple low level error indicating a lookup failed in the membership resolver
ResolverHostNotFoundScope
// HashringScope is a metrics scope for emitting events for the service hashring
HashringScope
// HistoryClientStartWorkflowExecutionScope tracks RPC calls to history service
HistoryClientStartWorkflowExecutionScope
// HistoryClientDescribeHistoryHostScope tracks RPC calls to history service
Expand Down Expand Up @@ -1707,6 +1709,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
TaskValidatorScope: {operation: "TaskValidation"},
DomainReplicationQueueScope: {operation: "DomainReplicationQueue"},
ClusterMetadataScope: {operation: "ClusterMetadata"},
HashringScope: {operation: "Hashring"},
},
// Frontend Scope Names
Frontend: {
Expand Down Expand Up @@ -2174,6 +2177,8 @@ const (
IsolationGroupStateHealthy
ValidatedWorkflowCount

HashringViewIdentifier

NumCommonMetrics // Needs to be last on this list for iota numbering
)

Expand Down Expand Up @@ -2812,6 +2817,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
IsolationGroupStateDrained: {metricName: "isolation_group_drained", metricType: Counter},
IsolationGroupStateHealthy: {metricName: "isolation_group_healthy", metricType: Counter},
ValidatedWorkflowCount: {metricName: "task_validator_count", metricType: Counter},
HashringViewIdentifier: {metricName: "hashring_view_identifier", metricType: Counter},
},
History: {
TaskRequests: {metricName: "task_requests", metricType: Counter},
Expand Down
12 changes: 12 additions & 0 deletions common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@ const (
kafkaPartition = "kafkaPartition"
transport = "transport"
caller = "caller"
service = "service"
signalName = "signalName"
workflowVersion = "workflow_version"
shardID = "shard_id"
matchingHost = "matching_host"
host = "host"
pollerIsolationGroup = "poller_isolation_group"
asyncWFRequestType = "async_wf_request_type"

Expand Down Expand Up @@ -201,6 +203,16 @@ func CallerTag(value string) Tag {
return simpleMetric{key: caller, value: value}
}

// ServiceTag returns a new tag, keyed by "service", carrying the given service name.
func ServiceTag(value string) Tag {
return simpleMetric{key: service, value: value}
}

// HostTag returns a new tag, keyed by "host", carrying the given host identifier.
// The value is passed through metricWithUnknown (presumably substituting a
// placeholder for empty values — confirm against that helper).
func HostTag(value string) Tag {
return metricWithUnknown(host, value)
}

// SignalNameTag returns a new SignalName tag
func SignalNameTag(value string) Tag {
return metricWithUnknown(signalName, value)
Expand Down