Skip to content

Commit

Permalink
Fix resource leak and improved wait_for_ready (#2675)
Browse files Browse the repository at this point in the history
  • Loading branch information
RunDevelopment authored Mar 12, 2024
1 parent f478f86 commit 5b4fe11
Showing 1 changed file with 48 additions and 21 deletions.
69 changes: 48 additions & 21 deletions backend/src/server_process_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,15 @@ def __init__(self):
self._port = _find_free_port()
self._base_url = f"http://127.0.0.1:{self._port}"
self._session = None
self._is_ready = False
self._is_checking_ready = False

async def start(self, flags: Iterable[str] = []):
logger.info("Starting worker process...")
logger.info(f"Starting worker process on port {self._port}...")
self._process = _WorkerProcess([str(self._port), *flags])
self._session = aiohttp.ClientSession(base_url=self._base_url)
self._is_ready = False
self._is_checking_ready = False
await self.wait_for_ready()
logger.info("Worker process started")

Expand All @@ -107,32 +111,55 @@ async def stop(self):
logger.info("Worker process stopped")

async def restart(self, flags: Iterable[str] = []):
logger.info("Restarting worker...")
await self.stop()
await self.start(flags)

async def wait_for_ready(self, timeout: float = 300):
start = time.time()
while time.time() - start < timeout:
if (
self._process is not None
and self._session is not None
and _port_in_use(self._port)
):
try:
await self._session.get("/nodes", timeout=5)
return
except Exception:
pass
if self._is_ready:
return

async def test_connection(session: aiohttp.ClientSession):
async with session.get("/nodes", timeout=5) as resp:
resp.raise_for_status()

start = time.time()
while self._is_checking_ready and time.time() - start < timeout:
await asyncio.sleep(0.1)

raise TimeoutError("Server did not start in time")
if self._is_ready:
return

try:
self._is_checking_ready = True

while time.time() - start < timeout:
if (
self._process is not None
and self._session is not None
and _port_in_use(self._port)
):
try:
if not self._is_ready:
await test_connection(self._session)
self._is_ready = True
return
except asyncio.TimeoutError:
logger.warn("Server not ready yet due to timeout")
except Exception as e:
logger.warn("Server not ready yet", exc_info=e)

await asyncio.sleep(0.1)

raise TimeoutError("Server did not start in time")
finally:
self._is_checking_ready = False

async def proxy_request(self, request: Request, timeout: int | None = 300):
await self.wait_for_ready()
assert self._session is not None
if request.route is None:
raise ValueError("Route not found")
await self.wait_for_ready()
assert self._session is not None
async with self._session.request(
request.method,
f"/{request.route.path}",
Expand Down Expand Up @@ -167,9 +194,9 @@ async def get_packages(self):
await self.wait_for_ready()
assert self._session is not None
logger.debug("Fetching packages...")
packages_resp = await self._session.get(
async with self._session.get(
"/packages", params={"hideInternal": "false"}
)
packages_json = await packages_resp.json()
packages = [Package.from_dict(p) for p in packages_json]
return packages
) as packages_resp:
packages_json = await packages_resp.json()
packages = [Package.from_dict(p) for p in packages_json]
return packages

0 comments on commit 5b4fe11

Please sign in to comment.