Concurrent shard loading #2888

Merged
merged 1 commit on Dec 6, 2019
3 changes: 3 additions & 0 deletions common/service/dynamicconfig/constants.go
@@ -135,6 +135,7 @@ var keys = map[Key]string{
EventsCacheMaxSize: "history.eventsCacheMaxSize",
EventsCacheTTL: "history.eventsCacheTTL",
AcquireShardInterval: "history.acquireShardInterval",
AcquireShardConcurrency: "history.acquireShardConcurrency",
StandbyClusterDelay: "history.standbyClusterDelay",
StandbyTaskMissingEventsResendDelay: "history.standbyTaskMissingEventsResendDelay",
StandbyTaskMissingEventsDiscardDelay: "history.standbyTaskMissingEventsDiscardDelay",
@@ -410,6 +411,8 @@ const (
EventsCacheTTL
// AcquireShardInterval is interval that timer used to acquire shard
AcquireShardInterval
// AcquireShardConcurrency is number of goroutines that can be used to acquire shards in the shard controller.
AcquireShardConcurrency
// StandbyClusterDelay is the artificial delay added to standby cluster's view of active cluster's time
StandbyClusterDelay
// StandbyTaskMissingEventsResendDelay is the amount of time standby cluster's will wait (if events are missing)
6 changes: 4 additions & 2 deletions service/history/service.go
@@ -67,8 +67,9 @@ type Config struct {
EventsCacheTTL dynamicconfig.DurationPropertyFn

// ShardController settings
-RangeSizeBits        uint
-AcquireShardInterval dynamicconfig.DurationPropertyFn
+RangeSizeBits           uint
+AcquireShardInterval    dynamicconfig.DurationPropertyFn
+AcquireShardConcurrency dynamicconfig.IntPropertyFn

// the artificial delay added to standby cluster's view of active cluster's time
StandbyClusterDelay dynamicconfig.DurationPropertyFn
@@ -217,6 +218,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int, storeType strin
EventsCacheTTL: dc.GetDurationProperty(dynamicconfig.EventsCacheTTL, time.Hour),
RangeSizeBits: 20, // 20 bits for sequencer, 2^20 sequence number for any range
AcquireShardInterval: dc.GetDurationProperty(dynamicconfig.AcquireShardInterval, time.Minute),
AcquireShardConcurrency: dc.GetIntProperty(dynamicconfig.AcquireShardConcurrency, 1),
StandbyClusterDelay: dc.GetDurationProperty(dynamicconfig.StandbyClusterDelay, 5*time.Minute),
StandbyTaskMissingEventsResendDelay: dc.GetDurationProperty(dynamicconfig.StandbyTaskMissingEventsResendDelay, 15*time.Minute),
StandbyTaskMissingEventsDiscardDelay: dc.GetDurationProperty(dynamicconfig.StandbyTaskMissingEventsDiscardDelay, 25*time.Minute),
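
The new setting follows the existing dynamic config pattern shown above: NewConfig asks the collection for an IntPropertyFn with a default of 1, and callers invoke that function each time they need the value, so an operator override under the key "history.acquireShardConcurrency" can be picked up at runtime. Below is a minimal, self-contained sketch of that pattern; the types and the overrides map are simplified stand-ins for the real dynamicconfig package, not Cadence's actual implementation.

package main

import "fmt"

// IntPropertyFn mirrors the shape of dynamicconfig.IntPropertyFn:
// a function that returns the current value of a setting.
type IntPropertyFn func() int

// collection is a stand-in for dynamicconfig.Collection, backed by a
// simple map of overrides keyed by the setting name.
type collection struct {
    overrides map[string]int
}

// GetIntProperty returns a function that reads the latest override for
// the key, falling back to the supplied default when none is set.
func (c *collection) GetIntProperty(key string, defaultValue int) IntPropertyFn {
    return func() int {
        if v, ok := c.overrides[key]; ok {
            return v
        }
        return defaultValue
    }
}

func main() {
    dc := &collection{overrides: map[string]int{}}

    // Wiring as in NewConfig: default concurrency of 1.
    acquireShardConcurrency := dc.GetIntProperty("history.acquireShardConcurrency", 1)
    fmt.Println(acquireShardConcurrency()) // 1 (default)

    // An override is visible on the next call, without re-wiring the config.
    dc.overrides["history.acquireShardConcurrency"] = 10
    fmt.Println(acquireShardConcurrency()) // 10
}
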
48 changes: 31 additions & 17 deletions service/history/shardController.go
@@ -295,25 +295,39 @@ func (c *shardController) acquireShards() {
sw := c.metricsScope.StartTimer(metrics.AcquireShardsLatency)
defer sw.Stop()

-AcquireLoop:
-    for shardID := 0; shardID < c.config.NumberOfShards; shardID++ {
-        info, err := c.GetHistoryServiceResolver().Lookup(string(shardID))
-        if err != nil {
-            c.logger.Error("Error looking up host for shardID", tag.Error(err), tag.OperationFailed, tag.ShardID(shardID))
-            continue AcquireLoop
-        }
-
-        if info.Identity() == c.GetHostInfo().Identity() {
-            _, err1 := c.getEngineForShard(shardID)
-            if err1 != nil {
-                c.metricsScope.IncCounter(metrics.GetEngineForShardErrorCounter)
-                c.logger.Error("Unable to create history shard engine", tag.Error(err1), tag.OperationFailed, tag.ShardID(shardID))
-                continue AcquireLoop
-            }
-        } else {
-            c.removeEngineForShard(shardID)
-        }
-    }
+    concurrency := common.MaxInt(c.config.AcquireShardConcurrency(), 1)
+    shardActionCh := make(chan int, concurrency)
+    var wg sync.WaitGroup
+    wg.Add(concurrency)
+    // Spawn workers that would lookup and add/remove shards concurrently.
+    for i := 0; i < concurrency; i++ {
+        go func() {
+            defer wg.Done()
+            for shardID := range shardActionCh {
+                info, err := c.GetHistoryServiceResolver().Lookup(string(shardID))
+                if err != nil {
+                    c.logger.Error("Error looking up host for shardID", tag.Error(err), tag.OperationFailed, tag.ShardID(shardID))
+                } else {
+                    if info.Identity() == c.GetHostInfo().Identity() {
+                        _, err1 := c.getEngineForShard(shardID)
+                        if err1 != nil {
+                            c.metricsScope.IncCounter(metrics.GetEngineForShardErrorCounter)
+                            c.logger.Error("Unable to create history shard engine", tag.Error(err1), tag.OperationFailed, tag.ShardID(shardID))
+                        }
+                    } else {
+                        c.removeEngineForShard(shardID)
+                    }
+                }
+            }
+        }()
+    }
+    // Submit tasks to the channel.
+    for shardID := 0; shardID < c.config.NumberOfShards; shardID++ {
+        shardActionCh <- shardID
+    }
+    close(shardActionCh)
+    // Wait until all shards are processed.
+    wg.Wait()

c.metricsScope.UpdateGauge(metrics.NumShardsGauge, float64(c.numShards()))
}
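
The rewritten acquireShards uses a standard bounded worker pool: a buffered channel of shard IDs fans work out to "concurrency" goroutines, the producer closes the channel once every ID has been submitted, and a WaitGroup blocks until each worker's range loop drains. The common.MaxInt(..., 1) guard keeps at least one worker alive, since a zero or negative dynamic config value would otherwise leave nobody to drain the channel and the submit loop would block forever. Below is a stripped-down, self-contained sketch of the same pattern; processShards and the generic process callback are hypothetical names standing in for the shard lookup logic.

package main

import (
    "fmt"
    "sync"
)

// processShards fans shard IDs out to a fixed number of worker goroutines
// and blocks until every shard has been handled.
func processShards(numShards, concurrency int, process func(shardID int)) {
    if concurrency < 1 {
        concurrency = 1 // mirror the MaxInt guard: always keep at least one worker
    }
    shardCh := make(chan int, concurrency)
    var wg sync.WaitGroup
    wg.Add(concurrency)
    for i := 0; i < concurrency; i++ {
        go func() {
            defer wg.Done()
            for shardID := range shardCh {
                process(shardID)
            }
        }()
    }
    for shardID := 0; shardID < numShards; shardID++ {
        shardCh <- shardID
    }
    close(shardCh) // lets the workers' range loops terminate
    wg.Wait()
}

func main() {
    var mu sync.Mutex
    handled := 0
    processShards(10, 4, func(shardID int) {
        mu.Lock()
        handled++
        mu.Unlock()
    })
    fmt.Println("handled", handled, "shards") // handled 10 shards
}

As in the PR, per-shard errors would be logged inside the callback rather than aborting the whole pass, preserving the old loop's continue-on-error behavior.
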
83 changes: 83 additions & 0 deletions service/history/shardController_test.go
@@ -23,6 +23,7 @@ package history
import (
"errors"
"fmt"
"github.com/uber/cadence/common/service/dynamicconfig"
"sync"
"testing"
"time"
@@ -172,6 +173,88 @@ func (s *shardControllerSuite) TestAcquireShardSuccess() {
s.Equal(3, count)
}

func (s *shardControllerSuite) TestAcquireShardsConcurrently() {
numShards := 10
s.config.NumberOfShards = numShards
s.config.AcquireShardConcurrency = func(opts ...dynamicconfig.FilterOption) int {
return 10
}

replicationAck := int64(201)
currentClusterTransferAck := int64(210)
alternativeClusterTransferAck := int64(320)
currentClusterTimerAck := time.Now().Add(-100 * time.Second)
alternativeClusterTimerAck := time.Now().Add(-200 * time.Second)

var myShards []int
for shardID := 0; shardID < numShards; shardID++ {
hostID := shardID % 4
if hostID == 0 {
myShards = append(myShards, shardID)
s.mockHistoryEngine.EXPECT().Start().Return().Times(1)
s.mockServiceResolver.EXPECT().Lookup(string(shardID)).Return(s.hostInfo, nil).Times(2)
s.mockEngineFactory.On("CreateEngine", mock.Anything).Return(s.mockHistoryEngine).Once()
s.mockShardManager.On("GetShard", &persistence.GetShardRequest{ShardID: shardID}).Return(
&persistence.GetShardResponse{
ShardInfo: &persistence.ShardInfo{
ShardID: shardID,
Owner: s.hostInfo.Identity(),
RangeID: 5,
ReplicationAckLevel: replicationAck,
TransferAckLevel: currentClusterTransferAck,
TimerAckLevel: currentClusterTimerAck,
ClusterTransferAckLevel: map[string]int64{
cluster.TestCurrentClusterName: currentClusterTransferAck,
cluster.TestAlternativeClusterName: alternativeClusterTransferAck,
},
ClusterTimerAckLevel: map[string]time.Time{
cluster.TestCurrentClusterName: currentClusterTimerAck,
cluster.TestAlternativeClusterName: alternativeClusterTimerAck,
},
ClusterReplicationLevel: map[string]int64{},
},
}, nil).Once()
s.mockShardManager.On("UpdateShard", &persistence.UpdateShardRequest{
ShardInfo: &persistence.ShardInfo{
ShardID: shardID,
Owner: s.hostInfo.Identity(),
RangeID: 6,
StolenSinceRenew: 1,
ReplicationAckLevel: replicationAck,
TransferAckLevel: currentClusterTransferAck,
TimerAckLevel: currentClusterTimerAck,
ClusterTransferAckLevel: map[string]int64{
cluster.TestCurrentClusterName: currentClusterTransferAck,
cluster.TestAlternativeClusterName: alternativeClusterTransferAck,
},
ClusterTimerAckLevel: map[string]time.Time{
cluster.TestCurrentClusterName: currentClusterTimerAck,
cluster.TestAlternativeClusterName: alternativeClusterTimerAck,
},
TransferFailoverLevels: map[string]persistence.TransferFailoverLevel{},
TimerFailoverLevels: map[string]persistence.TimerFailoverLevel{},
ClusterReplicationLevel: map[string]int64{},
},
PreviousRangeID: 5,
}).Return(nil).Once()
} else {
ownerHost := fmt.Sprintf("test-acquire-shard-host-%v", hostID)
s.mockServiceResolver.EXPECT().Lookup(string(shardID)).Return(membership.NewHostInfo(ownerHost, nil), nil).Times(1)
}
}

// when shard is initialized, it will use the 2 mock function below to initialize the "current" time of each cluster
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestSingleDCClusterInfo).AnyTimes()
s.shardController.acquireShards()
count := 0
for _, shardID := range myShards {
s.NotNil(s.shardController.getEngineForShard(shardID))
count++
}
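// shardIDs 0, 4, and 8 satisfy shardID%4 == 0 and are owned by this host, so exactly 3 engines are expected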
s.Equal(3, count)
}

func (s *shardControllerSuite) TestAcquireShardLookupFailure() {
numShards := 2
s.config.NumberOfShards = numShards