Skip to content

Commit

Permalink
DM-49009: Staggered monkey start
Browse files Browse the repository at this point in the history
Add two new flock config parameters to let us start monkeys in batches
with waits in between:
* `start_batch_size`: the number of monkeys to start in each batch
* `start_batch_wait`: the amount of time to wait in between starting
  batches
  • Loading branch information
fajpunk committed Feb 18, 2025
1 parent f7f97ed commit 87278e8
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 1 deletion.
3 changes: 3 additions & 0 deletions changelog.d/20250218_151607_danfuchs_HEAD.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### New features

- Add `start_batch_size` and `start_batch_wait` flock config parameters to allow starting monkeys in a slower, more controlled way.
27 changes: 27 additions & 0 deletions docs/user-guide/flocks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,33 @@ Important points to note here:
* ``restart: true`` tells mobu to shut down and respawn the pod if there is any failure.
The default is to attempt to keep using the same pod despite the failure.

Starting monkeys in batches
---------------------------

In some situations, you may want to start the monkeys in a flock in batches, with time in between each batch, rather than all at once.
Load testing notebook business monkeys is a good example of this, where starting a large number of these monkeys might overwhelm JupyterHub.

You can add the ``start_batch_size`` and ``start_batch_wait`` parameters to a flock configuration to do this.

``start_batch_size`` specifies how many monkeys should be started in each batch, and ``start_batch_wait`` specifies how long to wait in between starting each batch.

.. code-block:: yaml

   autostart:
     - name: "python"
       count: 10
       start_batch_size: 2
       start_batch_wait: "60s"
       users:
         - username: "bot-mobu-user"
       scopes: ["exec:notebook"]
       business:
         type: "JupyterPythonLoop"
         restart: true
         options:
           max_executions: 1
           code: "print(1+1)"

Testing with notebooks
----------------------

Expand Down
23 changes: 23 additions & 0 deletions src/mobu/models/flock.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Self

from pydantic import BaseModel, Field, model_validator
from safir.pydantic import HumanTimedelta

from .business.business_config_type import BusinessConfigType
from .monkey import MonkeyData
Expand All @@ -21,6 +22,24 @@ class FlockConfig(BaseModel):

count: int = Field(..., title="How many monkeys to run", examples=[100])

start_batch_size: int | None = Field(
None,
title="Start batch size",
description=(
"The number of monkeys to start in each batch. If not provided,"
" all monkeys will be started at the same time."
),
)

start_batch_wait: HumanTimedelta | None = Field(
None,
title="Start batch wait",
description=(
"The amount of time to wait before starting each batch of monkeys."
" Must be provided if start_batch_size is provided."
),
)

users: list[User] | None = Field(
None,
title="Explicit list of users to run as",
Expand Down Expand Up @@ -56,6 +75,10 @@ def _validate(self) -> Self:
raise ValueError("both users and user_spec provided")
if self.count and self.users and len(self.users) != self.count:
raise ValueError(f"users list must contain {self.count} elements")
if self.start_batch_size and not self.start_batch_wait:
raise ValueError(
"start_batch_wait must be given if start_batch_size is given"
)
return self


Expand Down
33 changes: 32 additions & 1 deletion src/mobu/services/flock.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import asyncio
import math
from datetime import UTC, datetime
from itertools import batched

from aiojobs import Scheduler
from httpx import AsyncClient
Expand Down Expand Up @@ -129,7 +130,37 @@ async def start(self) -> None:
for user in users:
monkey = self._create_monkey(user)
self._monkeys[user.username] = monkey
await monkey.start(self._scheduler)

# Start in staggered batches
if self._config.start_batch_size and self._config.start_batch_wait:
size = self._config.start_batch_size
wait_secs = self._config.start_batch_wait.total_seconds()
batches = list(batched(self._monkeys.values(), size))
num = len(batches)
for i in range(num):
batch = batches[i]
logger = self._logger.bind(
current_batch=i + 1,
num_batches=num,
monkeys_in_batch=len(batch),
)
logger.info("starting batch")
tasks = [monkey.start(self._scheduler) for monkey in batch]
await asyncio.gather(*tasks)

# Don't wait after starting the last batch
if i < num - 1:
logger.info("pausing for batch", wait_secs=wait_secs)
await asyncio.sleep(wait_secs)

# Start all at the same time
else:
tasks = [
monkey.start(self._scheduler)
for monkey in self._monkeys.values()
]
await asyncio.gather(*tasks)

self._start_time = datetime.now(tz=UTC)

async def stop(self) -> None:
Expand Down
123 changes: 123 additions & 0 deletions tests/autostart_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,126 @@ async def test_autostart(client: AsyncClient, jupyter: MockJupyter) -> None:
assert r.status_code == 204
r = await client.delete("/mobu/flocks/basic")
assert r.status_code == 204


@pytest.mark.asyncio
async def test_batched_autostart(client: AsyncClient) -> None:
    """Verify the ``basic`` and ``python`` flocks, then delete both.

    The flocks are expected to already exist when the test begins
    (presumably created by the autostart configuration in the test
    fixtures -- confirm against the fixture setup).
    """
    # The "basic" flock: ten EmptyLoop monkeys whose users are generated
    # from a spec, with UIDs and GIDs counting up from 1000 and 2000.
    basic_monkeys = []
    for offset in range(10):
        username = f"bot-mobu-testuser{offset + 1:02d}"
        basic_monkeys.append(
            {
                "name": username,
                "business": {
                    "failure_count": 0,
                    "name": "EmptyLoop",
                    "refreshing": False,
                    "success_count": ANY,
                },
                "state": ANY,
                "user": {
                    "scopes": ["exec:notebook"],
                    "token": ANY,
                    "uidnumber": 1000 + offset,
                    "gidnumber": 2000 + offset,
                    "username": username,
                },
            }
        )
    expected_basic = {
        "name": "basic",
        "config": {
            "name": "basic",
            "count": 10,
            "user_spec": {
                "username_prefix": "bot-mobu-testuser",
                "uid_start": 1000,
                "gid_start": 2000,
            },
            "scopes": ["exec:notebook"],
            "business": {"type": "EmptyLoop"},
        },
        "monkeys": basic_monkeys,
    }

    # The "python" flock: two explicitly listed users. Each monkey reports
    # a GID equal to its UID.
    python_users = [
        ("bot-mobu-python", 60000),
        ("bot-mobu-otherpython", 70000),
    ]
    expected_python = {
        "name": "python",
        "config": {
            "name": "python",
            "count": 2,
            "users": [
                {"username": username, "uidnumber": uid}
                for username, uid in python_users
            ],
            "scopes": ["exec:notebook"],
            "business": {
                "type": "NubladoPythonLoop",
                "restart": True,
                "options": {
                    "image": {
                        "image_class": "latest-weekly",
                        "size": "Large",
                    },
                    "spawn_settle_time": 0,
                },
            },
        },
        "monkeys": [
            {
                "name": username,
                "business": {
                    "failure_count": 0,
                    "image": {
                        "description": ANY,
                        "reference": ANY,
                    },
                    "name": "NubladoPythonLoop",
                    "refreshing": False,
                    "success_count": ANY,
                },
                "state": "RUNNING",
                "user": {
                    "scopes": ["exec:notebook"],
                    "token": ANY,
                    "username": username,
                    "uidnumber": uid,
                    "gidnumber": uid,
                },
            }
            for username, uid in python_users
        ],
    }

    response = await client.get("/mobu/flocks/basic")
    assert response.status_code == 200
    assert response.json() == expected_basic

    # The python flock is only inspected once it reports as started.
    await wait_for_flock_start(client, "python")
    response = await client.get("/mobu/flocks/python")
    assert response.status_code == 200
    assert response.json() == expected_python

    # Clean up both flocks, python first.
    for flock_name in ("python", "basic"):
        response = await client.delete(f"/mobu/flocks/{flock_name}")
        assert response.status_code == 204
42 changes: 42 additions & 0 deletions tests/services/flock_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""Tests for flock functionality."""

from time import perf_counter

import pytest
import respx
from httpx import AsyncClient

from ..support.gafaelfawr import mock_gafaelfawr


@pytest.mark.asyncio
async def test_batched_start(
    client: AsyncClient,
    respx_mock: respx.Router,
) -> None:
    """Check that creating a flock with batched start honours the waits."""
    mock_gafaelfawr(respx_mock)

    # Ten monkeys in batches of three gives four batches, so three
    # one-second pauses are expected between them.
    flock_config = {
        "name": "test",
        "count": 10,
        "start_batch_size": 3,
        "start_batch_wait": "1s",
        "user_spec": {"username_prefix": "bot-mobu-testuser"},
        "scopes": ["exec:notebook"],
        "business": {
            "type": "EmptyLoop",
        },
    }

    # Time the request that creates the flock. The test relies on it not
    # returning until every batch has attempted to start.
    began = perf_counter()
    response = await client.put("/mobu/flocks", json=flock_config)
    duration = perf_counter() - began

    assert response.status_code == 201

    # The request must have taken longer than the sum of the pauses.
    assert duration > 3

0 comments on commit 87278e8

Please sign in to comment.