[serve] Stop scheduling task early when requests have been cancelled (#47847)

In `fulfill_pending_requests`, there are two nested loops:
- The outer loop greedily fulfills additional requests so that, when backoff
doesn't occur, new asyncio tasks don't need to be started to fulfill each
request.
- The inner loop handles backoff when no replica can be found to fulfill the
next request.

The outer loop stops once there are enough scheduling tasks to handle all
pending requests. However, if all replicas are at max capacity, the inner loop
can keep spinning even when its task is no longer needed (e.g. after a request
has been cancelled), because it simply keeps trying to find an available
replica without checking whether the current task is still necessary.

This PR makes sure that on each iteration of the inner loop, requests in
`pending_requests_to_fulfill` that have already been cancelled are cleared
out, and the loop breaks if there are enough tasks to handle the remaining
requests.

Tests:
- Added a test for the scenario where a request is cancelled while the
scheduler is still trying to find an available replica.
- Also modified the tests in `test_pow_2_replica_scheduler.py` so that the
backoff sequence uses small values (1ms) and the timeouts in the tests are
low (10ms), making the unit tests run much faster (~5s now compared to ~30s
before).

## Related issue number

related: #47585

---------

Signed-off-by: Cindy Zhang <[email protected]>
zcin authored Oct 8, 2024
1 parent 02d65eb commit f1cccba
Showing 2 changed files with 84 additions and 28 deletions.
15 changes: 13 additions & 2 deletions python/ray/serve/_private/replica_scheduler/pow_2_scheduler.py
@@ -647,7 +647,6 @@ async def select_from_candidate_replicas(

def _get_pending_request_matching_metadata(
self,
replica: ReplicaWrapper,
request_metadata: Optional[RequestMetadata] = None,
) -> Optional[PendingRequest]:
if request_metadata is None or not request_metadata.multiplexed_model_id:
@@ -676,7 +675,7 @@ def fulfill_next_pending_request(
# First try to match a pending request based on the request metadata (currently
# this only looks at the multiplexed model ID).
matched_pending_request = self._get_pending_request_matching_metadata(
replica, request_metadata
request_metadata
)
if matched_pending_request is not None:
matched_pending_request.future.set_result(replica)
@@ -718,6 +717,18 @@ async def fulfill_pending_requests(self):
async for candidates in self.choose_two_replicas_with_backoff(
request_metadata
):
# Clear out pending requests at the front of the
# queue that have been cancelled, then reevaluate
# if we need to continue this scheduling task.
while (
len(self._pending_requests_to_fulfill) > 0
and self._pending_requests_to_fulfill[0].future.done()
):
self._pending_requests_to_fulfill.popleft()

if len(self._scheduling_tasks) > self.target_num_scheduling_tasks:
break

replica = await self.select_from_candidate_replicas(
candidates, backoff_index
)
97 changes: 71 additions & 26 deletions python/ray/serve/tests/unit/test_pow_2_replica_scheduler.py
@@ -134,7 +134,7 @@ def pow_2_scheduler(request) -> PowerOfTwoChoicesReplicaScheduler:
# In order to prevent issues like https://github.com/ray-project/ray/issues/40631,
# construct the scheduler on a different loop to mimic the deployment handle path.
async def construct_scheduler(loop: asyncio.AbstractEventLoop):
return PowerOfTwoChoicesReplicaScheduler(
scheduler = PowerOfTwoChoicesReplicaScheduler(
loop,
DeploymentID(name="TEST_DEPLOYMENT"),
handle_source=request.param.get(
@@ -151,6 +151,8 @@ async def construct_scheduler(loop: asyncio.AbstractEventLoop):
),
get_curr_time_s=TIMER.time,
)
scheduler.backoff_sequence_s = [0, 0.001, 0.001, 0.001, 0.001, 0.001, 0.001]
return scheduler

s = asyncio.new_event_loop().run_until_complete(
construct_scheduler(get_or_create_event_loop())
@@ -220,7 +222,7 @@ async def test_no_replicas_available_then_one_available(pow_2_scheduler):
loop = get_or_create_event_loop()

task = loop.create_task(s.choose_replica_for_request(fake_pending_request()))
done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0

r1 = FakeReplicaWrapper("r1")
@@ -250,14 +252,14 @@ async def test_replica_does_not_accept_then_accepts(pow_2_scheduler):
loop = get_or_create_event_loop()

task = loop.create_task(s.choose_replica_for_request(fake_pending_request()))
done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0

r1 = FakeReplicaWrapper("r1")
r1.set_queue_len_response(DEFAULT_MAX_ONGOING_REQUESTS + 1)
s.update_replicas([r1])

done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0

r1.set_queue_len_response(0)
@@ -284,14 +286,14 @@ async def test_no_replicas_accept_then_new_one_accepts(pow_2_scheduler):
loop = get_or_create_event_loop()

task = loop.create_task(s.choose_replica_for_request(fake_pending_request()))
done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0

r1 = FakeReplicaWrapper("r1")
r1.set_queue_len_response(DEFAULT_MAX_ONGOING_REQUESTS + 1)
s.update_replicas([r1])

done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0

r2 = FakeReplicaWrapper("r2")
@@ -325,11 +327,11 @@ async def test_one_replica_available_then_none_then_one(pow_2_scheduler):
s.update_replicas([r1])

task = loop.create_task(s.choose_replica_for_request(fake_pending_request()))
done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0

s.update_replicas([])
done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0

r1.set_queue_len_response(0)
@@ -490,7 +492,7 @@ async def test_tasks_scheduled_fifo(pow_2_scheduler):
loop.create_task(s.choose_replica_for_request(fake_pending_request()))
)

done, _ = await asyncio.wait(tasks, timeout=0.1)
done, _ = await asyncio.wait(tasks, timeout=0.01)
assert len(done) == 0

# Only a single request will be accepted at a time due to
@@ -539,7 +541,7 @@ async def test_retried_tasks_scheduled_fifo(pow_2_scheduler):
)
)

done, _ = await asyncio.wait(tasks, timeout=0.1)
done, _ = await asyncio.wait(tasks, timeout=0.01)
assert len(done) == 0

# Only a single request will be accepted at a time due to
@@ -587,7 +589,7 @@ async def test_cancellation(pow_2_scheduler):
task1 = loop.create_task(s.choose_replica_for_request(fake_pending_request()))
task2 = loop.create_task(s.choose_replica_for_request(fake_pending_request()))

done, _ = await asyncio.wait([task1, task2], timeout=0.1)
done, _ = await asyncio.wait([task1, task2], timeout=0.01)
assert len(done) == 0

task1.cancel()
@@ -603,6 +605,49 @@ async def test_cancellation(pow_2_scheduler):
assert s.num_pending_requests == 0


@pytest.mark.asyncio
@pytest.mark.parametrize(
"pow_2_scheduler",
[
{"prefer_local_node": True, "prefer_local_az": True},
{"prefer_local_node": True, "prefer_local_az": False},
{"prefer_local_node": False, "prefer_local_az": True},
{"prefer_local_node": False, "prefer_local_az": False},
],
indirect=True,
)
async def test_cancellation_when_replicas_maxed(pow_2_scheduler):
"""
If a pending assignment is cancelled, it shouldn't get fulfilled and the next
request in the queue should be.
"""
s = pow_2_scheduler
loop = get_or_create_event_loop()

task = loop.create_task(s.choose_replica_for_request(fake_pending_request()))

# There is only one replica that is maxed out on requests
r1 = FakeReplicaWrapper("r1")
r1.set_queue_len_response(DEFAULT_MAX_ONGOING_REQUESTS)
s.update_replicas([r1])
# So one scheduling task should have been started to try to schedule
# the request to a replica, but it should be blocked because the
# replica doesn't have capacity to accept new requests
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0
assert s.curr_num_scheduling_tasks == 1

# Cancel while the scheduling task is repeatedly trying to find an
# available replica
task.cancel()

# Verify that the scheduling tasks exit and there are no assignments left.
await async_wait_for_condition(
lambda: s.curr_num_scheduling_tasks == 0, retry_interval_ms=1
)
assert s.num_pending_requests == 0


@pytest.mark.asyncio
@pytest.mark.parametrize(
"pow_2_scheduler",
@@ -624,7 +669,7 @@ async def test_only_task_cancelled(pow_2_scheduler):

task = loop.create_task(s.choose_replica_for_request(fake_pending_request()))

done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0

task.cancel()
@@ -669,7 +714,7 @@ async def test_scheduling_task_cap(pow_2_scheduler):
loop.create_task(s.choose_replica_for_request(fake_pending_request()))
)

done, _ = await asyncio.wait(tasks, timeout=0.1)
done, _ = await asyncio.wait(tasks, timeout=0.01)
assert len(done) == 0

# There should be zero scheduling tasks while there are no replicas.
@@ -679,7 +724,7 @@ async def test_scheduling_task_cap(pow_2_scheduler):
r1.set_queue_len_response(DEFAULT_MAX_ONGOING_REQUESTS + 1)
s.update_replicas([r1])

done, _ = await asyncio.wait(tasks, timeout=0.1)
done, _ = await asyncio.wait(tasks, timeout=0.01)
assert len(done) == 0

# Now that there is at least one replica available, there should be nonzero
@@ -734,7 +779,7 @@ async def test_scheduling_task_cap_hard_limit(pow_2_scheduler):
loop.create_task(s.choose_replica_for_request(fake_pending_request()))
)

done, _ = await asyncio.wait(tasks, timeout=0.1)
done, _ = await asyncio.wait(tasks, timeout=0.01)
assert len(done) == 0

# There should be zero scheduling tasks while there are no replicas.
@@ -744,7 +789,7 @@ async def test_scheduling_task_cap_hard_limit(pow_2_scheduler):
r1.set_queue_len_response(DEFAULT_MAX_ONGOING_REQUESTS + 1)
s.update_replicas([r1])

done, _ = await asyncio.wait(tasks, timeout=0.1)
done, _ = await asyncio.wait(tasks, timeout=0.01)
assert len(done) == 0

# Now that there is at least one replica available, there should be nonzero
@@ -797,7 +842,7 @@ async def test_replica_responds_after_being_removed(pow_2_scheduler):
# Start the scheduling task, which will hang waiting for the queue length response.
task = loop.create_task(s.choose_replica_for_request(fake_pending_request()))

done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0
assert s.curr_num_scheduling_tasks == 1

@@ -808,7 +853,7 @@ async def test_replica_responds_after_being_removed(pow_2_scheduler):
r1.set_queue_len_response(0)

# The original replica should *not* be scheduled.
done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0
assert s.curr_num_scheduling_tasks == 1

@@ -1208,7 +1253,7 @@ async def test_multiple_queries_with_different_model_ids(self, pow_2_scheduler):
),
]

done, _ = await asyncio.wait(tasks, timeout=0.1)
done, _ = await asyncio.wait(tasks, timeout=0.01)
assert len(done) == len(tasks)

assert all(
@@ -1241,7 +1286,7 @@ async def test_no_replicas_available_then_choose_one_with_id(self, pow_2_scheduler):
]

# Scheduling tasks should be in backoff.
done, _ = await asyncio.wait(tasks, timeout=0.1)
done, _ = await asyncio.wait(tasks, timeout=0.01)
assert len(done) == 0

# Now add two more replicas, one of which has the model ID.
@@ -1279,7 +1324,7 @@ async def test_tasks_scheduled_fifo_among_model_ids(self, pow_2_scheduler):
)
)

done, _ = await asyncio.wait(m1_tasks + m2_tasks, timeout=0.1)
done, _ = await asyncio.wait(m1_tasks + m2_tasks, timeout=0.01)
assert len(done) == 0

r1 = FakeReplicaWrapper("r1", model_ids={"m1"}, reset_after_response=True)
@@ -1359,7 +1404,7 @@ async def test_queue_len_response_deadline_backoff(pow_2_scheduler):
# Attempt to schedule; the replica will be attempted and a timeout will occur
# due to the short timeout set above.
task = loop.create_task(s.choose_replica_for_request(fake_pending_request()))
done, _ = await asyncio.wait([task], timeout=0.2)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0

# Verify first ping
@@ -1404,7 +1449,7 @@ async def test_max_queue_len_response_deadline(pow_2_scheduler):
# Attempt to schedule; the replica will be attempted and a timeout will occur
# due to the short timeout set above.
task = loop.create_task(s.choose_replica_for_request(fake_pending_request()))
done, _ = await asyncio.wait([task], timeout=0.2)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0

assert all(
@@ -1555,15 +1600,15 @@ async def test_queue_len_cache_replica_at_capacity_is_probed(pow_2_scheduler):
s.replica_queue_len_cache.update(r1.replica_id, DEFAULT_MAX_ONGOING_REQUESTS)

task = loop.create_task(s.choose_replica_for_request(fake_pending_request()))
done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 0
# 1 probe from scheduling requests
# + 1 probe from when the replica set was updated with replica r1
assert len(r1.queue_len_deadline_history) - 1 == 1

# Now let the replica respond and accept the request, it should be scheduled.
r1.set_queue_len_response(DEFAULT_MAX_ONGOING_REQUESTS - 1)
done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 1
assert (await task) == r1

@@ -1591,7 +1636,7 @@ async def test_queue_len_cache_background_probing(pow_2_scheduler):
s.replica_queue_len_cache.update(r1.replica_id, 0)

task = loop.create_task(s.choose_replica_for_request(fake_pending_request()))
done, _ = await asyncio.wait([task], timeout=0.1)
done, _ = await asyncio.wait([task], timeout=0.01)
assert len(done) == 1
assert (await task) == r1
# 0 probes from scheduling requests
