Commit 2a61173

Merge branch 'master' into ratelimitIntegrationTest

jakobht committed Apr 5, 2024
2 parents 87211d6 + 0d9c014
Showing 49 changed files with 13,064 additions and 434 deletions.
866 changes: 866 additions & 0 deletions client/history/client_test.go

Large diffs are not rendered by default.

24 changes: 17 additions & 7 deletions client/history/peerResolver.go
@@ -29,15 +29,25 @@ import (
// PeerResolver is used to resolve history peers.
// Those are deployed instances of Cadence history services that participate in the cluster ring.
// The resulting peer is simply an address of form ip:port where RPC calls can be routed to.
type PeerResolver struct {
//
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination peerResolver_mock.go -package history github.com/uber/cadence/client/history PeerResolver
type PeerResolver interface {
FromWorkflowID(workflowID string) (string, error)
FromDomainID(domainID string) (string, error)
FromShardID(shardID int) (string, error)
FromHostAddress(hostAddress string) (string, error)
GetAllPeers() ([]string, error)
}

type peerResolver struct {
numberOfShards int
resolver membership.Resolver
namedPort string // grpc or tchannel, depends on yarpc configuration
}

// NewPeerResolver creates a new history peer resolver.
func NewPeerResolver(numberOfShards int, resolver membership.Resolver, namedPort string) PeerResolver {
return PeerResolver{
return peerResolver{
numberOfShards: numberOfShards,
resolver: resolver,
namedPort: namedPort,
@@ -47,23 +57,23 @@ func NewPeerResolver(numberOfShards int, resolver membership.Resolver, namedPort
// FromWorkflowID resolves the history peer responsible for a given workflowID.
// WorkflowID is converted to logical shardID using a consistent hash function.
// FromShardID is used for further resolving.
func (pr PeerResolver) FromWorkflowID(workflowID string) (string, error) {
func (pr peerResolver) FromWorkflowID(workflowID string) (string, error) {
shardID := common.WorkflowIDToHistoryShard(workflowID, pr.numberOfShards)
return pr.FromShardID(shardID)
}

// FromDomainID resolves the history peer responsible for a given domainID.
// DomainID is converted to logical shardID using a consistent hash function.
// FromShardID is used for further resolving.
func (pr PeerResolver) FromDomainID(domainID string) (string, error) {
func (pr peerResolver) FromDomainID(domainID string) (string, error) {
shardID := common.DomainIDToHistoryShard(domainID, pr.numberOfShards)
return pr.FromShardID(shardID)
}

// FromShardID resolves the history peer responsible for a given logical shardID.
// It uses our membership provider to lookup which instance currently owns the given shard.
// FromHostAddress is used for further resolving.
func (pr PeerResolver) FromShardID(shardID int) (string, error) {
func (pr peerResolver) FromShardID(shardID int) (string, error) {
shardIDString := string(rune(shardID))
host, err := pr.resolver.Lookup(service.History, shardIDString)
if err != nil {
@@ -75,7 +85,7 @@ func (pr PeerResolver) FromShardID(shardID int) (string, error) {

// FromHostAddress resolves the final history peer responsible for the given host address.
// The address is formed by adding port for specified transport
func (pr PeerResolver) FromHostAddress(hostAddress string) (string, error) {
func (pr peerResolver) FromHostAddress(hostAddress string) (string, error) {
host, err := pr.resolver.LookupByAddress(service.History, hostAddress)
if err != nil {
return "", common.ToServiceTransientError(err)
@@ -85,7 +95,7 @@ func (pr PeerResolver) FromHostAddress(hostAddress string) (string, error) {
}

// GetAllPeers returns all history service peers in the cluster ring.
func (pr PeerResolver) GetAllPeers() ([]string, error) {
func (pr peerResolver) GetAllPeers() ([]string, error) {
hosts, err := pr.resolver.Members(service.History)
if err != nil {
return nil, common.ToServiceTransientError(err)
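As context for the refactor above, the sketch below walks through the resolution chain the doc comments describe: a workflowID is hashed onto a logical shard, the shard key is looked up in the membership ring, and the owning host's ip:port is the peer. The hashShard and ringLookup helpers are hypothetical stand-ins for common.WorkflowIDToHistoryShard and membership.Resolver, not the real Cadence implementations.

package main

import (
	"fmt"
	"hash/fnv"
)

const numberOfShards = 4

// hashShard plays the role of common.WorkflowIDToHistoryShard: map a
// workflowID deterministically onto one of numberOfShards logical shards.
func hashShard(workflowID string) int {
	h := fnv.New32a()
	h.Write([]byte(workflowID))
	return int(h.Sum32()) % numberOfShards
}

// ringLookup plays the role of membership.Resolver.Lookup: return the host
// that currently owns the given shard key. The static owner table and
// addresses are for illustration only.
func ringLookup(shardKey string) (string, error) {
	owners := []string{"10.0.0.1:7833", "10.0.0.2:7833"}
	h := fnv.New32a()
	h.Write([]byte(shardKey))
	return owners[int(h.Sum32())%len(owners)], nil
}

func main() {
	// FromWorkflowID -> FromShardID -> FromHostAddress, in miniature.
	shardID := hashShard("my-workflow-id")
	peer, err := ringLookup(fmt.Sprintf("%d", shardID))
	if err != nil {
		panic(err)
	}
	fmt.Printf("workflow routed to shard %d on peer %s\n", shardID, peer)
}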
131 changes: 131 additions & 0 deletions client/history/peerResolver_mock.go

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions codecov.yml
@@ -46,6 +46,7 @@ ignore:
- "**/version.go"
- "bench/**"
- "canary/**"
- "cmd/**"
- "common/persistence/persistence-tests/**"
- "common/domain/errors.go"
- "common/log/**"
@@ -57,4 +58,5 @@ ignore:
- "idls/**"
- "service/history/workflow/errors.go"
- "testflags/**"
- "tools/common/schema/test/**"
- "tools/linter/**"
4 changes: 2 additions & 2 deletions common/domain/failover_watcher.go
@@ -119,7 +119,7 @@ func (p *failoverWatcherImpl) Stop() {

func (p *failoverWatcherImpl) refreshDomainLoop() {

timer := time.NewTimer(backoff.JitDuration(
timer := p.timeSource.NewTimer(backoff.JitDuration(
p.refreshInterval(),
p.refreshJitter(),
))
@@ -129,7 +129,7 @@ func (p *failoverWatcherImpl) refreshDomainLoop() {
select {
case <-p.shutdownChan:
return
case <-timer.C:
case <-timer.Chan():
domains := p.domainCache.GetAllDomain()
for _, domain := range domains {
p.handleFailoverTimeout(domain)
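The change above replaces the standard-library timer with one created from an injected time source so the refresh loop can be driven deterministically in tests. A minimal sketch of that pattern follows; the TimeSource and Timer interfaces here are simplified stand-ins for Cadence's clock package, not its actual API.

package main

import (
	"fmt"
	"time"
)

// Timer hides the timer channel behind a method so a fake can supply its own.
type Timer interface {
	Chan() <-chan time.Time
}

// TimeSource abstracts timer creation: production code injects the real
// clock, tests inject a controllable fake.
type TimeSource interface {
	NewTimer(d time.Duration) Timer
}

type realTimer struct{ t *time.Timer }

func (r realTimer) Chan() <-chan time.Time { return r.t.C }

type realTimeSource struct{}

func (realTimeSource) NewTimer(d time.Duration) Timer {
	return realTimer{t: time.NewTimer(d)}
}

// loop mirrors the shape of refreshDomainLoop: select on a shutdown channel
// and on the injected timer's channel.
func loop(ts TimeSource, shutdown <-chan struct{}) {
	timer := ts.NewTimer(10 * time.Millisecond)
	select {
	case <-shutdown:
		return
	case <-timer.Chan():
		fmt.Println("tick: refresh domains")
	}
}

func main() {
	loop(realTimeSource{}, make(chan struct{}))
}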
77 changes: 76 additions & 1 deletion common/domain/failover_watcher_test.go
@@ -21,6 +21,7 @@
package domain

import (
"errors"
"log"
"os"
"testing"
@@ -76,7 +77,7 @@ func (s *failoverWatcherSuite) SetupTest() {
s.controller = gomock.NewController(s.T())

s.mockDomainCache = cache.NewMockDomainCache(s.controller)
s.timeSource = clock.NewRealTimeSource()
s.timeSource = clock.NewMockedTimeSource()
s.mockMetadataMgr = &mocks.MetadataManager{}

s.mockMetadataMgr.On("GetMetadata", mock.Anything).Return(&persistence.GetMetadataResponse{
@@ -243,3 +244,77 @@ func (s *failoverWatcherSuite) TestHandleFailoverTimeout() {
)
s.watcher.handleFailoverTimeout(domainEntry)
}

func (s *failoverWatcherSuite) TestStart() {
s.Assertions.Equal(common.DaemonStatusInitialized, s.watcher.status)
s.watcher.Start()
s.Assertions.Equal(common.DaemonStatusStarted, s.watcher.status)

// Verify that calling Start again does not change the status
s.watcher.Start()
s.Assertions.Equal(common.DaemonStatusStarted, s.watcher.status)
s.watcher.Stop()
}

func (s *failoverWatcherSuite) TestIsUpdateDomainRetryable() {
testCases := []struct {
name string
inputErr error
wantRetry bool
}{
{"nil error", nil, true},
{"non-nil error", errors.New("some error"), true},
}

for _, tc := range testCases {
s.Run(tc.name, func() {
retry := isUpdateDomainRetryable(tc.inputErr)
s.Equal(tc.wantRetry, retry)
})
}
}

func (s *failoverWatcherSuite) TestRefreshDomainLoop() {

domainName := "testDomain"
domainID := uuid.New()
failoverEndTime := common.Int64Ptr(time.Now().Add(-time.Hour).UnixNano()) // 1 hour in the past
mockTimeSource, _ := s.timeSource.(clock.MockedTimeSource)

domainInfo := &persistence.DomainInfo{ID: domainID, Name: domainName}
domainConfig := &persistence.DomainConfig{Retention: 1, EmitMetric: true}
replicationConfig := &persistence.DomainReplicationConfig{ActiveClusterName: "active", Clusters: []*persistence.ClusterReplicationConfig{{ClusterName: "active"}}}
domainEntry := cache.NewDomainCacheEntryForTest(domainInfo, domainConfig, true, replicationConfig, 1, failoverEndTime)

domainsMap := map[string]*cache.DomainCacheEntry{domainID: domainEntry}
s.mockDomainCache.EXPECT().GetAllDomain().Return(domainsMap).AnyTimes()

s.mockMetadataMgr.On("GetMetadata", mock.Anything).Return(&persistence.GetMetadataResponse{NotificationVersion: 1}, nil).Maybe()

s.mockMetadataMgr.On("GetDomain", mock.Anything, mock.AnythingOfType("*persistence.GetDomainRequest")).Return(&persistence.GetDomainResponse{
Info: domainInfo,
Config: domainConfig,
ReplicationConfig: replicationConfig,
IsGlobalDomain: true,
ConfigVersion: 1,
FailoverVersion: 1,
FailoverNotificationVersion: 1,
FailoverEndTime: failoverEndTime,
NotificationVersion: 1,
}, nil).Once()

s.mockMetadataMgr.On("UpdateDomain", mock.Anything, mock.Anything).Return(nil).Once()

s.watcher.Start()

// Delay to allow loop to start
time.Sleep(1 * time.Second)
mockTimeSource.Advance(12 * time.Second)
// Now stop the watcher, which should trigger the shutdown case in refreshDomainLoop
s.watcher.Stop()

// Enough time for shutdown process to complete
time.Sleep(1 * time.Second)

s.mockMetadataMgr.AssertExpectations(s.T())
}
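TestRefreshDomainLoop above drives the loop by advancing a mocked time source. The sketch below shows the general idea behind such a fake: a virtual clock that fires pending timers when Advance moves time past their deadlines. The types are hypothetical and deliberately simplified; Cadence's clock.MockedTimeSource is richer than this.

package main

import (
	"fmt"
	"time"
)

// fakeTimer fires when the fake clock advances past its deadline.
type fakeTimer struct {
	ch chan time.Time
}

func (f *fakeTimer) Chan() <-chan time.Time { return f.ch }

type pendingTimer struct {
	deadline time.Time
	timer    *fakeTimer
}

// fakeTimeSource tracks a virtual "now" and the timers created against it.
type fakeTimeSource struct {
	now     time.Time
	pending []pendingTimer
}

func (f *fakeTimeSource) NewTimer(d time.Duration) *fakeTimer {
	t := &fakeTimer{ch: make(chan time.Time, 1)}
	f.pending = append(f.pending, pendingTimer{deadline: f.now.Add(d), timer: t})
	return t
}

// Advance moves virtual time forward and fires any timer whose deadline
// has passed, without any real sleeping.
func (f *fakeTimeSource) Advance(d time.Duration) {
	f.now = f.now.Add(d)
	for _, p := range f.pending {
		if !p.deadline.After(f.now) {
			select {
			case p.timer.ch <- f.now:
			default: // already fired
			}
		}
	}
}

func main() {
	ts := &fakeTimeSource{now: time.Unix(0, 0)}
	timer := ts.NewTimer(10 * time.Second)

	ts.Advance(12 * time.Second) // analogous to mockTimeSource.Advance(12 * time.Second) above
	fmt.Println("timer fired at virtual time:", <-timer.Chan())
}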
File renamed without changes.