diff --git a/common/membership/hostinfo.go b/common/membership/hostinfo.go index f24ca1a6f54..af6ef0956ff 100644 --- a/common/membership/hostinfo.go +++ b/common/membership/hostinfo.go @@ -24,41 +24,30 @@ package membership -// HostInfo is a type that contains the info about a temporal host -type HostInfo struct { - addr string // ip:port - labels map[string]string +// HostInfo represents the host of a Temporal service. +type HostInfo interface { + // Identity returns the unique identifier of the host. + // This may be the same as the address. + Identity() string + // GetAddress returns the socket address of the host (i.e. :). + // This must be a valid gRPC address. + GetAddress() string } -// NewHostInfo creates a new HostInfo instance -func NewHostInfo(addr string, labels map[string]string) *HostInfo { - if labels == nil { - labels = make(map[string]string) - } - return &HostInfo{ - addr: addr, - labels: labels, - } +// NewHostInfoFromAddress creates a new HostInfo instance from a socket address. +func NewHostInfoFromAddress(address string) HostInfo { + return hostAddress(address) } -// GetAddress returns the ip:port address -func (hi *HostInfo) GetAddress() string { - return hi.addr -} - -// Identity implements ringpop's Membership interface -func (hi *HostInfo) Identity() string { - // for now we just use the address as the identity - return hi.addr -} +// hostAddress is a HostInfo implementation that uses a string as the address and identity. +type hostAddress string -// Label implements ringpop's Membership interface -func (hi *HostInfo) Label(key string) (value string, has bool) { - value, has = hi.labels[key] - return +// GetAddress returns the value of the hostAddress. +func (a hostAddress) GetAddress() string { + return string(a) } -// SetLabel sets the label. -func (hi *HostInfo) SetLabel(key string, value string) { - hi.labels[key] = value +// Identity returns the value of the hostAddress. +func (a hostAddress) Identity() string { + return string(a) } diff --git a/common/membership/hostinfo_provider.go b/common/membership/hostinfo_provider.go index 63310549599..a14f5017b9b 100644 --- a/common/membership/hostinfo_provider.go +++ b/common/membership/hostinfo_provider.go @@ -37,7 +37,7 @@ var HostInfoProviderModule = fx.Options( type ( cachingHostInfoProvider struct { - hostInfo *HostInfo + hostInfo HostInfo membershipMonitor Monitor } ) @@ -57,7 +57,7 @@ func (hip *cachingHostInfoProvider) Start() error { return nil } -func (hip *cachingHostInfoProvider) HostInfo() *HostInfo { +func (hip *cachingHostInfoProvider) HostInfo() HostInfo { return hip.hostInfo } diff --git a/common/membership/interfaces.go b/common/membership/interfaces.go index 91881cc2f5d..42b2bb50893 100644 --- a/common/membership/interfaces.go +++ b/common/membership/interfaces.go @@ -52,9 +52,9 @@ type ( // ChangedEvent describes a change in membership ChangedEvent struct { - HostsAdded []*HostInfo - HostsUpdated []*HostInfo - HostsRemoved []*HostInfo + HostsAdded []HostInfo + HostsUpdated []HostInfo + HostsRemoved []HostInfo } // Monitor provides membership information for all temporal services. @@ -62,12 +62,12 @@ type ( Monitor interface { common.Daemon - WhoAmI() (*HostInfo, error) + WhoAmI() (HostInfo, error) // EvictSelf evicts this member from the membership ring. After this method is // called, other members will discover that this node is no longer part of the // ring. This primitive is useful to carry out graceful host shutdown during deployments. EvictSelf() error - Lookup(service primitives.ServiceName, key string) (*HostInfo, error) + Lookup(service primitives.ServiceName, key string) (HostInfo, error) GetResolver(service primitives.ServiceName) (ServiceResolver, error) // AddListener adds a listener for this service. // The listener will get notified on the given @@ -93,7 +93,7 @@ type ( // ServiceResolver provides membership information for a specific temporal service. // It can be used to resolve which member host is responsible for serving a given key. ServiceResolver interface { - Lookup(key string) (*HostInfo, error) + Lookup(key string) (HostInfo, error) // AddListener adds a listener which will get notified on the given // channel, whenever membership changes. // @name: The name for identifying the listener @@ -104,13 +104,13 @@ type ( // MemberCount returns host count in hashring for any particular role MemberCount() int // Members returns all host addresses in hashring for any particular role - Members() []*HostInfo + Members() []HostInfo // Requests to rebuild the hash ring RequestRefresh() } HostInfoProvider interface { Start() error - HostInfo() *HostInfo + HostInfo() HostInfo } ) diff --git a/common/membership/interfaces_mock.go b/common/membership/interfaces_mock.go index 620f401fc96..f20ddc6ce6d 100644 --- a/common/membership/interfaces_mock.go +++ b/common/membership/interfaces_mock.go @@ -133,10 +133,10 @@ func (mr *MockMonitorMockRecorder) GetResolver(service interface{}) *gomock.Call } // Lookup mocks base method. -func (m *MockMonitor) Lookup(service primitives.ServiceName, key string) (*HostInfo, error) { +func (m *MockMonitor) Lookup(service primitives.ServiceName, key string) (HostInfo, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Lookup", service, key) - ret0, _ := ret[0].(*HostInfo) + ret0, _ := ret[0].(HostInfo) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -200,10 +200,10 @@ func (mr *MockMonitorMockRecorder) WaitUntilInitialized(arg0 interface{}) *gomoc } // WhoAmI mocks base method. -func (m *MockMonitor) WhoAmI() (*HostInfo, error) { +func (m *MockMonitor) WhoAmI() (HostInfo, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "WhoAmI") - ret0, _ := ret[0].(*HostInfo) + ret0, _ := ret[0].(HostInfo) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -252,10 +252,10 @@ func (mr *MockServiceResolverMockRecorder) AddListener(name, notifyChannel inter } // Lookup mocks base method. -func (m *MockServiceResolver) Lookup(key string) (*HostInfo, error) { +func (m *MockServiceResolver) Lookup(key string) (HostInfo, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Lookup", key) - ret0, _ := ret[0].(*HostInfo) + ret0, _ := ret[0].(HostInfo) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -281,10 +281,10 @@ func (mr *MockServiceResolverMockRecorder) MemberCount() *gomock.Call { } // Members mocks base method. -func (m *MockServiceResolver) Members() []*HostInfo { +func (m *MockServiceResolver) Members() []HostInfo { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Members") - ret0, _ := ret[0].([]*HostInfo) + ret0, _ := ret[0].([]HostInfo) return ret0 } @@ -344,10 +344,10 @@ func (m *MockHostInfoProvider) EXPECT() *MockHostInfoProviderMockRecorder { } // HostInfo mocks base method. -func (m *MockHostInfoProvider) HostInfo() *HostInfo { +func (m *MockHostInfoProvider) HostInfo() HostInfo { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "HostInfo") - ret0, _ := ret[0].(*HostInfo) + ret0, _ := ret[0].(HostInfo) return ret0 } diff --git a/common/membership/ringpop_hostinfo.go b/common/membership/ringpop_hostinfo.go new file mode 100644 index 00000000000..e2c059f3462 --- /dev/null +++ b/common/membership/ringpop_hostinfo.go @@ -0,0 +1,63 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package membership + +type ringpopHostInfo struct { + addr string // ip:port + labels map[string]string +} + +// newRingpopHostInfo creates a new *ringpopHostInfo instance +func newRingpopHostInfo(addr string, labels map[string]string) *ringpopHostInfo { + if labels == nil { + labels = make(map[string]string) + } + return &ringpopHostInfo{ + addr: addr, + labels: labels, + } +} + +// GetAddress returns the ip:port address +func (hi *ringpopHostInfo) GetAddress() string { + return hi.addr +} + +// Identity implements ringpop's Membership interface +func (hi *ringpopHostInfo) Identity() string { + // for now we just use the address as the identity + return hi.addr +} + +// Label implements ringpop's Membership interface +func (hi *ringpopHostInfo) Label(key string) (value string, has bool) { + value, has = hi.labels[key] + return +} + +// SetLabel sets the label. +func (hi *ringpopHostInfo) SetLabel(key string, value string) { + hi.labels[key] = value +} diff --git a/common/membership/rp_monitor.go b/common/membership/rp_monitor.go index e6ac925991d..74fc58fc8b1 100644 --- a/common/membership/rp_monitor.go +++ b/common/membership/rp_monitor.go @@ -341,7 +341,7 @@ func (rpo *ringpopMonitor) Stop() { // This is different from service address as we register ringpop handlers on a separate port. // For this reason we need to lookup the port for the service and replace ringpop port with service port before // returning HostInfo back. -func (rpo *ringpopMonitor) WhoAmI() (*HostInfo, error) { +func (rpo *ringpopMonitor) WhoAmI() (HostInfo, error) { address, err := rpo.rp.WhoAmI() if err != nil { return nil, err @@ -360,7 +360,7 @@ func (rpo *ringpopMonitor) WhoAmI() (*HostInfo, error) { if err != nil { return nil, err } - return NewHostInfo(serviceAddress, labels.AsMap()), nil + return newRingpopHostInfo(serviceAddress, labels.AsMap()), nil } func (rpo *ringpopMonitor) EvictSelf() error { @@ -375,7 +375,7 @@ func (rpo *ringpopMonitor) GetResolver(service primitives.ServiceName) (ServiceR return ring, nil } -func (rpo *ringpopMonitor) Lookup(service primitives.ServiceName, key string) (*HostInfo, error) { +func (rpo *ringpopMonitor) Lookup(service primitives.ServiceName, key string) (HostInfo, error) { ring, err := rpo.GetResolver(service) if err != nil { return nil, err diff --git a/common/membership/rp_monitor_test.go b/common/membership/rp_monitor_test.go index 507a5a2ea4a..736c6ad366c 100644 --- a/common/membership/rp_monitor_test.go +++ b/common/membership/rp_monitor_test.go @@ -120,13 +120,13 @@ func (s *RpoSuite) testCompareMembers(curr []string, new []string, expectedDiff if event != nil { var diff []string for _, a := range event.HostsAdded { - diff = append(diff, "+"+a.addr) + diff = append(diff, "+"+a.GetAddress()) } for _, a := range event.HostsUpdated { - diff = append(diff, "~"+a.addr) + diff = append(diff, "~"+a.GetAddress()) } for _, a := range event.HostsRemoved { - diff = append(diff, "-"+a.addr) + diff = append(diff, "-"+a.GetAddress()) } s.ElementsMatch(expectedDiff, diff) } diff --git a/common/membership/rp_service_resolver.go b/common/membership/rp_service_resolver.go index 5f4f0185fe6..83f2f75cf58 100644 --- a/common/membership/rp_service_resolver.go +++ b/common/membership/rp_service_resolver.go @@ -158,9 +158,7 @@ func (r *ringpopServiceResolver) RequestRefresh() { } // Lookup finds the host in the ring responsible for serving the given key -func (r *ringpopServiceResolver) Lookup( - key string, -) (*HostInfo, error) { +func (r *ringpopServiceResolver) Lookup(key string) (HostInfo, error) { addr, found := r.ring().Lookup(key) if !found { @@ -168,7 +166,7 @@ func (r *ringpopServiceResolver) Lookup( return nil, ErrInsufficientHosts } - return NewHostInfo(addr, r.getLabelsMap()), nil + return newRingpopHostInfo(addr, r.getLabelsMap()), nil } func (r *ringpopServiceResolver) AddListener( @@ -204,10 +202,10 @@ func (r *ringpopServiceResolver) MemberCount() int { return r.ring().ServerCount() } -func (r *ringpopServiceResolver) Members() []*HostInfo { - var servers []*HostInfo +func (r *ringpopServiceResolver) Members() []HostInfo { + var servers []HostInfo for _, s := range r.ring().Servers() { - servers = append(servers, NewHostInfo(s, r.getLabelsMap())) + servers = append(servers, newRingpopHostInfo(s, r.getLabelsMap())) } return servers @@ -274,7 +272,7 @@ func (r *ringpopServiceResolver) refreshNoLock() (*ChangedEvent, error) { ring := newHashRing() for _, addr := range addrs { - host := NewHostInfo(addr, r.getLabelsMap()) + host := newRingpopHostInfo(addr, r.getLabelsMap()) ring.AddMembers(host) } @@ -373,13 +371,13 @@ func (r *ringpopServiceResolver) compareMembers(addrs []string) (map[string]stru for _, addr := range addrs { newMembersMap[addr] = struct{}{} if _, ok := r.membersMap[addr]; !ok { - event.HostsAdded = append(event.HostsAdded, NewHostInfo(addr, r.getLabelsMap())) + event.HostsAdded = append(event.HostsAdded, newRingpopHostInfo(addr, r.getLabelsMap())) changed = true } } for addr := range r.membersMap { if _, ok := newMembersMap[addr]; !ok { - event.HostsRemoved = append(event.HostsRemoved, NewHostInfo(addr, r.getLabelsMap())) + event.HostsRemoved = append(event.HostsRemoved, newRingpopHostInfo(addr, r.getLabelsMap())) changed = true } } diff --git a/common/membership/rp_test_cluster.go b/common/membership/rp_test_cluster.go index 9ecde17c2ab..fcffb941553 100644 --- a/common/membership/rp_test_cluster.go +++ b/common/membership/rp_test_cluster.go @@ -98,7 +98,7 @@ func NewTestRingpopCluster( logger.Error("Failed to build broadcast hostport", tag.Error(err)) return nil } - cluster.hostInfoList[i] = HostInfo{addr: cluster.hostAddrs[i]} + cluster.hostInfoList[i] = newRingpopHostInfo(cluster.hostAddrs[i], nil) } // if seed node is already supplied, use it; if not, set it @@ -209,9 +209,9 @@ func (c *TestRingpopCluster) GetHostAddrs() []string { // the given addr, if it exists func (c *TestRingpopCluster) FindHostByAddr(addr string) (HostInfo, bool) { for _, hi := range c.hostInfoList { - if hi.addr == addr { + if hi.GetAddress() == addr { return hi, true } } - return HostInfo{}, false + return nil, false } diff --git a/common/resourcetest/resourceTest.go b/common/resourcetest/resourceTest.go index 6003bc0107f..23ea70ea8f9 100644 --- a/common/resourcetest/resourceTest.go +++ b/common/resourcetest/resourceTest.go @@ -117,7 +117,7 @@ const ( testHostName = "test_host" ) -var testHostInfo = membership.NewHostInfo(testHostName, nil) +var testHostInfo = membership.NewHostInfoFromAddress(testHostName) // NewTest returns a new test resource instance func NewTest( @@ -248,7 +248,7 @@ func (t *Test) GetHostName() string { } // GetHostInfo for testing -func (t *Test) GetHostInfo() *membership.HostInfo { +func (t *Test) GetHostInfo() membership.HostInfo { return testHostInfo } diff --git a/service/frontend/adminHandler_test.go b/service/frontend/adminHandler_test.go index 1a730074902..8b8235d382a 100644 --- a/service/frontend/adminHandler_test.go +++ b/service/frontend/adminHandler_test.go @@ -1152,15 +1152,15 @@ func (s *adminHandlerSuite) Test_AddOrUpdateRemoteCluster_SaveClusterMetadata_No func (s *adminHandlerSuite) Test_DescribeCluster_CurrentCluster_Success() { var clusterId = uuid.New() clusterName := s.mockMetadata.GetCurrentClusterName() - s.mockResource.MembershipMonitor.EXPECT().WhoAmI().Return(&membership.HostInfo{}, nil) + s.mockResource.MembershipMonitor.EXPECT().WhoAmI().Return(membership.NewHostInfoFromAddress("test"), nil) s.mockResource.MembershipMonitor.EXPECT().GetReachableMembers().Return(nil, nil) - s.mockResource.HistoryServiceResolver.EXPECT().Members().Return([]*membership.HostInfo{}) + s.mockResource.HistoryServiceResolver.EXPECT().Members().Return([]membership.HostInfo{}) s.mockResource.HistoryServiceResolver.EXPECT().MemberCount().Return(0) - s.mockResource.FrontendServiceResolver.EXPECT().Members().Return([]*membership.HostInfo{}) + s.mockResource.FrontendServiceResolver.EXPECT().Members().Return([]membership.HostInfo{}) s.mockResource.FrontendServiceResolver.EXPECT().MemberCount().Return(0) - s.mockResource.MatchingServiceResolver.EXPECT().Members().Return([]*membership.HostInfo{}) + s.mockResource.MatchingServiceResolver.EXPECT().Members().Return([]membership.HostInfo{}) s.mockResource.MatchingServiceResolver.EXPECT().MemberCount().Return(0) - s.mockResource.WorkerServiceResolver.EXPECT().Members().Return([]*membership.HostInfo{}) + s.mockResource.WorkerServiceResolver.EXPECT().Members().Return([]membership.HostInfo{}) s.mockResource.WorkerServiceResolver.EXPECT().MemberCount().Return(0) s.mockResource.ExecutionMgr.EXPECT().GetName().Return("") s.mockVisibilityMgr.EXPECT().GetStoreNames().Return([]string{elasticsearch.PersistenceName}) @@ -1191,15 +1191,15 @@ func (s *adminHandlerSuite) Test_DescribeCluster_NonCurrentCluster_Success() { var clusterName = uuid.New() var clusterId = uuid.New() - s.mockResource.MembershipMonitor.EXPECT().WhoAmI().Return(&membership.HostInfo{}, nil) + s.mockResource.MembershipMonitor.EXPECT().WhoAmI().Return(membership.NewHostInfoFromAddress("test"), nil) s.mockResource.MembershipMonitor.EXPECT().GetReachableMembers().Return(nil, nil) - s.mockResource.HistoryServiceResolver.EXPECT().Members().Return([]*membership.HostInfo{}) + s.mockResource.HistoryServiceResolver.EXPECT().Members().Return([]membership.HostInfo{}) s.mockResource.HistoryServiceResolver.EXPECT().MemberCount().Return(0) - s.mockResource.FrontendServiceResolver.EXPECT().Members().Return([]*membership.HostInfo{}) + s.mockResource.FrontendServiceResolver.EXPECT().Members().Return([]membership.HostInfo{}) s.mockResource.FrontendServiceResolver.EXPECT().MemberCount().Return(0) - s.mockResource.MatchingServiceResolver.EXPECT().Members().Return([]*membership.HostInfo{}) + s.mockResource.MatchingServiceResolver.EXPECT().Members().Return([]membership.HostInfo{}) s.mockResource.MatchingServiceResolver.EXPECT().MemberCount().Return(0) - s.mockResource.WorkerServiceResolver.EXPECT().Members().Return([]*membership.HostInfo{}) + s.mockResource.WorkerServiceResolver.EXPECT().Members().Return([]membership.HostInfo{}) s.mockResource.WorkerServiceResolver.EXPECT().MemberCount().Return(0) s.mockResource.ExecutionMgr.EXPECT().GetName().Return("") s.mockVisibilityMgr.EXPECT().GetStoreNames().Return([]string{elasticsearch.PersistenceName}) diff --git a/service/history/shard/controller_test.go b/service/history/shard/controller_test.go index b9d340d1943..fbed9839105 100644 --- a/service/history/shard/controller_test.go +++ b/service/history/shard/controller_test.go @@ -69,7 +69,7 @@ type ( mockClusterMetadata *cluster.MockMetadata mockServiceResolver *membership.MockServiceResolver - hostInfo *membership.HostInfo + hostInfo membership.HostInfo mockShardManager *persistence.MockShardManager mockEngineFactory *MockEngineFactory @@ -196,7 +196,7 @@ func (s *controllerSuite) TestAcquireShardSuccess() { }).Return(nil).AnyTimes() } else { ownerHost := fmt.Sprintf("test-acquire-shard-host-%v", hostID) - s.mockServiceResolver.EXPECT().Lookup(convert.Int32ToString(shardID)).Return(membership.NewHostInfo(ownerHost, nil), nil) + s.mockServiceResolver.EXPECT().Lookup(convert.Int32ToString(shardID)).Return(membership.NewHostInfoFromAddress(ownerHost), nil) } } @@ -266,7 +266,9 @@ func (s *controllerSuite) TestAcquireShardsConcurrently() { }).Return(nil).AnyTimes() } else { ownerHost := fmt.Sprintf("test-acquire-shard-host-%v", hostID) - s.mockServiceResolver.EXPECT().Lookup(convert.Int32ToString(shardID)).Return(membership.NewHostInfo(ownerHost, nil), nil) + s.mockServiceResolver.EXPECT().Lookup(convert.Int32ToString(shardID)).Return( + membership.NewHostInfoFromAddress(ownerHost), nil, + ) } } @@ -476,7 +478,7 @@ func (s *controllerSuite) TestHistoryEngineClosed() { workerWG.Wait() - differentHostInfo := membership.NewHostInfo("another-host", nil) + differentHostInfo := membership.NewHostInfoFromAddress("another-host") for shardID := int32(1); shardID <= 2; shardID++ { mockEngine := historyEngines[shardID] mockEngine.EXPECT().Stop().Return() diff --git a/service/worker/pernamespaceworker.go b/service/worker/pernamespaceworker.go index ae06b7edd26..de74cba0938 100644 --- a/service/worker/pernamespaceworker.go +++ b/service/worker/pernamespaceworker.go @@ -80,7 +80,7 @@ type ( logger log.Logger sdkClientFactory sdk.ClientFactory namespaceRegistry namespace.Registry - self *membership.HostInfo + self membership.HostInfo hostName resource.HostName config *Config serviceResolver membership.ServiceResolver @@ -145,7 +145,7 @@ func (wm *perNamespaceWorkerManager) Running() bool { } func (wm *perNamespaceWorkerManager) Start( - self *membership.HostInfo, + self membership.HostInfo, serviceResolver membership.ServiceResolver, ) { if !atomic.CompareAndSwapInt32( diff --git a/service/worker/pernamespaceworker_test.go b/service/worker/pernamespaceworker_test.go index 61a71365e97..3dff4bf42bc 100644 --- a/service/worker/pernamespaceworker_test.go +++ b/service/worker/pernamespaceworker_test.go @@ -55,7 +55,7 @@ type perNsWorkerManagerSuite struct { logger log.Logger cfactory *sdk.MockClientFactory registry *namespace.MockRegistry - hostInfo *membership.HostInfo + hostInfo membership.HostInfo serviceResolver *membership.MockServiceResolver cmp1 *workercommon.MockPerNSWorkerComponent @@ -74,7 +74,7 @@ func (s *perNsWorkerManagerSuite) SetupTest() { s.logger = log.NewTestLogger() s.cfactory = sdk.NewMockClientFactory(s.controller) s.registry = namespace.NewMockRegistry(s.controller) - s.hostInfo = membership.NewHostInfo("self", nil) + s.hostInfo = membership.NewHostInfoFromAddress("self") s.serviceResolver = membership.NewMockServiceResolver(s.controller) s.cmp1 = workercommon.NewMockPerNSWorkerComponent(s.controller) s.cmp2 = workercommon.NewMockPerNSWorkerComponent(s.controller) @@ -160,7 +160,7 @@ func (s *perNsWorkerManagerSuite) TestEnabledButResolvedToOther() { Enabled: false, }).AnyTimes() - s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfo("other1", nil), nil) + s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("other1"), nil) s.manager.namespaceCallback(ns, false) // main work happens in a goroutine @@ -177,7 +177,7 @@ func (s *perNsWorkerManagerSuite) TestEnabled() { Enabled: false, }).AnyTimes() - s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfo("self", nil), nil) + s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("self"), nil) cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1) wkr1 := mocksdk.NewMockWorker(s.controller) @@ -203,9 +203,9 @@ func (s *perNsWorkerManagerSuite) TestMultiplicity() { Enabled: false, }).AnyTimes() - s.serviceResolver.EXPECT().Lookup("ns3/0").Return(membership.NewHostInfo("self", nil), nil) - s.serviceResolver.EXPECT().Lookup("ns3/1").Return(membership.NewHostInfo("other", nil), nil) - s.serviceResolver.EXPECT().Lookup("ns3/2").Return(membership.NewHostInfo("self", nil), nil) + s.serviceResolver.EXPECT().Lookup("ns3/0").Return(membership.NewHostInfoFromAddress("self"), nil) + s.serviceResolver.EXPECT().Lookup("ns3/1").Return(membership.NewHostInfoFromAddress("other"), nil) + s.serviceResolver.EXPECT().Lookup("ns3/2").Return(membership.NewHostInfoFromAddress("self"), nil) cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns3")).Return(cli1) wkr1 := mocksdk.NewMockWorker(s.controller) @@ -235,7 +235,7 @@ func (s *perNsWorkerManagerSuite) TestOptions() { Enabled: false, }).AnyTimes() - s.serviceResolver.EXPECT().Lookup(gomock.Any()).Return(membership.NewHostInfo("self", nil), nil).AnyTimes() + s.serviceResolver.EXPECT().Lookup(gomock.Any()).Return(membership.NewHostInfoFromAddress("self"), nil).AnyTimes() cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1) cli2 := mocksdk.NewMockClient(s.controller) @@ -286,9 +286,9 @@ func (s *perNsWorkerManagerSuite) TestTwoNamespacesTwoComponents() { return &workercommon.PerNSDedicatedWorkerOptions{Enabled: ns.Name().String() == "ns1"} }).AnyTimes() - s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfo("self", nil), nil) - s.serviceResolver.EXPECT().Lookup("ns2/0").Return(membership.NewHostInfo("self", nil), nil) - s.serviceResolver.EXPECT().Lookup("ns2/1").Return(membership.NewHostInfo("self", nil), nil) + s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("self"), nil) + s.serviceResolver.EXPECT().Lookup("ns2/0").Return(membership.NewHostInfoFromAddress("self"), nil) + s.serviceResolver.EXPECT().Lookup("ns2/1").Return(membership.NewHostInfoFromAddress("self"), nil) cli1 := mocksdk.NewMockClient(s.controller) cli2 := mocksdk.NewMockClient(s.controller) @@ -329,7 +329,7 @@ func (s *perNsWorkerManagerSuite) TestDeleteNs() { Enabled: false, }).AnyTimes() - s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfo("self", nil), nil) + s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("self"), nil) cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1) wkr1 := mocksdk.NewMockWorker(s.controller) @@ -349,7 +349,7 @@ func (s *perNsWorkerManagerSuite) TestDeleteNs() { // restore it nsRestored := testns("ns1", enumspb.NAMESPACE_STATE_REGISTERED) - s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfo("self", nil), nil) + s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("self"), nil) cli2 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli2) wkr2 := mocksdk.NewMockWorker(s.controller) @@ -378,13 +378,13 @@ func (s *perNsWorkerManagerSuite) TestMembershipChanged() { }).AnyTimes() // we don't own it at first - s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfo("other", nil), nil) + s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("other"), nil) s.manager.namespaceCallback(ns, false) time.Sleep(50 * time.Millisecond) // now we own it - s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfo("self", nil), nil) + s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("self"), nil) cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1) wkr1 := mocksdk.NewMockWorker(s.controller) @@ -396,7 +396,7 @@ func (s *perNsWorkerManagerSuite) TestMembershipChanged() { time.Sleep(50 * time.Millisecond) // now we don't own it anymore - s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfo("other", nil), nil) + s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("other"), nil) wkr1.EXPECT().Stop() cli1.EXPECT().Close() @@ -416,7 +416,7 @@ func (s *perNsWorkerManagerSuite) TestServiceResolverError() { s.serviceResolver.EXPECT().Lookup("ns1/0").Return(nil, errors.New("resolver error")) s.serviceResolver.EXPECT().Lookup("ns1/0").Return(nil, errors.New("resolver error again")) - s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfo("self", nil), nil) + s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("self"), nil) cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1) @@ -443,7 +443,7 @@ func (s *perNsWorkerManagerSuite) TestStartWorkerError() { Enabled: false, }).AnyTimes() - s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfo("self", nil), nil).AnyTimes() + s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("self"), nil).AnyTimes() cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1) diff --git a/service/worker/replicator/namespace_replication_message_processor.go b/service/worker/replicator/namespace_replication_message_processor.go index 1b7ab8aeea4..ecb42cfdfee 100644 --- a/service/worker/replicator/namespace_replication_message_processor.go +++ b/service/worker/replicator/namespace_replication_message_processor.go @@ -61,7 +61,7 @@ func newNamespaceReplicationMessageProcessor( remotePeer adminservice.AdminServiceClient, metricsHandler metrics.Handler, taskExecutor namespace.ReplicationTaskExecutor, - hostInfo *membership.HostInfo, + hostInfo membership.HostInfo, serviceResolver membership.ServiceResolver, namespaceReplicationQueue persistence.NamespaceReplicationQueue, ) *namespaceReplicationMessageProcessor { @@ -89,7 +89,7 @@ func newNamespaceReplicationMessageProcessor( type ( namespaceReplicationMessageProcessor struct { - hostInfo *membership.HostInfo + hostInfo membership.HostInfo serviceResolver membership.ServiceResolver status int32 currentCluster string diff --git a/service/worker/replicator/replicator.go b/service/worker/replicator/replicator.go index 490fba2e8cd..0da07a494ab 100644 --- a/service/worker/replicator/replicator.go +++ b/service/worker/replicator/replicator.go @@ -49,7 +49,7 @@ type ( clientBean client.Bean logger log.Logger metricsHandler metrics.Handler - hostInfo *membership.HostInfo + hostInfo membership.HostInfo serviceResolver membership.ServiceResolver namespaceReplicationQueue persistence.NamespaceReplicationQueue @@ -68,7 +68,7 @@ func NewReplicator( clientBean client.Bean, logger log.Logger, metricsHandler metrics.Handler, - hostInfo *membership.HostInfo, + hostInfo membership.HostInfo, serviceResolver membership.ServiceResolver, namespaceReplicationQueue persistence.NamespaceReplicationQueue, namespaceReplicationTaskExecutor namespace.ReplicationTaskExecutor, diff --git a/service/worker/service.go b/service/worker/service.go index efee4a541c2..afad2580ecb 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -69,7 +69,7 @@ type ( clientBean client.Bean clusterMetadataManager persistence.ClusterMetadataManager metadataManager persistence.MetadataManager - hostInfo *membership.HostInfo + hostInfo membership.HostInfo executionManager persistence.ExecutionManager taskManager persistence.TaskManager historyClient historyservice.HistoryServiceClient diff --git a/tests/simpleMonitor.go b/tests/simpleMonitor.go index b5868c54665..542769be97d 100644 --- a/tests/simpleMonitor.go +++ b/tests/simpleMonitor.go @@ -33,7 +33,7 @@ import ( ) type simpleMonitor struct { - hostInfo *membership.HostInfo + hostInfo membership.HostInfo resolvers map[primitives.ServiceName]membership.ServiceResolver } @@ -44,7 +44,7 @@ func newSimpleMonitor(serviceName primitives.ServiceName, hosts map[primitives.S resolvers[service] = newSimpleResolver(service, hostList) } - hostInfo := membership.NewHostInfo(hosts[serviceName][0], map[string]string{membership.RoleKey: string(serviceName)}) + hostInfo := membership.NewHostInfoFromAddress(hosts[serviceName][0]) return &simpleMonitor{hostInfo, resolvers} } @@ -58,7 +58,7 @@ func (s *simpleMonitor) EvictSelf() error { return nil } -func (s *simpleMonitor) WhoAmI() (*membership.HostInfo, error) { +func (s *simpleMonitor) WhoAmI() (membership.HostInfo, error) { return s.hostInfo, nil } @@ -70,7 +70,7 @@ func (s *simpleMonitor) GetResolver(service primitives.ServiceName) (membership. return resolver, nil } -func (s *simpleMonitor) Lookup(service primitives.ServiceName, key string) (*membership.HostInfo, error) { +func (s *simpleMonitor) Lookup(service primitives.ServiceName, key string) (membership.HostInfo, error) { resolver, ok := s.resolvers[service] if !ok { return nil, fmt.Errorf("cannot lookup host for service %v", service) diff --git a/tests/simpleServiceResolver.go b/tests/simpleServiceResolver.go index f6a1546e6e2..b8931da6db7 100644 --- a/tests/simpleServiceResolver.go +++ b/tests/simpleServiceResolver.go @@ -32,21 +32,21 @@ import ( ) type simpleResolver struct { - hosts []*membership.HostInfo + hosts []membership.HostInfo hashfunc func([]byte) uint32 } // newSimpleResolver returns a service resolver that maintains static mapping // between services and host info func newSimpleResolver(service primitives.ServiceName, hosts []string) membership.ServiceResolver { - hostInfos := make([]*membership.HostInfo, 0, len(hosts)) + hostInfos := make([]membership.HostInfo, 0, len(hosts)) for _, host := range hosts { - hostInfos = append(hostInfos, membership.NewHostInfo(host, map[string]string{membership.RoleKey: string(service)})) + hostInfos = append(hostInfos, membership.NewHostInfoFromAddress(host)) } return &simpleResolver{hostInfos, farm.Fingerprint32} } -func (s *simpleResolver) Lookup(key string) (*membership.HostInfo, error) { +func (s *simpleResolver) Lookup(key string) (membership.HostInfo, error) { hash := int(s.hashfunc([]byte(key))) idx := hash % len(s.hosts) return s.hosts[idx], nil @@ -64,7 +64,7 @@ func (s *simpleResolver) MemberCount() int { return len(s.hosts) } -func (s *simpleResolver) Members() []*membership.HostInfo { +func (s *simpleResolver) Members() []membership.HostInfo { return s.hosts }