-
Notifications
You must be signed in to change notification settings - Fork 812
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add taskList throttling to allow users to limit activities executed per second #432
Changes from 25 commits
8d1f383
4cf7326
68be53c
f1e96bf
812e46b
6874f95
b932509
993bf87
2a62e42
b0f92fc
9c84525
92a4e91
8650e55
0fa0e62
43c1a44
b1ad7a8
40269a4
7c2cd17
ca9b497
3dba4b0
b63dbd6
4c13d6a
a9c88f8
7494dcb
f69f892
fe0e8cc
e4debb3
310dbdc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -129,7 +129,8 @@ func (e *matchingEngineImpl) Stop() { | |
} | ||
|
||
func (e *matchingEngineImpl) getTaskLists(maxCount int) (lists []taskListManager) { | ||
e.taskListsLock.Lock() | ||
e.taskListsLock.RLock() | ||
defer e.taskListsLock.RUnlock() | ||
lists = make([]taskListManager, 0, len(e.taskLists)) | ||
count := 0 | ||
for _, tlMgr := range e.taskLists { | ||
|
@@ -139,7 +140,6 @@ func (e *matchingEngineImpl) getTaskLists(maxCount int) (lists []taskListManager | |
break | ||
} | ||
} | ||
e.taskListsLock.Unlock() | ||
return | ||
} | ||
|
||
|
@@ -153,20 +153,33 @@ func (e *matchingEngineImpl) String() string { | |
return r | ||
} | ||
|
||
// Returns taskListManager for a task list. If not already cached gets new range from DB and if successful creates one. | ||
func (e *matchingEngineImpl) getTaskListManager(taskList *taskListID) (taskListManager, error) { | ||
return e.getTaskListManagerWithRPS(taskList, nil) | ||
} | ||
|
||
// Returns taskListManager for a task list. If not already cached gets new range from DB and | ||
// if successful creates one. The passed in throttling limit determines how many tasks are | ||
// dispatcher per second. | ||
func (e *matchingEngineImpl) getTaskListManagerWithRPS( | ||
taskList *taskListID, maxDispatchPerSecond *float64, | ||
) (taskListManager, error) { | ||
// The first check is an optimization so almost all requests will have a task list manager | ||
// and return avoiding the write lock | ||
e.taskListsLock.RLock() | ||
if result, ok := e.taskLists[*taskList]; ok { | ||
e.taskListsLock.RUnlock() | ||
result.UpdateMaxDispatch(maxDispatchPerSecond) | ||
return result, nil | ||
} | ||
e.taskListsLock.RUnlock() | ||
mgr := newTaskListManager(e, taskList, e.config) | ||
// If it gets here, write lock and check again in case a task list is created between the two locks | ||
e.taskListsLock.Lock() | ||
if result, ok := e.taskLists[*taskList]; ok { | ||
result.UpdateMaxDispatch(maxDispatchPerSecond) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm little worried we are switching to a write lock on this hot path. Can we figure out a way which does not require acquiring the write lock on the top level map for TaskListMgr. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is one way with So we will have some duplicate code, we will be checking if it is in the map twice. That way, if it is in map originally, it will onyl write lock. Makes code uglier but avoids write lock in majority of the cases. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We already have that logic in GetTaskList. I'm not sure if this new method is needed. If you move the logic of setting the throttling limit to GetTask than we can simplify this code a lot. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed Update method but FYI most of this code is still needed for task list creation. |
||
e.taskListsLock.Unlock() | ||
return result, nil | ||
} | ||
mgr := newTaskListManager(e, taskList, e.config, maxDispatchPerSecond) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. move this after your RUnlock() on line 178 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like we had some dup code here in checking the map. Cleaning up. |
||
e.taskLists[*taskList] = mgr | ||
e.taskListsLock.Unlock() | ||
logging.LogTaskListLoadingEvent(e.logger, taskList.taskListName, taskList.taskType) | ||
|
@@ -179,6 +192,13 @@ func (e *matchingEngineImpl) getTaskListManager(taskList *taskListID) (taskListM | |
return mgr, nil | ||
} | ||
|
||
// For use in tests | ||
func (e *matchingEngineImpl) updateTaskList(taskList *taskListID, mgr taskListManager) { | ||
e.taskListsLock.Lock() | ||
defer e.taskListsLock.Unlock() | ||
e.taskLists[*taskList] = mgr | ||
} | ||
|
||
func (e *matchingEngineImpl) removeTaskListManager(id *taskListID) { | ||
e.taskListsLock.Lock() | ||
defer e.taskListsLock.Unlock() | ||
|
@@ -247,7 +267,7 @@ pollLoop: | |
// long-poll when frontend calls CancelOutstandingPoll API | ||
pollerCtx := context.WithValue(ctx, pollerIDKey, pollerID) | ||
taskList := newTaskListID(domainID, taskListName, persistence.TaskListTypeDecision) | ||
tCtx, err := e.getTask(pollerCtx, taskList) | ||
tCtx, err := e.getTask(pollerCtx, taskList, nil) | ||
if err != nil { | ||
// TODO: Is empty poll the best reply for errPumpClosed? | ||
if err == ErrNoTasks || err == errPumpClosed { | ||
|
@@ -341,10 +361,14 @@ pollLoop: | |
} | ||
|
||
taskList := newTaskListID(domainID, taskListName, persistence.TaskListTypeActivity) | ||
var maxDispatch *float64 | ||
if request.TaskListMetadata != nil { | ||
maxDispatch = request.TaskListMetadata.MaxTasksPerSecond | ||
} | ||
// Add frontend generated pollerID to context so tasklistMgr can support cancellation of | ||
// long-poll when frontend calls CancelOutstandingPoll API | ||
pollerCtx := context.WithValue(ctx, pollerIDKey, pollerID) | ||
tCtx, err := e.getTask(pollerCtx, taskList) | ||
tCtx, err := e.getTask(pollerCtx, taskList, maxDispatch) | ||
if err != nil { | ||
// TODO: Is empty poll the best reply for errPumpClosed? | ||
if err == ErrNoTasks || err == errPumpClosed { | ||
|
@@ -450,8 +474,10 @@ func (e *matchingEngineImpl) CancelOutstandingPoll(ctx context.Context, request | |
} | ||
|
||
// Loads a task from persistence and wraps it in a task context | ||
func (e *matchingEngineImpl) getTask(ctx context.Context, taskList *taskListID) (*taskContext, error) { | ||
tlMgr, err := e.getTaskListManager(taskList) | ||
func (e *matchingEngineImpl) getTask( | ||
ctx context.Context, taskList *taskListID, maxDispatchPerSecond *float64, | ||
) (*taskContext, error) { | ||
tlMgr, err := e.getTaskListManagerWithRPS(taskList, maxDispatchPerSecond) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if aren't gonna switch to atomics, consider moving this outside the lock so that, we don't block addition of new task lists, when another tasklist is blocked on UpdateMaxDispatch for multiple seconds if/when they roll out a change in poll limit