Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Improve Doc Clarity" #601

Merged
merged 1 commit into from
Feb 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 16 additions & 112 deletions validator_api/gpt_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import uuid

import numpy as np
from fastapi import APIRouter, Depends, Header, HTTPException
from fastapi import APIRouter, Depends, Header, HTTPException, Request
from loguru import logger
from openai.types.chat.chat_completion_chunk import ChatCompletionChunk, Choice, ChoiceDelta
from starlette.responses import StreamingResponse
Expand All @@ -21,63 +21,19 @@
from validator_api.test_time_inference import generate_response
from validator_api.utils import filter_available_uids

from .serializers import ChatCompletionRequest, ErrorResponse, SearchResult, WebSearchResponse

# Router collecting the validator API endpoints defined in this module.
router = APIRouter()
# Default miner fan-out per request — presumably the number of miners queried
# for a standard completion; TODO confirm against the endpoint bodies/callers.
N_MINERS = 5


def validate_api_key(api_key: str = Header(...)):
    """Authenticate a request via its ``api_key`` header.

    Looks the supplied key up in the module-level ``_keys`` mapping.

    Returns:
        The value stored under ``api_key`` in ``_keys`` when the key is known.

    Raises:
        HTTPException: 403 when the key is not present in ``_keys``.
    """
    if api_key in _keys:
        return _keys[api_key]
    # Unknown key: reject the request outright.
    raise HTTPException(status_code=403, detail="Invalid API key")


@router.post(
"/v1/chat/completions",
response_model=WebSearchResponse,
responses={
200: {
"description": "Successfully generated chat completion",
"model": WebSearchResponse,
"content": {"application/json": {"example": {"results": [{"content": "This is a sample response..."}]}}},
},
403: {
"description": "Invalid API key provided",
"model": ErrorResponse,
"content": {"application/json": {"example": {"detail": "Invalid API key"}}},
},
500: {
"description": "Server error occurred",
"model": ErrorResponse,
"content": {"application/json": {"example": {"detail": "No available miners"}}},
},
},
summary="Generate chat completions",
description="""
Generates chat completions using various strategies:

- Standard chat completion
- Mixture-of-miners strategy
- Test-time inference

The endpoint automatically selects the appropriate strategy based on request parameters.
Results are streamed back to the client as they are generated.
""",
)
async def completions(request: ChatCompletionRequest, api_key: str = Depends(validate_api_key)):
"""
Executes a chat completion request.

- **request**: JSON request body following `ChatCompletionRequest` model.
- **api_key**: Authentication header for API access.

Determines whether to use:
- Standard chat completion (`chat_completion`).
- Mixture-of-miners strategy (`mixture_of_miners`).
- Test-time inference (`test_time_inference`).
"""
@router.post("/v1/chat/completions")
async def completions(request: Request, api_key: str = Depends(validate_api_key)):
"""Main endpoint that handles both regular and mixture of miners chat completion."""
try:
body = await request.json()
body["seed"] = int(body.get("seed") or random.randint(0, 1000000))
Expand All @@ -99,35 +55,8 @@ async def completions(request: ChatCompletionRequest, api_key: str = Depends(val
return StreamingResponse(content="Internal Server Error", status_code=500)


@router.post(
"/web_retrieval",
response_model=WebSearchResponse,
responses={
200: {"description": "Successfully retrieved search results", "model": WebSearchResponse},
403: {"description": "Invalid API key provided", "model": ErrorResponse},
422: {"description": "Invalid request parameters", "model": ErrorResponse},
500: {"description": "Server error occurred", "model": ErrorResponse},
},
summary="Search the web using distributed miners",
description="""
Executes web searches using a distributed network of miners:

1. Queries multiple miners in parallel
2. Aggregates and deduplicates results
3. Parses and validates all responses
4. Returns a unified set of search results

The search is performed using DuckDuckGo through the miner network.
""",
)
@router.post("/web_retrieval")
async def web_retrieval(search_query: str, n_miners: int = 10, n_results: int = 5, max_response_time: int = 10):
"""
Handles web retrieval through distributed miners.

- **request**: JSON request body following `WebSearchQuery` model.

If no miners are available, an HTTPException is raised.
"""
uids = filter_available_uids(task="WebRetrievalTask", test=shared_settings.API_TEST_MODE, n_miners=n_miners)
if not uids:
raise HTTPException(status_code=500, detail="No available miners")
Expand All @@ -150,61 +79,36 @@ async def web_retrieval(search_query: str, n_miners: int = 10, n_results: int =

timeout_seconds = 30
stream_results = await query_miners(uids, body, timeout_seconds)

results = [
"".join(res.accumulated_chunks)
for res in stream_results
if isinstance(res, SynapseStreamResult) and res.accumulated_chunks
]

distinct_results = list(np.unique(results))
search_results = []
loaded_results = []
for result in distinct_results:
try:
parsed_result = json.loads(result)
search_results.append(SearchResult(**parsed_result))
logger.info(f"🔍 Parsed Result: {parsed_result}")
loaded_results.append(json.loads(result))
logger.info(f"🔍 Result: {result}")
except Exception:
logger.error(f"🔍 Failed to parse result: {result}")

if len(search_results) == 0:
logger.error(f"🔍 Result: {result}")
if len(loaded_results) == 0:
raise HTTPException(status_code=500, detail="No miner responded successfully")

asyncio.create_task(scoring_queue.scoring_queue.append_response(uids=uids, body=body, chunks=[]))
return WebSearchResponse(results=search_results)


@router.post(
"/test_time_inference",
responses={
200: {"description": "Successfully generated inference response", "content": {"text/event-stream": {}}},
500: {"description": "Server error occurred", "model": ErrorResponse},
},
summary="Generate responses using test-time inference",
description="""
Generates responses using test-time inference strategy:

- Streams response steps as they are generated
- Includes thinking time metrics
- Returns results in a streaming event format
""",
)
async def test_time_inference(messages: list[dict], model: str | None = None):
"""
Handles test-time inference requests.

- **messages**: List of messages used for inference.
- **model**: Optional model to use for generating responses.
collected_chunks_list = [res.accumulated_chunks if res and res.accumulated_chunks else [] for res in stream_results]
asyncio.create_task(scoring_queue.scoring_queue.append_response(uids=uids, body=body, chunks=collected_chunks_list))
return loaded_results

Returns a streaming response of the generated chat output.
"""

@router.post("/test_time_inference")
async def test_time_inference(messages: list[dict], model: str = None):
async def create_response_stream(messages):
async for steps, total_thinking_time in generate_response(messages, model=model):
if total_thinking_time is not None:
logger.debug(f"**Total thinking time: {total_thinking_time:.2f} seconds**")
yield steps, total_thinking_time

# Create a streaming response that yields each step
async def stream_steps():
try:
i = 0
Expand Down
144 changes: 0 additions & 144 deletions validator_api/serializers.py

This file was deleted.

Loading