Skip to content

Commit

Permalink
Fixes balancing issues under heavy load.
Browse files Browse the repository at this point in the history
Signed-off-by: rafa-be <[email protected]>
  • Loading branch information
rafa-be committed Jan 29, 2025
1 parent 5153efc commit 32bbc16
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 27 deletions.
2 changes: 1 addition & 1 deletion scaler/about.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.9.0"
__version__ = "1.9.1"
19 changes: 10 additions & 9 deletions scaler/protocol/capnp/common.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@

enum TaskStatus {
# task is accepted by the scheduler and will end up with one of the statuses below
success @0; # if submit and task is done and get result
failed @1; # if submit and task is failed on worker
canceled @2; # if submit and task is canceled
notFound @3; # if submit and task is not found in scheduler
workerDied @4; # if submit and worker died (only happens when scheduler keep_task=False)
noWorker @5; # if submit and scheduler is full (not implemented yet)
success @0; # if submit and task is done and get result
failed @1; # if submit and task is failed on worker
canceled @2; # if submit and task is canceled
cancelFailed @3; # if submit and task cancel request failed (task is running and cancel was not forced)
notFound @4; # if submit and task is not found in scheduler
workerDied @5; # if submit and worker died (only happens when scheduler keep_task=False)
noWorker @6; # if submit and scheduler is full (not implemented yet)

# below are only used for monitoring channel, not sent to client
inactive @6; # task is scheduled but not yet allocated to a worker
running @7; # task is running in worker
canceling @8; # task is canceling (can be in Inactive or Running state)
inactive @7; # task is scheduled but not yet allocated to a worker
running @8; # task is running in worker
canceling @9; # task is canceling (can be in Inactive or Running state)
}

struct ObjectContent {
Expand Down
1 change: 1 addition & 0 deletions scaler/protocol/python/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class TaskStatus(enum.Enum):
Success = _common.TaskStatus.success # if submit and task is done and get result
Failed = _common.TaskStatus.failed # if submit and task is failed on worker
Canceled = _common.TaskStatus.canceled # if submit and task is canceled
CancelFailed = _common.TaskStatus.cancelFailed # if submit and task cancel request failed (task is running and cancel was not forced)
NotFound = _common.TaskStatus.notFound # if submit and task is not found in scheduler
WorkerDied = (
_common.TaskStatus.workerDied
Expand Down
2 changes: 1 addition & 1 deletion scaler/scheduler/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ async def on_task_cancel(self, task_cancel: TaskCancel):
raise NotImplementedError()

@abc.abstractmethod
async def on_task_result(self, task_result: TaskResult):
async def on_task_result(self, worker: bytes, task_result: TaskResult):
raise NotImplementedError()

@abc.abstractmethod
Expand Down
2 changes: 1 addition & 1 deletion scaler/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ async def on_receive_message(self, source: bytes, message: Message):

# receive task result from downstream
if isinstance(message, TaskResult):
await self._worker_manager.on_task_result(message)
await self._worker_manager.on_task_result(source, message)
return

# scheduler receives worker disconnect request from downstream
Expand Down
54 changes: 41 additions & 13 deletions scaler/scheduler/worker_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,26 +69,54 @@ async def on_task_cancel(self, task_cancel: TaskCancel):

await self._binder.send(worker, TaskCancel.new_msg(task_cancel.task_id))

async def on_task_result(self, task_result: TaskResult):
worker = self._allocator.remove_task(task_result.task_id)

if task_result.status in {TaskStatus.Canceled, TaskStatus.NotFound}:
if worker is not None:
# The worker canceled the task, but the scheduler still had it queued. Re-route the task to another
# worker.
await self.__reroute_tasks([task_result.task_id])
else:
await self._task_manager.on_task_done(task_result)
async def on_task_result(self, worker: bytes, task_result: TaskResult):
valid_statuses = [
TaskStatus.Canceled, TaskStatus.CancelFailed, TaskStatus.Failed, TaskStatus.NotFound, TaskStatus.Success
]

if task_result.status not in valid_statuses:
logging.error(
f"received invalid task status={task_result.status} for task_id={task_result.task_id.hex()} from "
f"worker={worker!r}")
return

if worker is None:
assigned_worker = self._allocator.get_assigned_worker(task_result.task_id)

if assigned_worker is None:
logging.error(
f"received unknown task result for task_id={task_result.task_id.hex()}, status={task_result.status} "
f"received task result for unknown task_id={task_result.task_id.hex()}, status={task_result.status} "
f"might due to worker get disconnected or canceled"
)
return

if worker != assigned_worker:
# Assigned worker might be different from the received message's worker if the task previously got rerouted
# to another worker.
logging.warning(
f"received task result from invalid worker for task_id={task_result.task_id.hex()}, "
f"status={task_result.status} might be due to cancelled or re-routed task"
)
return

assert worker == assigned_worker

if task_result.status == TaskStatus.CancelFailed:
# Cancel failures occur when a task cancellation was requested on an actively running task, with the `force`
# parameter set to `False` (i.e. during balancing).
# We can safely ignore these messages. In the future, we might want to stop considering these tasks for
# balancing.
return

self._allocator.remove_task(task_result.task_id)

if task_result.status == TaskStatus.Canceled:
# The worker canceled the task, but the scheduler still had it assigned to this worker. Re-route the task to
# another worker.
await self.__reroute_tasks([task_result.task_id])
return

assert task_result.status in [TaskStatus.Failed, TaskStatus.NotFound, TaskStatus.Success]

await self._task_manager.on_task_done(task_result)

async def on_heartbeat(self, worker: bytes, info: WorkerHeartbeat):
Expand Down Expand Up @@ -190,7 +218,7 @@ async def __do_balance(self, current_advice: Dict[bytes, List[bytes]]):
for worker, task_ids in current_advice.items():
await self._binder_monitor.send(StateBalanceAdvice.new_msg(worker, task_ids))

task_cancel_flags = TaskCancel.TaskCancelFlags(force=True, retrieve_task_object=False)
task_cancel_flags = TaskCancel.TaskCancelFlags(force=False, retrieve_task_object=False)

self._last_balance_advice = current_advice
for worker, task_ids in current_advice.items():
Expand Down
10 changes: 8 additions & 2 deletions scaler/worker/agent/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,19 @@ async def on_cancel_task(self, task_cancel: TaskCancel):
task_id = task_cancel.task_id

task_not_found = task_id not in self._processing_task_ids and task_id not in self._queued_task_id_to_task
task_is_processing = task_id in self._processing_task_ids

if task_not_found or (task_is_processing and not task_cancel.flags.force):
if task_not_found:
result = TaskResult.new_msg(task_id, TaskStatus.NotFound)
await self._connector_external.send(result)
return

task_is_processing = task_id in self._processing_task_ids

if task_is_processing and not task_cancel.flags.force:
result = TaskResult.new_msg(task_id, TaskStatus.CancelFailed)
await self._connector_external.send(result)
return

# A suspended task will be both processing AND queued

if task_cancel.task_id in self._queued_task_id_to_task:
Expand Down

0 comments on commit 32bbc16

Please sign in to comment.