Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HOTFIX: Fixing non-streaming timeout #548

Merged
merged 2 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions neurons/miners/epistula_miner/miner.py

Large diffs are not rendered by default.

101 changes: 101 additions & 0 deletions notebooks/demo.ipynb

Large diffs are not rendered by default.

17 changes: 14 additions & 3 deletions validator_api/chat_completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,23 @@ async def collect_remaining_responses(
logger.exception(f"Error collecting remaining responses: {e}")


async def get_response_from_miner(body: dict[str, any], uid: int) -> tuple:
async def get_response_from_miner(body: dict[str, any], uid: int, timeout_seconds: int) -> tuple:
"""Get response from a single miner."""
return await make_openai_query(shared_settings.METAGRAPH, shared_settings.WALLET, body, uid, stream=False)
return await make_openai_query(
metagraph=shared_settings.METAGRAPH,
wallet=shared_settings.WALLET,
body=body,
uid=uid,
stream=False,
timeout_seconds=timeout_seconds,
)


async def chat_completion(
body: dict[str, any], uids: Optional[list[int]] = None, num_miners: int = 10
) -> tuple | StreamingResponse:
"""Handle chat completion with multiple miners in parallel."""
logger.debug(f"REQUEST_BODY: {body}")
# Get multiple UIDs if none specified
if uids is None:
uids = list(get_uids(sampling_mode="top_incentive", k=100))
Expand Down Expand Up @@ -229,7 +237,10 @@ async def chat_completion(
)
else:
# For non-streaming requests, wait for first valid response
response_tasks = [asyncio.create_task(get_response_from_miner(body, uid)) for uid in selected_uids]
response_tasks = [
asyncio.create_task(get_response_from_miner(body=body, uid=uid, timeout_seconds=timeout_seconds))
for uid in selected_uids
]

first_valid_response = None
collected_responses = []
Expand Down
10 changes: 7 additions & 3 deletions validator_api/mixture_of_miners.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import copy
import math
import random

from fastapi import HTTPException
Expand All @@ -26,10 +27,10 @@
TOP_INCENTIVE_POOL = 100


async def get_miner_response(body: dict, uid: str) -> tuple | None:
async def get_miner_response(body: dict, uid: str, timeout_seconds: int) -> tuple | None:
"""Get response from a single miner with error handling."""
try:
return await get_response_from_miner(body, uid)
return await get_response_from_miner(body, uid, timeout_seconds=timeout_seconds)
except Exception as e:
logger.error(f"Error getting response from miner {uid}: {e}")
return None
Expand Down Expand Up @@ -57,7 +58,10 @@ async def mixture_of_miners(body: dict[str, any]) -> tuple | StreamingResponse:
raise HTTPException(status_code=503, detail="No available miners found")

# Concurrently collect responses from all miners.
miner_tasks = [get_miner_response(body_first_step, uid) for uid in miner_uids]
timeout_seconds = max(
30, max(0, math.floor(math.log2(body["sampling_parameters"].get("max_new_tokens", 256) / 256))) * 10 + 30
)
miner_tasks = [get_miner_response(body_first_step, uid, timeout_seconds=timeout_seconds) for uid in miner_uids]
responses = await asyncio.gather(*miner_tasks)

# Filter out None responses (failed requests).
Expand Down
Loading