Skip to content

Commit

Permalink
make simple resolver match new start behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
alfred-landrum committed Jul 5, 2023
1 parent 593ed05 commit 9a9a499
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 13 deletions.
16 changes: 10 additions & 6 deletions tests/simple_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,27 @@ import (
)

type simpleMonitor struct {
resolvers map[primitives.ServiceName]membership.ServiceResolver
hosts map[primitives.ServiceName][]string
resolvers map[primitives.ServiceName]*simpleResolver
}

// NewSimpleMonitor returns a simple monitor interface
func newSimpleMonitor(hosts map[primitives.ServiceName][]string) *simpleMonitor {
resolvers := make(map[primitives.ServiceName]membership.ServiceResolver, len(hosts))
resolvers := make(map[primitives.ServiceName]*simpleResolver, len(hosts))
for service, hostList := range hosts {
resolvers[service] = newSimpleResolver(service, hostList)
}

return &simpleMonitor{resolvers}
return &simpleMonitor{
hosts: hosts,
resolvers: resolvers,
}
}

func (s *simpleMonitor) Start() {
}

func (s *simpleMonitor) Stop() {
for service, r := range s.resolvers {
r.start(s.hosts[service])
}
}

func (s *simpleMonitor) EvictSelf() error {
Expand Down
68 changes: 61 additions & 7 deletions tests/simple_service_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,47 +25,101 @@
package tests

import (
"sync"

"github.com/dgryski/go-farm"

"go.temporal.io/server/common/membership"
"go.temporal.io/server/common/primitives"
)

type simpleResolver struct {
hosts []membership.HostInfo
mu sync.Mutex
hostInfos []membership.HostInfo
listeners map[string]chan<- *membership.ChangedEvent

hashfunc func([]byte) uint32
}

// newSimpleResolver returns a service resolver that maintains static mapping
// between services and host info
func newSimpleResolver(service primitives.ServiceName, hosts []string) membership.ServiceResolver {
func newSimpleResolver(service primitives.ServiceName, hosts []string) *simpleResolver {
hostInfos := make([]membership.HostInfo, 0, len(hosts))
for _, host := range hosts {
hostInfos = append(hostInfos, membership.NewHostInfoFromAddress(host))
}
return &simpleResolver{hostInfos, farm.Fingerprint32}
return &simpleResolver{
hostInfos: hostInfos,
hashfunc: farm.Fingerprint32,
listeners: make(map[string]chan<- *membership.ChangedEvent),
}
}

func (s *simpleResolver) start(hosts []string) {
hostInfos := make([]membership.HostInfo, 0, len(hosts))
for _, host := range hosts {
hostInfos = append(hostInfos, membership.NewHostInfoFromAddress(host))
}
event := &membership.ChangedEvent{
HostsAdded: hostInfos,
}

s.mu.Lock()
defer s.mu.Unlock()

s.hostInfos = hostInfos

for _, ch := range s.listeners {
select {
case ch <- event:
default:
}
}
}

func (s *simpleResolver) Lookup(key string) (membership.HostInfo, error) {
s.mu.Lock()
defer s.mu.Unlock()
if len(s.hostInfos) == 0 {
return nil, membership.ErrInsufficientHosts
}
hash := int(s.hashfunc([]byte(key)))
idx := hash % len(s.hosts)
return s.hosts[idx], nil
idx := hash % len(s.hostInfos)
return s.hostInfos[idx], nil
}

func (s *simpleResolver) AddListener(name string, notifyChannel chan<- *membership.ChangedEvent) error {
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.listeners[name]
if ok {
return membership.ErrListenerAlreadyExist
}
s.listeners[name] = notifyChannel
return nil
}

func (s *simpleResolver) RemoveListener(name string) error {
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.listeners[name]
if !ok {
return nil
}
delete(s.listeners, name)
return nil
}

func (s *simpleResolver) MemberCount() int {
return len(s.hosts)
s.mu.Lock()
defer s.mu.Unlock()
return len(s.hostInfos)
}

func (s *simpleResolver) Members() []membership.HostInfo {
return s.hosts
s.mu.Lock()
defer s.mu.Unlock()
return s.hostInfos
}

func (s *simpleResolver) RequestRefresh() {
Expand Down

0 comments on commit 9a9a499

Please sign in to comment.