Backoff on shard creation and adjust default number of persistence conns (#876)

Shard creation happens on start of each history service host, so if we
have a large number of history hosts we could overwhelm the cluster.
This change adds backoff to shard creation and also updates the logic
to only create shards when necessary, by doing a get first.
Also adjusted the default number of persistence connections for the
history and execution managers to 50.
samarabbas authored Jun 21, 2018
1 parent 0e397c5 commit 1844352
Showing 1 changed file with 48 additions and 16 deletions.
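The new retry behavior leans on cadence's common/backoff package. As a rough illustration of the policy configured in createAllShards below (50ms initial interval, 1s cap, 5s overall expiration), here is a minimal, self-contained sketch of the same retry pattern. The flaky operation and the always-retryable classifier are stand-ins for shardMgr.GetShard and common.IsPersistenceTransientError, and the backoff calls are assumed to behave as they are used in this diff:

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/uber/cadence/common/backoff"
)

func main() {
	// Same policy shape as createAllShards: start at 50ms between
	// attempts, never wait more than 1s, give up entirely after 5s.
	policy := backoff.NewExponentialRetryPolicy(50 * time.Millisecond)
	policy.SetMaximumInterval(time.Second)
	policy.SetExpirationInterval(5 * time.Second)

	attempts := 0
	op := func() error {
		attempts++
		if attempts < 4 {
			return errors.New("transient failure") // fail the first three attempts
		}
		return nil
	}

	// Treat every error as retryable for this sketch; the real code
	// passes common.IsPersistenceTransientError instead.
	err := backoff.Retry(op, policy, func(err error) bool { return true })
	fmt.Printf("succeeded after %d attempts, err=%v\n", attempts, err)
}

With this policy a host that hits transient persistence errors on startup keeps trying for up to five seconds per shard instead of exiting immediately, which is what the pre-change Fatalf path effectively did.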
service/history/service.go: 48 additions & 16 deletions
@@ -23,7 +23,10 @@ package history
 import (
 	"time"
 
+	"github.com/uber-common/bark"
+	"github.com/uber/cadence/.gen/go/shared"
 	"github.com/uber/cadence/common"
+	"github.com/uber/cadence/common/backoff"
 	"github.com/uber/cadence/common/metrics"
 	"github.com/uber/cadence/common/persistence"
 	"github.com/uber/cadence/common/service"
@@ -134,8 +137,8 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config {
 		ReplicatorProcessorMaxPollInterval:                  dc.GetDurationProperty(dynamicconfig.ReplicatorProcessorMaxPollInterval, 1*time.Minute),
 		ReplicatorProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.ReplicatorProcessorMaxPollIntervalJitterCoefficient, 0.15),
 		ReplicatorProcessorUpdateAckInterval:                dc.GetDurationProperty(dynamicconfig.ReplicatorProcessorUpdateAckInterval, 5*time.Second),
-		ExecutionMgrNumConns:                                dc.GetIntProperty(dynamicconfig.ExecutionMgrNumConns, 100),
-		HistoryMgrNumConns:                                  dc.GetIntProperty(dynamicconfig.HistoryMgrNumConns, 100),
+		ExecutionMgrNumConns:                                dc.GetIntProperty(dynamicconfig.ExecutionMgrNumConns, 50),
+		HistoryMgrNumConns:                                  dc.GetIntProperty(dynamicconfig.HistoryMgrNumConns, 50),
 		MaximumBufferedEventsBatch:                          dc.GetIntProperty(dynamicconfig.MaximumBufferedEventsBatch, 100),
 		ShardUpdateMinInterval:                              dc.GetDurationProperty(dynamicconfig.ShardUpdateMinInterval, 5*time.Minute),
 		// history client: client/history/client.go set the client timeout 30s
@@ -198,20 +201,7 @@ func (s *Service) Start() {
 
 	// Hack to create shards for bootstrap purposes
 	// TODO: properly pre-create all shards before deployment.
-	for shardID := 0; shardID < p.CassandraConfig.NumHistoryShards; shardID++ {
-		err := shardMgr.CreateShard(&persistence.CreateShardRequest{
-			ShardInfo: &persistence.ShardInfo{
-				ShardID:          shardID,
-				RangeID:          0,
-				TransferAckLevel: 0,
-			}})
-
-		if err != nil {
-			if _, ok := err.(*persistence.ShardAlreadyExistError); !ok {
-				log.Fatalf("failed to create shard for ShardId: %v, with error: %v", shardID, err)
-			}
-		}
-	}
+	s.createAllShards(p.CassandraConfig.NumHistoryShards, shardMgr, log)
 
 	metadata, err := persistence.NewMetadataManagerProxy(p.CassandraConfig.Hosts,
 		p.CassandraConfig.Port,
@@ -292,3 +282,45 @@ func (s *Service) Stop() {
 	}
 	s.params.Logger.Infof("%v stopped", common.HistoryServiceName)
 }
+
+func (s *Service) createAllShards(numShards int, shardMgr persistence.ShardManager, log bark.Logger) {
+	policy := backoff.NewExponentialRetryPolicy(50 * time.Millisecond)
+	policy.SetMaximumInterval(time.Second)
+	policy.SetExpirationInterval(5 * time.Second)
+
+	log.Infof("Starting check for shard creation of '%v' shards.", numShards)
+	for shardID := 0; shardID < numShards; shardID++ {
+		getShardOperation := func() error {
+			_, err := shardMgr.GetShard(&persistence.GetShardRequest{
+				ShardID: shardID,
+			})
+
+			return err
+		}
+
+		err := backoff.Retry(getShardOperation, policy, common.IsPersistenceTransientError)
+		if err != nil {
+			if _, ok := err.(*shared.EntityNotExistsError); !ok {
+				log.Fatalf("failed to get shard for ShardId: %v, with error: %v", shardID, err)
+			}
+
+			// Shard not found.  Let's create shard for the very first time
+			createShardOperation := func() error {
+				return shardMgr.CreateShard(&persistence.CreateShardRequest{
+					ShardInfo: &persistence.ShardInfo{
+						ShardID:          shardID,
+						RangeID:          0,
+						TransferAckLevel: 0,
+					}})
+			}
+
+			err := backoff.Retry(createShardOperation, policy, common.IsPersistenceTransientError)
+			if err != nil {
+				if _, ok := err.(*persistence.ShardAlreadyExistError); !ok {
+					log.Fatalf("failed to create shard for ShardId: %v, with error: %v", shardID, err)
+				}
+			}
+		}
+	}
+	log.Infof("All '%v' shards are created.", numShards)
+}
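For intuition on how those three knobs interact: assuming the exponential policy doubles the wait on each attempt (the growth coefficient is not set in this diff, so 2x is an assumption about the package default), the schedule works out to roughly 50ms, 100ms, 200ms, 400ms, 800ms, then a steady 1s once SetMaximumInterval kicks in, until the 5s SetExpirationInterval is exhausted and Retry returns the last error. A tiny standalone sketch of that arithmetic:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Mirrors the policy in createAllShards: 50ms initial, 1s cap, 5s budget.
	// The 2x growth factor is an assumed default, not something this diff sets.
	initial := 50 * time.Millisecond
	maxInterval := time.Second
	budget := 5 * time.Second

	elapsed := time.Duration(0)
	interval := initial
	for attempt := 1; elapsed < budget; attempt++ {
		fmt.Printf("attempt %d: wait %v (elapsed so far %v)\n", attempt, interval, elapsed)
		elapsed += interval
		interval *= 2
		if interval > maxInterval {
			interval = maxInterval
		}
	}
}

So in the worst case each shard costs on the order of ten attempts before the host gives up, which bounds the extra startup load this change can generate.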
