
Commit

refactor code
Shaddoll committed Nov 22, 2024
1 parent bee3d2b commit 31088f7
Showing 2 changed files with 39 additions and 34 deletions.
2 changes: 1 addition & 1 deletion common/dynamicconfig/constants.go
@@ -4730,7 +4730,7 @@ var DurationKeys = map[DurationKey]DynamicDuration{
KeyName: "matching.partitionDownscaleSustainedDuration",
Filters: []Filter{DomainName, TaskListName, TaskType},
Description: "MatchingPartitionDownscaleSustainedDuration is the sustained period to wait before downscaling the number of partitions",
- DefaultValue: time.Minute,
+ DefaultValue: 2 * time.Minute,
},
MatchingAdaptiveScalerUpdateInterval: {
KeyName: "matching.adaptiveScalerUpdateInterval",
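The default for matching.partitionDownscaleSustainedDuration moves from 1 minute to 2 minutes, so the scaler now waits longer before treating low QPS as a real downscale signal. As a minimal standalone sketch (not code from this commit; loadState and ShouldAct are invented names), the sustained-duration check this value feeds — visible below in the overLoad/underLoad handling of adaptive_scaler.go — boils down to:

```go
package main

import (
	"fmt"
	"time"
)

// loadState tracks how long a condition (e.g. QPS below the downscale
// threshold) has held continuously.
type loadState struct {
	active bool
	since  time.Time
}

// ShouldAct reports whether the condition has now held for longer than the
// sustained duration; it resets once the action fires or the condition clears.
func (s *loadState) ShouldAct(conditionMet bool, now time.Time, sustained time.Duration) bool {
	if !conditionMet {
		s.active = false
		return false
	}
	if !s.active {
		s.active = true
		s.since = now
		return false
	}
	if now.Sub(s.since) > sustained {
		s.active = false
		return true
	}
	return false
}

func main() {
	var st loadState
	start := time.Now()
	// With the new default, low QPS must hold for more than 2 minutes.
	fmt.Println(st.ShouldAct(true, start, 2*time.Minute))                    // false: just started
	fmt.Println(st.ShouldAct(true, start.Add(3*time.Minute), 2*time.Minute)) // true: sustained long enough
}
```

adaptive_scaler.go applies this pattern twice, once with PartitionUpscaleSustainedDuration and once with PartitionDownscaleSustainedDuration.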
71 changes: 38 additions & 33 deletions service/matching/tasklist/adaptive_scaler.go
@@ -133,29 +133,54 @@ func (a *adaptiveScalerImpl) run() {
return
}
qps := a.qpsTracker.QPS()
+ a.scope.UpdateGauge(metrics.EstimatedAddTaskQPSGauge, qps)
+ partitionConfig := a.getPartitionConfig()
+ // adjust the number of write partitions based on qps
+ numWritePartitions := a.adjustWritePartitions(qps, partitionConfig.NumWritePartitions)
+ // adjust the number of read partitions
+ numReadPartitions := a.adjustReadPartitions(partitionConfig.NumReadPartitions, numWritePartitions)
+
+ if numReadPartitions == partitionConfig.NumReadPartitions && numWritePartitions == partitionConfig.NumWritePartitions {
+ return
+ }
+ a.logger.Info("update the number of partitions", tag.CurrentQPS(qps), tag.NumReadPartitions(numReadPartitions), tag.NumWritePartitions(numWritePartitions))
+ a.scope.IncCounter(metrics.CadenceRequests)
+ err := a.tlMgr.UpdateTaskListPartitionConfig(a.ctx, &types.TaskListPartitionConfig{
+ NumReadPartitions: numReadPartitions,
+ NumWritePartitions: numWritePartitions,
+ })
+ if err != nil {
+ a.logger.Error("failed to update task list partition config", tag.Error(err))
+ a.scope.IncCounter(metrics.CadenceFailures)
+ }
+ }
+
+ func (a *adaptiveScalerImpl) getPartitionConfig() *types.TaskListPartitionConfig {
partitionConfig := a.tlMgr.TaskListPartitionConfig()
if partitionConfig == nil {
partitionConfig = &types.TaskListPartitionConfig{
NumReadPartitions: 1,
NumWritePartitions: 1,
}
}
- // calculate the number of write partitions
+ return partitionConfig
+ }
+
+ func (a *adaptiveScalerImpl) adjustWritePartitions(qps float64, numWritePartitions int32) int32 {
upscaleThreshold := float64(a.config.PartitionUpscaleRPS())
downscaleThreshold := float64(a.config.PartitionDownscaleRPS())
if downscaleThreshold > upscaleThreshold {
downscaleThreshold = upscaleThreshold
a.logger.Warn("downscale threshold is larger than upscale threshold, use upscale threshold for downscale threshold instead")
}
- numWritePartitions := partitionConfig.NumWritePartitions
- numReadPartitions := partitionConfig.NumReadPartitions
- a.scope.UpdateGauge(metrics.EstimatedAddTaskQPSGauge, qps)
+
+ result := numWritePartitions
if qps > upscaleThreshold {
if !a.overLoad {
a.overLoad = true
a.overLoadStartTime = a.timeSource.Now()
} else if a.timeSource.Now().Sub(a.overLoadStartTime) > a.config.PartitionUpscaleSustainedDuration() {
- numWritePartitions = getNumberOfPartitions(partitionConfig.NumWritePartitions, qps, upscaleThreshold)
+ result = getNumberOfPartitions(numWritePartitions, qps, upscaleThreshold)
a.overLoad = false
}
} else {
@@ -166,26 +191,18 @@ func (a *adaptiveScalerImpl) run() {
a.underLoad = true
a.underLoadStartTime = a.timeSource.Now()
} else if a.timeSource.Now().Sub(a.underLoadStartTime) > a.config.PartitionDownscaleSustainedDuration() {
- numWritePartitions = getNumberOfPartitions(partitionConfig.NumWritePartitions, qps, upscaleThreshold) // NOTE: this has to be upscaleThreshold
+ result = getNumberOfPartitions(numWritePartitions, qps, upscaleThreshold)
a.underLoad = false
}
} else {
a.underLoad = false
}
- // determine the number of read partitions, it should be larger or equal to the number of write partitions
- if numReadPartitions < numWritePartitions {
- numReadPartitions = numWritePartitions
- a.logger.Info("update the number of partitions", tag.CurrentQPS(qps), tag.NumReadPartitions(numReadPartitions), tag.NumWritePartitions(numWritePartitions))
- a.scope.IncCounter(metrics.CadenceRequests)
- err := a.tlMgr.UpdateTaskListPartitionConfig(a.ctx, &types.TaskListPartitionConfig{
- NumReadPartitions: numReadPartitions,
- NumWritePartitions: numWritePartitions,
- })
- if err != nil {
- a.logger.Error("failed to update task list partition config", tag.Error(err))
- a.scope.IncCounter(metrics.CadenceFailures)
- }
- return
+ return result
+ }
+
+ func (a *adaptiveScalerImpl) adjustReadPartitions(numReadPartitions, numWritePartitions int32) int32 {
+ if numReadPartitions <= numWritePartitions {
+ return numWritePartitions
}
// check the backlog of the drained partitions
for i := numReadPartitions - 1; i >= numWritePartitions; i-- {
@@ -218,19 +235,7 @@
break
}
}
- if numReadPartitions == partitionConfig.NumReadPartitions && numWritePartitions == partitionConfig.NumWritePartitions {
- return
- }
- a.logger.Info("update the number of partitions", tag.CurrentQPS(qps), tag.NumReadPartitions(numReadPartitions), tag.NumWritePartitions(numWritePartitions))
- a.scope.IncCounter(metrics.CadenceRequests)
- err := a.tlMgr.UpdateTaskListPartitionConfig(a.ctx, &types.TaskListPartitionConfig{
- NumReadPartitions: numReadPartitions,
- NumWritePartitions: numWritePartitions,
- })
- if err != nil {
- a.logger.Error("failed to update task list partition config", tag.Error(err))
- a.scope.IncCounter(metrics.CadenceFailures)
- }
+ return numReadPartitions
}

func getTaskListType(taskListType int) *types.TaskListType {
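getNumberOfPartitions is called in both the upscale and downscale paths above, but its body is outside this diff. A purely illustrative guess at its contract — enough partitions that per-partition QPS stays at or below the given threshold, never fewer than one — could look like the following (this is an assumption, not the repository's implementation):

```go
package main

import (
	"fmt"
	"math"
)

// getNumberOfPartitions here is a guess at the helper's contract: size the
// partition count so that per-partition QPS stays at or below the threshold.
func getNumberOfPartitions(current int32, qps float64, threshold float64) int32 {
	if threshold <= 0 {
		return current // avoid division by zero; keep the current value
	}
	n := int32(math.Ceil(qps / threshold))
	if n < 1 {
		n = 1
	}
	return n
}

func main() {
	// e.g. 450 QPS against a 200 QPS upscale threshold -> 3 write partitions
	fmt.Println(getNumberOfPartitions(1, 450, 200))
}
```

If it behaves roughly like this, passing upscaleThreshold in the downscale branch — as the // NOTE on the removed line stresses, and as the new result = getNumberOfPartitions(numWritePartitions, qps, upscaleThreshold) call preserves — avoids shrinking so far that the very next iteration would immediately upscale again.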

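The new adjustReadPartitions helper makes the read-partition invariant explicit: reads never drop below writes, and drained read partitions (indices at or above numWritePartitions) are only released once their backlog is gone. The backlog check itself sits in the collapsed part of the hunk, so the sketch below (adjustReadPartitionsSketch and backlogEmpty are invented names) only illustrates the assumed shape of that loop:

```go
package main

import "fmt"

// Illustrative sketch of the invariant adjustReadPartitions enforces in this
// diff: read partitions never drop below write partitions, and drained
// partitions are only released once they report an empty backlog. backlogEmpty
// stands in for whatever backlog check the collapsed part of the loop performs.
func adjustReadPartitionsSketch(numRead, numWrite int32, backlogEmpty func(partition int32) bool) int32 {
	if numRead <= numWrite {
		return numWrite
	}
	for i := numRead - 1; i >= numWrite; i-- {
		if !backlogEmpty(i) {
			break // keep this partition (and everything below it) readable
		}
		numRead = i
	}
	return numRead
}

func main() {
	// Partitions 3 and 4 are drained and empty, partition 2 still has backlog:
	// reads shrink from 5 to 3, never below the 2 write partitions.
	empty := map[int32]bool{3: true, 4: true}
	fmt.Println(adjustReadPartitionsSketch(5, 2, func(p int32) bool { return empty[p] }))
}
```

Walking from the highest partition down and stopping at the first non-empty backlog keeps the readable range contiguous, which matches the for i := numReadPartitions - 1; i >= numWritePartitions; i-- loop shown in the diff.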