Skip to content

Commit

Permalink
DM-48941: Only check out notebook repos once
Browse files Browse the repository at this point in the history
Add a reference-counting process-global notebook repo cache.
Use this in the NotebookRunner business to only clone notebook repos once per process, not once per monkey.
  • Loading branch information
fajpunk committed Feb 18, 2025
1 parent 52fcd67 commit 6729a14
Show file tree
Hide file tree
Showing 17 changed files with 412 additions and 48 deletions.
3 changes: 3 additions & 0 deletions changelog.d/20250217_173118_danfuchs_DM_48941.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### Bug fixes

- Notebook repos are only cloned once per process (and once per refresh request), instead of once per monkey. This should speed up NotebookRunner flock startup, especially in load-testing use cases.
5 changes: 5 additions & 0 deletions src/mobu/dependencies/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from ..events import Events
from ..factory import Factory, ProcessContext
from ..services.manager import FlockManager
from ..services.repo import RepoManager

__all__ = [
"ContextDependency",
Expand All @@ -41,6 +42,9 @@ class RequestContext:
manager: FlockManager
"""Global singleton flock manager."""

repo_manager: RepoManager
"""Global singleton git repo manager."""

factory: Factory
"""Component factory."""

Expand Down Expand Up @@ -80,6 +84,7 @@ async def __call__(
request=request,
logger=logger,
manager=self._process_context.manager,
repo_manager=self._process_context.repo_manager,
factory=Factory(self._process_context, logger),
)

Expand Down
1 change: 1 addition & 0 deletions src/mobu/dependencies/github.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def initialize(
scopes=scopes,
http_client=base_context.process_context.http_client,
events=base_context.process_context.events,
repo_manager=base_context.process_context.repo_manager,
gafaelfawr_storage=base_context.process_context.gafaelfawr,
logger=base_context.process_context.logger,
)
Expand Down
13 changes: 12 additions & 1 deletion src/mobu/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from .events import Events
from .models.solitary import SolitaryConfig
from .services.manager import FlockManager
from .services.repo import RepoManager
from .services.solitary import Solitary
from .storage.gafaelfawr import GafaelfawrStorage

Expand Down Expand Up @@ -38,17 +39,25 @@ class ProcessContext:
Manager for all running flocks.
events
Object with attributes for all metrics event publishers.
repo_manager
For efficiently cloning git repos.
"""

def __init__(self, http_client: AsyncClient, events: Events) -> None:
def __init__(
self,
http_client: AsyncClient,
events: Events,
) -> None:
self.http_client = http_client
self.logger = structlog.get_logger("mobu")
self.gafaelfawr = GafaelfawrStorage(self.http_client, self.logger)
self.events = events
self.repo_manager = RepoManager(self.logger)
self.manager = FlockManager(
gafaelfawr_storage=self.gafaelfawr,
http_client=self.http_client,
logger=self.logger,
repo_manager=self.repo_manager,
events=self.events,
)

Expand All @@ -58,6 +67,7 @@ async def aclose(self) -> None:
Called before shutdown to free resources.
"""
await self.manager.aclose()
self.repo_manager.close()


class Factory:
Expand Down Expand Up @@ -114,6 +124,7 @@ def create_solitary(self, solitary_config: SolitaryConfig) -> Solitary:
),
http_client=self._context.http_client,
events=self._context.events,
repo_manager=self._context.repo_manager,
logger=self._logger,
)

Expand Down
5 changes: 1 addition & 4 deletions src/mobu/handlers/github_refresh_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,7 @@ async def post_webhook(
context.logger.debug("Received GitHub webhook", payload=event.data)
# Give GitHub some time to reach internal consistency.
await asyncio.sleep(GITHUB_WEBHOOK_WAIT_SECONDS)
await gidgethub_router.dispatch(
event=event,
context=context,
)
await gidgethub_router.dispatch(event=event, context=context)


@gidgethub_router.register("push")
Expand Down
13 changes: 13 additions & 0 deletions src/mobu/models/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@

from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from tempfile import TemporaryDirectory

from pydantic import BaseModel, ConfigDict, Field

__all__ = ["ClonedRepoInfo", "RepoConfig"]


class RepoConfig(BaseModel):
"""In-repo configuration for mobu behavior.
Expand All @@ -25,3 +29,12 @@ class RepoConfig(BaseModel):
),
examples=["some-dir", "some-dir/some-other-dir"],
)


@dataclass(frozen=True)
class ClonedRepoInfo:
    """Information about a cloned git repo.

    Immutable record bundling a clone's on-disk location with the commit
    hash that was checked out, so holders can pin and later identify the
    exact clone state they received.
    """

    # The TemporaryDirectory object itself (not just its name). Keeping a
    # reference here prevents the directory from being finalized/cleaned up
    # while this record is alive.
    dir: TemporaryDirectory
    # Filesystem path to the root of the cloned working tree.
    path: Path
    # Commit hash of the checked-out ref — presumably the full git SHA;
    # confirm against the code that performs the clone.
    hash: str
54 changes: 26 additions & 28 deletions src/mobu/services/business/notebookrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,10 @@
import contextlib
import json
import random
import shutil
from collections.abc import AsyncGenerator, Iterator
from contextlib import asynccontextmanager
from datetime import timedelta
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any, override

import sentry_sdk
Expand Down Expand Up @@ -46,7 +44,7 @@
from ...models.user import AuthenticatedUser
from ...sentry import capturing_start_span, start_transaction
from ...services.business.base import CommonEventAttrs
from ...storage.git import Git
from ...services.repo import RepoManager
from .nublado import NubladoBusiness

__all__ = ["NotebookRunner"]
Expand Down Expand Up @@ -83,6 +81,7 @@ def __init__(
*,
options: NotebookRunnerOptions | ListNotebookRunnerOptions,
user: AuthenticatedUser,
repo_manager: RepoManager,
events: Events,
logger: BoundLogger,
flock: str | None,
Expand All @@ -97,13 +96,13 @@ def __init__(
self._config = config_dependency.config
self._notebook: Path | None = None
self._notebook_paths: list[Path] | None = None
self._repo_dir: Path | None = None
self._repo_path: Path | None = None
self._repo_hash: str | None = None
self._exclude_paths: set[Path] = set()
self._running_code: str | None = None
self._git = Git(logger=logger)
self._max_executions: int | None = None
self._notebooks_to_run: list[Path] | None = None
self._repo_manager = repo_manager

match options:
case NotebookRunnerOptions(max_executions=max_executions):
Expand All @@ -117,22 +116,30 @@ async def startup(self) -> None:
await super().startup()

async def cleanup(self) -> None:
shutil.rmtree(str(self._repo_dir))
self._repo_dir = None
if self._repo_hash is not None:
await self._repo_manager.invalidate(
url=self.options.repo_url,
ref=self.options.repo_ref,
repo_hash=self._repo_hash,
)
self._repo_path = None
self._repo_hash = None
self._notebook_filter_results = None

async def initialize(self) -> None:
"""Prepare to run the business.
* Check out the repository
* Get notebook repo files from the repo manager
* Parse the in-repo config
* Filter the notebooks
"""
if self._repo_dir is None:
self._repo_dir = Path(TemporaryDirectory(delete=False).name)
await self.clone_repo()
info = await self._repo_manager.clone(
url=self.options.repo_url, ref=self.options.repo_ref
)
self._repo_path = info.path
self._repo_hash = info.hash

repo_config_path = self._repo_dir / GITHUB_REPO_CONFIG_PATH
repo_config_path = self._repo_path / GITHUB_REPO_CONFIG_PATH
set_context(
"repo_info",
{
Expand All @@ -155,7 +162,7 @@ async def initialize(self) -> None:
repo_config = RepoConfig()

exclude_dirs = repo_config.exclude_dirs
self._exclude_paths = {self._repo_dir / path for path in exclude_dirs}
self._exclude_paths = {self._repo_path / path for path in exclude_dirs}
self._notebooks = self.find_notebooks()
set_context(
"notebook_filter_info", self._notebooks.model_dump(mode="json")
Expand All @@ -168,20 +175,11 @@ async def shutdown(self) -> None:
await super().shutdown()

async def refresh(self) -> None:
self.logger.info("Recloning notebooks and forcing new execution")
self.logger.info("Getting new notebooks and forcing new execution")
await self.cleanup()
await self.initialize()
self.refreshing = False

async def clone_repo(self) -> None:
url = self.options.repo_url
ref = self.options.repo_ref
with capturing_start_span(op="clone_repo"):
self._git.repo = self._repo_dir
await self._git.clone(url, str(self._repo_dir))
await self._git.checkout(ref)
self._repo_hash = await self._git.repo_hash()

def is_excluded(self, notebook: Path) -> bool:
# A notebook is excluded if any of its parent directories are excluded
return bool(set(notebook.parents) & self._exclude_paths)
Expand All @@ -207,12 +205,12 @@ def missing_services(self, notebook: Path) -> bool:

def find_notebooks(self) -> NotebookFilterResults:
with capturing_start_span(op="find_notebooks"):
if self._repo_dir is None:
if self._repo_path is None:
raise NotebookRepositoryError(
"Repository directory must be set", self.user.username
)

all_notebooks = set(self._repo_dir.glob("**/*.ipynb"))
all_notebooks = set(self._repo_path.glob("**/*.ipynb"))
if not all_notebooks:
msg = "No notebooks found in {self._repo_dir}"
raise NotebookRepositoryError(msg, self.user.username)
Expand All @@ -227,14 +225,14 @@ def find_notebooks(self) -> NotebookFilterResults:

if self._notebooks_to_run:
requested = {
self._repo_dir / notebook
self._repo_path / notebook
for notebook in self._notebooks_to_run
}
not_found = requested - filter_results.all
if not_found:
msg = (
"Requested notebooks do not exist in"
f" {self._repo_dir}: {not_found}"
f" {self._repo_path}: {not_found}"
)
raise NotebookRepositoryError(msg, self.user.username)
filter_results.excluded_by_requested = (
Expand Down Expand Up @@ -348,7 +346,7 @@ async def execute_notebook(
) -> None:
self._notebook = self.next_notebook()
relative_notebook = str(
self._notebook.relative_to(self._repo_dir or "/")
self._notebook.relative_to(self._repo_path or "/")
)
iteration = f"{count + 1}/{num_executions}"
msg = f"Notebook {self._notebook.name} iteration {iteration}"
Expand Down
6 changes: 6 additions & 0 deletions src/mobu/services/flock.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
)
from ..models.flock import FlockConfig, FlockData, FlockSummary
from ..models.user import AuthenticatedUser, User, UserSpec
from ..services.repo import RepoManager
from ..storage.gafaelfawr import GafaelfawrStorage
from .monkey import Monkey

Expand All @@ -39,6 +40,8 @@ class Flock:
Shared HTTP client.
events
Event publishers.
repo_manager
For efficiently cloning git repos.
logger
Global logger.
"""
Expand All @@ -51,6 +54,7 @@ def __init__(
gafaelfawr_storage: GafaelfawrStorage,
http_client: AsyncClient,
events: Events,
repo_manager: RepoManager,
logger: BoundLogger,
) -> None:
self.name = flock_config.name
Expand All @@ -59,6 +63,7 @@ def __init__(
self._gafaelfawr = gafaelfawr_storage
self._http_client = http_client
self._events = events
self._repo_manager = repo_manager
self._logger = logger.bind(flock=self.name)
self._monkeys: dict[str, Monkey] = {}
self._start_time: datetime | None = None
Expand Down Expand Up @@ -166,6 +171,7 @@ def _create_monkey(self, user: AuthenticatedUser) -> Monkey:
user=user,
http_client=self._http_client,
events=self._events,
repo_manager=self._repo_manager,
logger=self._logger,
)

Expand Down
4 changes: 4 additions & 0 deletions src/mobu/services/github_ci/ci_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from ...events import Events
from ...models.ci_manager import CiManagerSummary, CiWorkerSummary
from ...models.user import User
from ...services.repo import RepoManager
from ...storage.gafaelfawr import GafaelfawrStorage
from ...storage.github import GitHubStorage
from .ci_notebook_job import CiNotebookJob
Expand Down Expand Up @@ -79,6 +80,7 @@ def __init__(
users: list[User],
http_client: AsyncClient,
events: Events,
repo_manager: RepoManager,
gafaelfawr_storage: GafaelfawrStorage,
logger: BoundLogger,
) -> None:
Expand All @@ -88,6 +90,7 @@ def __init__(
self._gafaelfawr = gafaelfawr_storage
self._http_client = http_client
self._events = events
self._repo_manager = repo_manager
self._logger = logger.bind(ci_manager=True)
self._scheduler: Scheduler = Scheduler()
self._queue: Queue[QueueItem] = Queue()
Expand Down Expand Up @@ -257,6 +260,7 @@ async def enqueue(
check_run=check_run,
http_client=self._http_client,
events=self._events,
repo_manager=self._repo_manager,
logger=self._logger,
gafaelfawr_storage=self._gafaelfawr,
)
Expand Down
4 changes: 4 additions & 0 deletions src/mobu/services/github_ci/ci_notebook_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from ...models.ci_manager import CiJobSummary
from ...models.solitary import SolitaryConfig
from ...models.user import User
from ...services.repo import RepoManager
from ...services.solitary import Solitary
from ...storage.gafaelfawr import GafaelfawrStorage
from ...storage.github import CheckRun, GitHubStorage
Expand Down Expand Up @@ -44,13 +45,15 @@ def __init__(
check_run: CheckRun,
http_client: AsyncClient,
events: Events,
repo_manager: RepoManager,
gafaelfawr_storage: GafaelfawrStorage,
logger: BoundLogger,
) -> None:
self._github = github_storage
self.check_run = check_run
self._http_client = http_client
self._events = events
self._repo_manager = repo_manager
self._gafaelfawr = gafaelfawr_storage
self._logger = logger.bind(ci_job_type="NotebookJob")
self._notebooks: list[Path] = []
Expand Down Expand Up @@ -101,6 +104,7 @@ async def run(self, user: User, scopes: list[str]) -> None:
gafaelfawr_storage=self._gafaelfawr,
http_client=self._http_client,
events=self._events,
repo_manager=self._repo_manager,
logger=self._logger,
)

Expand Down
Loading

0 comments on commit 6729a14

Please sign in to comment.