Use concurrent loading to acquire shards:
- added a test for concurrent shard loading
- refactored the code to use a worker pool instead of a channel
- added comments
- set the size of wg and shardActionCh to the concurrency level instead of the number of shards
- spawned workers before writing data to the channel (see the standalone sketch after the file summary below)
Vitaly Arbuzov committed Dec 6, 2019
1 parent a76b957 commit 77a3d9a
Showing 4 changed files with 121 additions and 19 deletions.
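The worker-pool pattern this commit adopts, as a standalone, runnable sketch. Names such as numShards, the worker body, and the printed output are illustrative stand-ins, not the shardController code itself, which appears in the service/history/shardController.go diff below.

package main

import (
	"fmt"
	"sync"
)

func main() {
	const numShards = 16
	const concurrency = 4 // bounded worker pool instead of one goroutine per shard

	shardActionCh := make(chan int, concurrency) // buffer sized to the concurrency, not to numShards
	var wg sync.WaitGroup
	wg.Add(concurrency) // one Done per worker, so Add matches the worker count

	// Spawn the workers before submitting any work; the submit loop below would
	// otherwise block as soon as the small buffer fills up.
	for i := 0; i < concurrency; i++ {
		go func() {
			defer wg.Done()
			for shardID := range shardActionCh {
				// Stand-in for the per-shard work: in the real code this is the
				// resolver lookup plus getEngineForShard / removeEngineForShard.
				fmt.Printf("processed shard %d\n", shardID)
			}
		}()
	}

	// Submit every shard ID, then close the channel so the workers' range loops exit.
	for shardID := 0; shardID < numShards; shardID++ {
		shardActionCh <- shardID
	}
	close(shardActionCh)
	wg.Wait() // every shard has been handled before the caller moves on
}

Spawning the workers first matters because the channel buffer holds only concurrency items: the producer would stall on a full buffer until the workers start draining it. Closing the channel after submission ends each worker's range loop, which in turn lets wg.Wait() return.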
3 changes: 3 additions & 0 deletions common/service/dynamicconfig/constants.go
@@ -135,6 +135,7 @@ var keys = map[Key]string{
 	EventsCacheMaxSize: "history.eventsCacheMaxSize",
 	EventsCacheTTL: "history.eventsCacheTTL",
 	AcquireShardInterval: "history.acquireShardInterval",
+	AcquireShardConcurrency: "history.acquireShardConcurrency",
 	StandbyClusterDelay: "history.standbyClusterDelay",
 	StandbyTaskMissingEventsResendDelay: "history.standbyTaskMissingEventsResendDelay",
 	StandbyTaskMissingEventsDiscardDelay: "history.standbyTaskMissingEventsDiscardDelay",
@@ -410,6 +411,8 @@ const (
 	EventsCacheTTL
 	// AcquireShardInterval is interval that timer used to acquire shard
 	AcquireShardInterval
+	// AcquireShardConcurrency is number of goroutines that can be used to acquire shards in the shard controller.
+	AcquireShardConcurrency
 	// StandbyClusterDelay is the artificial delay added to standby cluster's view of active cluster's time
 	StandbyClusterDelay
 	// StandbyTaskMissingEventsResendDelay is the amount of time standby cluster's will wait (if events are missing)
6 changes: 4 additions & 2 deletions service/history/service.go
@@ -67,8 +67,9 @@ type Config struct {
 	EventsCacheTTL dynamicconfig.DurationPropertyFn
 
 	// ShardController settings
-	RangeSizeBits        uint
-	AcquireShardInterval dynamicconfig.DurationPropertyFn
+	RangeSizeBits           uint
+	AcquireShardInterval    dynamicconfig.DurationPropertyFn
+	AcquireShardConcurrency dynamicconfig.IntPropertyFn
 
 	// the artificial delay added to standby cluster's view of active cluster's time
 	StandbyClusterDelay dynamicconfig.DurationPropertyFn
@@ -217,6 +218,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int, storeType strin
 		EventsCacheTTL: dc.GetDurationProperty(dynamicconfig.EventsCacheTTL, time.Hour),
 		RangeSizeBits: 20, // 20 bits for sequencer, 2^20 sequence number for any range
 		AcquireShardInterval: dc.GetDurationProperty(dynamicconfig.AcquireShardInterval, time.Minute),
+		AcquireShardConcurrency: dc.GetIntProperty(dynamicconfig.AcquireShardConcurrency, 1),
 		StandbyClusterDelay: dc.GetDurationProperty(dynamicconfig.StandbyClusterDelay, 5*time.Minute),
 		StandbyTaskMissingEventsResendDelay: dc.GetDurationProperty(dynamicconfig.StandbyTaskMissingEventsResendDelay, 15*time.Minute),
 		StandbyTaskMissingEventsDiscardDelay: dc.GetDurationProperty(dynamicconfig.StandbyTaskMissingEventsDiscardDelay, 25*time.Minute),
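For context, a minimal sketch of how the new property is consumed (a hypothetical helper, not part of this diff): GetIntProperty returns a closure (dynamicconfig.IntPropertyFn), so the value is resolved each time it is called rather than once at startup.

// Assumes a *dynamicconfig.Collection as in NewConfig above; the helper name is illustrative.
func currentAcquireShardConcurrency(dc *dynamicconfig.Collection) int {
	fn := dc.GetIntProperty(dynamicconfig.AcquireShardConcurrency, 1) // same default of 1 as NewConfig
	return fn() // re-evaluated on every call, so the setting can change without a restart
}

This is why acquireShards in the next file reads c.config.AcquireShardConcurrency() on every acquisition pass instead of caching the value.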
48 changes: 31 additions & 17 deletions service/history/shardController.go
@@ -295,25 +295,39 @@ func (c *shardController) acquireShards() {
 	sw := c.metricsScope.StartTimer(metrics.AcquireShardsLatency)
 	defer sw.Stop()
 
-AcquireLoop:
-	for shardID := 0; shardID < c.config.NumberOfShards; shardID++ {
-		info, err := c.GetHistoryServiceResolver().Lookup(string(shardID))
-		if err != nil {
-			c.logger.Error("Error looking up host for shardID", tag.Error(err), tag.OperationFailed, tag.ShardID(shardID))
-			continue AcquireLoop
-		}
-
-		if info.Identity() == c.GetHostInfo().Identity() {
-			_, err1 := c.getEngineForShard(shardID)
-			if err1 != nil {
-				c.metricsScope.IncCounter(metrics.GetEngineForShardErrorCounter)
-				c.logger.Error("Unable to create history shard engine", tag.Error(err1), tag.OperationFailed, tag.ShardID(shardID))
-				continue AcquireLoop
+	concurrency := common.MaxInt(c.config.AcquireShardConcurrency(), 1)
+	shardActionCh := make(chan int, concurrency)
+	var wg sync.WaitGroup
+	wg.Add(concurrency)
+	// Spawn workers that would lookup and add/remove shards concurrently.
+	for i := 0; i < concurrency; i++ {
+		go func() {
+			defer wg.Done()
+			for shardID := range shardActionCh {
+				info, err := c.GetHistoryServiceResolver().Lookup(string(shardID))
+				if err != nil {
+					c.logger.Error("Error looking up host for shardID", tag.Error(err), tag.OperationFailed, tag.ShardID(shardID))
+				} else {
+					if info.Identity() == c.GetHostInfo().Identity() {
+						_, err1 := c.getEngineForShard(shardID)
+						if err1 != nil {
+							c.metricsScope.IncCounter(metrics.GetEngineForShardErrorCounter)
+							c.logger.Error("Unable to create history shard engine", tag.Error(err1), tag.OperationFailed, tag.ShardID(shardID))
+						}
+					} else {
+						c.removeEngineForShard(shardID)
+					}
+				}
 			}
-		} else {
-			c.removeEngineForShard(shardID)
-		}
+		}()
 	}
+	// Submit tasks to the channel.
+	for shardID := 0; shardID < c.config.NumberOfShards; shardID++ {
+		shardActionCh <- shardID
+	}
+	close(shardActionCh)
+	// Wait until all shards are processed.
+	wg.Wait()
 
 	c.metricsScope.UpdateGauge(metrics.NumShardsGauge, float64(c.numShards()))
 }
83 changes: 83 additions & 0 deletions service/history/shardController_test.go
@@ -23,6 +23,7 @@ package history
 import (
 	"errors"
 	"fmt"
+	"github.com/uber/cadence/common/service/dynamicconfig"
 	"sync"
 	"testing"
 	"time"
@@ -172,6 +173,88 @@ func (s *shardControllerSuite) TestAcquireShardSuccess() {
 	s.Equal(3, count)
 }
 
+func (s *shardControllerSuite) TestAcquireShardsConcurrently() {
+	numShards := 10
+	s.config.NumberOfShards = numShards
+	s.config.AcquireShardConcurrency = func(opts ...dynamicconfig.FilterOption) int {
+		return 10
+	}
+
+	replicationAck := int64(201)
+	currentClusterTransferAck := int64(210)
+	alternativeClusterTransferAck := int64(320)
+	currentClusterTimerAck := time.Now().Add(-100 * time.Second)
+	alternativeClusterTimerAck := time.Now().Add(-200 * time.Second)
+
+	var myShards []int
+	for shardID := 0; shardID < numShards; shardID++ {
+		hostID := shardID % 4
+		if hostID == 0 {
+			myShards = append(myShards, shardID)
+			s.mockHistoryEngine.EXPECT().Start().Return().Times(1)
+			s.mockServiceResolver.EXPECT().Lookup(string(shardID)).Return(s.hostInfo, nil).Times(2)
+			s.mockEngineFactory.On("CreateEngine", mock.Anything).Return(s.mockHistoryEngine).Once()
+			s.mockShardManager.On("GetShard", &persistence.GetShardRequest{ShardID: shardID}).Return(
+				&persistence.GetShardResponse{
+					ShardInfo: &persistence.ShardInfo{
+						ShardID: shardID,
+						Owner: s.hostInfo.Identity(),
+						RangeID: 5,
+						ReplicationAckLevel: replicationAck,
+						TransferAckLevel: currentClusterTransferAck,
+						TimerAckLevel: currentClusterTimerAck,
+						ClusterTransferAckLevel: map[string]int64{
+							cluster.TestCurrentClusterName: currentClusterTransferAck,
+							cluster.TestAlternativeClusterName: alternativeClusterTransferAck,
+						},
+						ClusterTimerAckLevel: map[string]time.Time{
+							cluster.TestCurrentClusterName: currentClusterTimerAck,
+							cluster.TestAlternativeClusterName: alternativeClusterTimerAck,
+						},
+						ClusterReplicationLevel: map[string]int64{},
+					},
+				}, nil).Once()
+			s.mockShardManager.On("UpdateShard", &persistence.UpdateShardRequest{
+				ShardInfo: &persistence.ShardInfo{
+					ShardID: shardID,
+					Owner: s.hostInfo.Identity(),
+					RangeID: 6,
+					StolenSinceRenew: 1,
+					ReplicationAckLevel: replicationAck,
+					TransferAckLevel: currentClusterTransferAck,
+					TimerAckLevel: currentClusterTimerAck,
+					ClusterTransferAckLevel: map[string]int64{
+						cluster.TestCurrentClusterName: currentClusterTransferAck,
+						cluster.TestAlternativeClusterName: alternativeClusterTransferAck,
+					},
+					ClusterTimerAckLevel: map[string]time.Time{
+						cluster.TestCurrentClusterName: currentClusterTimerAck,
+						cluster.TestAlternativeClusterName: alternativeClusterTimerAck,
+					},
+					TransferFailoverLevels: map[string]persistence.TransferFailoverLevel{},
+					TimerFailoverLevels: map[string]persistence.TimerFailoverLevel{},
+					ClusterReplicationLevel: map[string]int64{},
+				},
+				PreviousRangeID: 5,
+			}).Return(nil).Once()
+		} else {
+			ownerHost := fmt.Sprintf("test-acquire-shard-host-%v", hostID)
+			s.mockServiceResolver.EXPECT().Lookup(string(shardID)).Return(membership.NewHostInfo(ownerHost, nil), nil).Times(1)
+		}
+	}
+
+	// when shard is initialized, it will use the 2 mock function below to initialize the "current" time of each cluster
+	s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
+	s.mockClusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestSingleDCClusterInfo).AnyTimes()
+	s.shardController.acquireShards()
+	count := 0
+	for _, shardID := range myShards {
+		s.NotNil(s.shardController.getEngineForShard(shardID))
+		count++
+	}
+	s.Equal(3, count)
+}
+
 func (s *shardControllerSuite) TestAcquireShardLookupFailure() {
 	numShards := 2
 	s.config.NumberOfShards = numShards
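A note on the new test's assertion: with NumberOfShards set to 10 and hostID computed as shardID % 4, only shards 0, 4, and 8 resolve to the local host, so exactly three engines should exist after acquireShards, which is what s.Equal(3, count) checks. The remaining shards are expected to resolve to one of the other mocked hosts and only incur a resolver Lookup.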
