Handle throttle error and do not propagate it up to AddTask
madhuravi committed Jan 5, 2018
1 parent 7494dcb commit f69f892
Showing 3 changed files with 32 additions and 36 deletions.
6 changes: 4 additions & 2 deletions common/metrics/defs.go
@@ -597,7 +597,8 @@ const (
LeaseFailureCounter
ConditionFailedErrorCounter
RespondQueryTaskFailedCounter
AddThrottleCounter
SyncThrottleCounter
BufferThrottleCounter
)

// MetricDefs record the metrics for all services
@@ -680,7 +681,8 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
LeaseFailureCounter: {metricName: "lease.failures"},
ConditionFailedErrorCounter: {metricName: "condition-failed-errors"},
RespondQueryTaskFailedCounter: {metricName: "respond-query-failed"},
AddThrottleCounter: {metricName: "add.throttle.count"},
SyncThrottleCounter: {metricName: "sync.throttle.count"},
BufferThrottleCounter: {metricName: "buffer.throttle.count"},
},
}

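For context, the renamed counters round-trip through the Cadence metrics client and a tally test scope in the test changes below. A minimal sketch of that round trip, assuming the import paths github.com/uber/cadence/common/metrics and github.com/uber-go/tally; the counter keys are copied from the test assertions further down:

package main

import (
	"fmt"

	"github.com/uber-go/tally"

	"github.com/uber/cadence/common/metrics"
)

func main() {
	// Back the metrics client with an in-memory test scope so counters can be inspected.
	scope := tally.NewTestScope("test", nil)
	client := metrics.NewClient(scope, metrics.Matching)

	// Emit the new counters the same way taskListManager.go does below.
	client.IncCounter(metrics.MatchingTaskListMgrScope, metrics.SyncThrottleCounter)
	client.IncCounter(metrics.MatchingTaskListMgrScope, metrics.BufferThrottleCounter)

	// Read them back; the "<prefix>.<metric name>+<tags>" key format matches the test assertions.
	snap := scope.Snapshot().Counters()
	fmt.Println(snap["test.sync.throttle.count+operation=TaskListMgr"].Value())   // 1
	fmt.Println(snap["test.buffer.throttle.count+operation=TaskListMgr"].Value()) // 1
}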
51 changes: 22 additions & 29 deletions service/matching/matchingEngine_test.go
@@ -26,7 +26,6 @@ import (
"fmt"
"net/http"
"os"
"strings"
"sync"
"sync/atomic"
"testing"
@@ -466,6 +465,9 @@ func (s *matchingEngineSuite) TestSyncMatchActivities() {
tl := "makeToast"
tlID := &taskListID{domainID: domainID, taskListName: tl, taskType: persistence.TaskListTypeActivity}
s.matchingEngine.config.RangeSize = rangeSize // override to low number for the test
// So we can get snapshots
scope := tally.NewTestScope("test", nil)
s.matchingEngine.metricsClient = metrics.NewClient(scope, metrics.Matching)

dispatchTTL := time.Nanosecond
dPtr := _defaultTaskDispatchRPS
@@ -510,16 +512,15 @@ func (s *matchingEngineSuite) TestSyncMatchActivities() {
})}
}, nil)

zeroDispatchCt := 0
for i := int64(0); i < taskCount; i++ {
scheduleID := i * 3

var wg sync.WaitGroup

var result *workflow.PollForActivityTaskResponse
var pollErr error
maxDispatch := float64(i)
if i%2 == 0 {
maxDispatch := _defaultTaskDispatchRPS
if i == taskCount/2 {
maxDispatch = 0
}
wg.Add(1)
@@ -544,14 +545,6 @@ func (s *matchingEngineSuite) TestSyncMatchActivities() {
ScheduleToStartTimeoutSeconds: common.Int32Ptr(1),
}
err := s.matchingEngine.AddActivityTask(&addRequest)
if err != nil && strings.Contains(err.Error(), "limit exceeded") {
wg.Wait()
s.NoError(pollErr)
s.Equal(0, len(result.TaskToken))
time.Sleep(dispatchTTL) // Sleep should be atleast ttl so max Dispatch gets updated
zeroDispatchCt++
continue
}
wg.Wait()
s.NoError(err)
s.NoError(pollErr)
@@ -576,8 +569,11 @@ func (s *matchingEngineSuite) TestSyncMatchActivities() {

s.EqualValues(string(taskToken), string(result.TaskToken))
}
s.Equal(taskCount/2, zeroDispatchCt) // Check zero dispatch = Polls with 0
s.EqualValues(0, s.taskManager.getCreateTaskCount(tlID)) // Not tasks stored in persistence

time.Sleep(20 * time.Millisecond) // So any buffer tasks from 0 rps get picked up
syncCtr := scope.Snapshot().Counters()["test.sync.throttle.count+operation=TaskListMgr"]
s.Equal(1, int(syncCtr.Value())) // throttle counter == number of times zero RPS was requested
s.EqualValues(1, s.taskManager.getCreateTaskCount(tlID)) // tasks stored in persistence == number of times zero RPS was requested
s.EqualValues(0, s.taskManager.getTaskCount(tlID))
expectedRange := int64(initialRangeID + taskCount/rangeSize)
if taskCount%rangeSize > 0 {
@@ -593,8 +589,8 @@ func (s *matchingEngineSuite) TestConcurrentPublishConsumeActivities() {
}
const workerCount = 20
const taskCount = 100
errCt := s.concurrentPublishConsumeActivities(workerCount, taskCount, dispatchLimitFn)
s.Zero(errCt)
throttleCt := s.concurrentPublishConsumeActivities(workerCount, taskCount, dispatchLimitFn)
s.Zero(throttleCt)
}

func (s *matchingEngineSuite) TestConcurrentPublishConsumeActivitiesWithZeroDispatch() {
@@ -608,15 +604,17 @@ func (s *matchingEngineSuite) TestConcurrentPublishConsumeActivitiesWithZeroDisp
}
const workerCount = 20
const taskCount = 100
errCt := s.concurrentPublishConsumeActivities(workerCount, taskCount, dispatchLimitFn)
s.matchingEngine.metricsClient = metrics.NewClient(tally.NewTestScope("test", nil), metrics.Matching)
throttleCt := s.concurrentPublishConsumeActivities(workerCount, taskCount, dispatchLimitFn)
// at least once from the 0 dispatch poll, and more until TTL is hit, at which time the throttle limit is reset
// hard to predict exactly how many times, since the atomic.Value load might not have updated.
s.True(errCt >= 1 && errCt < (workerCount*int(taskCount)))
s.True(throttleCt >= 1 && throttleCt < int64(workerCount*int(taskCount)))
}

func (s *matchingEngineSuite) concurrentPublishConsumeActivities(
workerCount int, taskCount int64, dispatchLimitFn func(int, int64) float64,
) int {
workerCount int, taskCount int64, dispatchLimitFn func(int, int64) float64) int64 {
scope := tally.NewTestScope("test", nil)
s.matchingEngine.metricsClient = metrics.NewClient(scope, metrics.Matching)
runID := "run1"
workflowID := "workflow1"
workflowExecution := workflow.WorkflowExecution{RunId: &runID, WorkflowId: &workflowID}
@@ -644,8 +642,6 @@ func (s *matchingEngineSuite) concurrentPublishConsumeActivities(
var wg sync.WaitGroup
wg.Add(2 * workerCount)

throttleCt := 0
var throttleMu sync.Mutex
for p := 0; p < workerCount; p++ {
go func() {
defer wg.Done()
@@ -661,12 +657,6 @@ func (s *matchingEngineSuite) concurrentPublishConsumeActivities(

err := s.matchingEngine.AddActivityTask(&addRequest)
if err != nil {
if strings.Contains(err.Error(), "limit exceeded") {
throttleMu.Lock()
throttleCt++
throttleMu.Unlock()
time.Sleep(dispatchTTL)
}
s.logger.Infof("Failure in AddActivityTask: %v", err)
i--
}
@@ -754,7 +744,10 @@ func (s *matchingEngineSuite) concurrentPublishConsumeActivities(
// Due to conflicts some ids are skipped and more real ranges are used.
s.True(expectedRange <= s.taskManager.getTaskListManager(tlID).rangeID)
s.EqualValues(0, s.taskManager.getTaskCount(tlID))
return throttleCt

syncCtr := scope.Snapshot().Counters()["test.sync.throttle.count+operation=TaskListMgr"]
bufCtr := scope.Snapshot().Counters()["test.buffer.throttle.count+operation=TaskListMgr"]
return syncCtr.Value() + bufCtr.Value()
}

func (s *matchingEngineSuite) TestConcurrentPublishConsumeDecisions() {
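The dispatchLimitFn hook threaded through these tests maps the publish/consume loop indices (an int and an int64, plausibly worker and task) to the maxDispatch value a poll carries; returning 0 for some polls is what drives the throttle counters that concurrentPublishConsumeActivities now returns. A hypothetical example of such a hook — the concrete limits used by TestConcurrentPublishConsumeActivitiesWithZeroDispatch are collapsed in this view:

// Matches the parameter type of concurrentPublishConsumeActivities:
// dispatchLimitFn func(int, int64) float64
dispatchLimitFn := func(wc int, tc int64) float64 {
	if wc%5 == 0 && tc%50 == 0 {
		return 0 // occasionally poll with zero RPS to exercise the throttle path
	}
	return _defaultTaskDispatchRPS // otherwise poll at the package's default dispatch rate
}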
11 changes: 6 additions & 5 deletions service/matching/taskListManager.go
@@ -50,6 +50,8 @@ const (
_defaultTaskDispatchRPSTTL = 60 * time.Second
)

var errAddTasklistThrottled = fmt.Errorf("cannot add to tasklist, limit exceeded")

type taskListManager interface {
Start() error
Stop()
@@ -282,7 +284,7 @@ func (c *taskListManagerImpl) AddTask(execution *s.WorkflowExecution, taskInfo *
c.startWG.Wait()
_, err := c.executeWithRetry(func(rangeID int64) (interface{}, error) {
r, err := c.trySyncMatch(taskInfo)
if err != nil || r != nil {
if (err != nil && err != errAddTasklistThrottled) || r != nil {
return r, err
}
r, err = c.taskWriter.appendTask(execution, taskInfo, rangeID)
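This hunk is the heart of the commit: a throttled sync match is no longer returned out of AddTask. Only genuine errors short-circuit the retry closure; a throttle (or a simple miss) falls through to the durable task writer. A self-contained restatement of that control flow — addTask, trySyncMatch, and appendToStore are illustrative stand-ins, not the real Cadence signatures:

package main

import (
	"errors"
	"fmt"
)

var errThrottled = errors.New("cannot add to tasklist, limit exceeded")

// addTask mirrors the decision made inside taskListManagerImpl.AddTask's retry closure.
func addTask(trySyncMatch func() (bool, error), appendToStore func() error) error {
	matched, err := trySyncMatch()
	if err != nil && err != errThrottled {
		return err // real failures still propagate to the caller
	}
	if matched {
		return nil // a waiting poller took the task synchronously
	}
	// Throttled, or no poller waiting: persist the task instead of failing the add.
	return appendToStore()
}

func main() {
	err := addTask(
		func() (bool, error) { return false, errThrottled }, // simulate a throttled sync match
		func() error { fmt.Println("task persisted"); return nil },
	)
	fmt.Println("AddTask error:", err) // <nil>: the throttle never reaches the caller
}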
@@ -582,9 +584,8 @@ func (c *taskListManagerImpl) trySyncMatch(task *persistence.TaskInfo) (*persist

rsv := c.rateLimiter.Reserve()
if !rsv.OK() {
scope := metrics.MatchingTaskListMgrScope
c.metricsClient.IncCounter(scope, metrics.AddThrottleCounter)
return nil, fmt.Errorf("cannot add to tasklist, limit exceeded")
c.metricsClient.IncCounter(metrics.MatchingTaskListMgrScope, metrics.SyncThrottleCounter)
return nil, errAddTasklistThrottled
}
select {
case c.tasksForPoll <- request: // poller goroutine picked up the task
@@ -606,7 +607,7 @@ deliverBufferTasksLoop:
break deliverBufferTasksLoop
}
c.logger.Warnf("Unable to send tasks for poll, rate limit failed: %s", err.Error())
c.metricsClient.IncCounter(metrics.MatchingTaskListMgrScope, metrics.AddThrottleCounter)
c.metricsClient.IncCounter(metrics.MatchingTaskListMgrScope, metrics.BufferThrottleCounter)
continue
}
select {
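Both counters hang off the same reservation-style rate limiter: a failed Reserve on the sync-match path increments sync.throttle.count, while a rate-limit failure while draining buffered tasks increments buffer.throttle.count. The Reserve/OK check matches the golang.org/x/time/rate API (Cadence wraps the limiter in its own rateLimiter type, so treat the direct use below as an assumption); a minimal sketch of the guard at the top of trySyncMatch:

package main

import (
	"fmt"

	"golang.org/x/time/rate"
)

func main() {
	// A limit and burst of 0 models the zero-RPS polls in the tests above: no tokens are ever available.
	limiter := rate.NewLimiter(rate.Limit(0), 0)

	rsv := limiter.Reserve()
	if !rsv.OK() {
		// This is the branch where the task list manager emits SyncThrottleCounter
		// and returns errAddTasklistThrottled instead of blocking the caller.
		fmt.Println("throttled: cannot add to tasklist, limit exceeded")
		return
	}
	fmt.Println("reservation granted; delay:", rsv.Delay())
}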
