Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Doc Clarity #587

Merged
merged 5 commits into from
Feb 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 112 additions & 16 deletions validator_api/gpt_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import uuid

import numpy as np
from fastapi import APIRouter, Depends, Header, HTTPException, Request
from fastapi import APIRouter, Depends, Header, HTTPException
from loguru import logger
from openai.types.chat.chat_completion_chunk import ChatCompletionChunk, Choice, ChoiceDelta
from starlette.responses import StreamingResponse
Expand All @@ -21,19 +21,63 @@
from validator_api.test_time_inference import generate_response
from validator_api.utils import filter_available_uids

from .serializers import ChatCompletionRequest, ErrorResponse, SearchResult, WebSearchResponse

router = APIRouter()
N_MINERS = 5


def validate_api_key(api_key: str = Header(...)):
    """Authenticate a request via its ``api_key`` header.

    Looks the key up in the module-level ``_keys`` registry and returns the
    record stored for it.

    Raises:
        HTTPException: 403 when the key is not registered.
    """
    try:
        # EAFP: a single dict lookup both validates and fetches the record.
        return _keys[api_key]
    except KeyError:
        raise HTTPException(status_code=403, detail="Invalid API key")


@router.post("/v1/chat/completions")
async def completions(request: Request, api_key: str = Depends(validate_api_key)):
"""Main endpoint that handles both regular and mixture of miners chat completion."""
@router.post(
"/v1/chat/completions",
response_model=WebSearchResponse,
responses={
200: {
"description": "Successfully generated chat completion",
"model": WebSearchResponse,
"content": {"application/json": {"example": {"results": [{"content": "This is a sample response..."}]}}},
},
403: {
"description": "Invalid API key provided",
"model": ErrorResponse,
"content": {"application/json": {"example": {"detail": "Invalid API key"}}},
},
500: {
"description": "Server error occurred",
"model": ErrorResponse,
"content": {"application/json": {"example": {"detail": "No available miners"}}},
},
},
summary="Generate chat completions",
description="""
Generates chat completions using various strategies:

- Standard chat completion
- Mixture-of-miners strategy
- Test-time inference

The endpoint automatically selects the appropriate strategy based on request parameters.
Results are streamed back to the client as they are generated.
""",
)
async def completions(request: ChatCompletionRequest, api_key: str = Depends(validate_api_key)):
"""
Executes a chat completion request.

- **request**: JSON request body following `ChatCompletionRequest` model.
- **api_key**: Authentication header for API access.

Determines whether to use:
- Standard chat completion (`chat_completion`).
- Mixture-of-miners strategy (`mixture_of_miners`).
- Test-time inference (`test_time_inference`).
"""
try:
body = await request.json()
body["seed"] = int(body.get("seed") or random.randint(0, 1000000))
Expand All @@ -55,8 +99,35 @@ async def completions(request: Request, api_key: str = Depends(validate_api_key)
return StreamingResponse(content="Internal Server Error", status_code=500)


@router.post("/web_retrieval")
@router.post(
"/web_retrieval",
response_model=WebSearchResponse,
responses={
200: {"description": "Successfully retrieved search results", "model": WebSearchResponse},
403: {"description": "Invalid API key provided", "model": ErrorResponse},
422: {"description": "Invalid request parameters", "model": ErrorResponse},
500: {"description": "Server error occurred", "model": ErrorResponse},
},
summary="Search the web using distributed miners",
description="""
Executes web searches using a distributed network of miners:

1. Queries multiple miners in parallel
2. Aggregates and deduplicates results
3. Parses and validates all responses
4. Returns a unified set of search results

The search is performed using DuckDuckGo through the miner network.
""",
)
async def web_retrieval(search_query: str, n_miners: int = 10, n_results: int = 5, max_response_time: int = 10):
"""
Handles web retrieval through distributed miners.

- **request**: JSON request body following `WebSearchQuery` model.

If no miners are available, an HTTPException is raised.
"""
uids = filter_available_uids(task="WebRetrievalTask", test=shared_settings.API_TEST_MODE, n_miners=n_miners)
if not uids:
raise HTTPException(status_code=500, detail="No available miners")
Expand All @@ -79,36 +150,61 @@ async def web_retrieval(search_query: str, n_miners: int = 10, n_results: int =

timeout_seconds = 30
stream_results = await query_miners(uids, body, timeout_seconds)

results = [
"".join(res.accumulated_chunks)
for res in stream_results
if isinstance(res, SynapseStreamResult) and res.accumulated_chunks
]

distinct_results = list(np.unique(results))
loaded_results = []
search_results = []
for result in distinct_results:
try:
loaded_results.append(json.loads(result))
logger.info(f"🔍 Result: {result}")
parsed_result = json.loads(result)
search_results.append(SearchResult(**parsed_result))
logger.info(f"🔍 Parsed Result: {parsed_result}")
except Exception:
logger.error(f"🔍 Result: {result}")
if len(loaded_results) == 0:
logger.error(f"🔍 Failed to parse result: {result}")

if len(search_results) == 0:
raise HTTPException(status_code=500, detail="No miner responded successfully")

collected_chunks_list = [res.accumulated_chunks if res and res.accumulated_chunks else [] for res in stream_results]
asyncio.create_task(scoring_queue.scoring_queue.append_response(uids=uids, body=body, chunks=collected_chunks_list))
return loaded_results
asyncio.create_task(scoring_queue.scoring_queue.append_response(uids=uids, body=body, chunks=[]))
return WebSearchResponse(results=search_results)


@router.post(
"/test_time_inference",
responses={
200: {"description": "Successfully generated inference response", "content": {"text/event-stream": {}}},
500: {"description": "Server error occurred", "model": ErrorResponse},
},
summary="Generate responses using test-time inference",
description="""
Generates responses using test-time inference strategy:

- Streams response steps as they are generated
- Includes thinking time metrics
- Returns results in a streaming event format
""",
)
async def test_time_inference(messages: list[dict], model: str | None = None):
"""
Handles test-time inference requests.

- **messages**: List of messages used for inference.
- **model**: Optional model to use for generating responses.

Returns a streaming response of the generated chat output.
"""

@router.post("/test_time_inference")
async def test_time_inference(messages: list[dict], model: str = None):
async def create_response_stream(messages):
async for steps, total_thinking_time in generate_response(messages, model=model):
if total_thinking_time is not None:
logger.debug(f"**Total thinking time: {total_thinking_time:.2f} seconds**")
yield steps, total_thinking_time

# Create a streaming response that yields each step
async def stream_steps():
try:
i = 0
Expand Down
144 changes: 144 additions & 0 deletions validator_api/serializers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
from typing import List

from pydantic import BaseModel, Field


class WebSearchQuery(BaseModel):
    """Request model for web search queries.

    Describes one web-retrieval request: the query text, how many miners to
    fan the search out to, an optional explicit list of miner UIDs, and how
    many results each miner should return.
    """

    # Free-text query forwarded to miners (executed via DuckDuckGo per the
    # field description).
    search_query: str = Field(
        ...,
        description="The search query to execute using DuckDuckGo",
        example="latest developments in quantum computing",
    )
    # Fan-out width; bounded to 1-100 by the ge/le validators.
    n_miners: int = Field(
        default=10, description="Number of miners to query for results (between 1-100)", example=10, ge=1, le=100
    )
    # Explicit miner selection; None means miners are chosen automatically.
    uids: List[int] | None = Field(
        default=None,
        description="List of specific miner UIDs to query. If not provided, miners will be automatically selected.",
        example=[1, 2, 3, 4],
    )
    # Per-miner result count; bounded to 1-30 by the ge/le validators.
    n_results: int = Field(
        default=5, description="Number of results each miner should return (between 1-30)", example=5, ge=1, le=30
    )

    class Config:
        # Pydantic v1-style OpenAPI example (v2 renames this to
        # `json_schema_extra`) -- NOTE(review): confirm the pydantic major
        # version in use.
        schema_extra = {
            "example": {
                "search_query": "latest developments in quantum computing",
                "n_miners": 10,
                "uids": [1, 2, 3, 4],
                "n_results": 5,
            }
        }


class SearchResult(BaseModel):
    """Model representing a single search result.

    One parsed entry from a miner's web-retrieval response; all four fields
    are required.
    """

    # Source page address. Typed as plain str (not an AnyUrl/HttpUrl
    # pydantic type), so no URL validation is applied.
    url: str = Field(
        ..., description="URL of the search result", example="https://www.nature.com/articles/d41586-023-02192-x"
    )
    title: str = Field(
        ...,
        description="Title of the webpage or document",
        example="Quantum computing breakthrough: New superconducting qubits",
    )
    snippet: str = Field(
        ...,
        description="Brief excerpt or summary of the content",
        example="Researchers have developed a new type of superconducting qubit that increases stability by 50%.",
    )
    # Kept as a string rather than a datetime; format illustrated by the example.
    timestamp: str = Field(..., description="Timestamp when the result was retrieved", example="2024-01-01 12:00:00")


class WebSearchResponse(BaseModel):
    """Response model containing search results from distributed miners.

    Wraps the final, deduplicated list of ``SearchResult`` entries returned
    to the API client.
    """

    results: List[SearchResult] = Field(..., description="List of deduplicated and parsed search results")

    class Config:
        # Pydantic v1-style OpenAPI example (v2 renames this to
        # `json_schema_extra`) -- NOTE(review): confirm the pydantic major
        # version in use.
        schema_extra = {
            "example": {
                "results": [
                    {
                        "url": "https://www.nature.com/articles/d41586-023-02192-x",
                        "title": "Quantum computing breakthrough: New superconducting qubits",
                        "snippet": "Researchers have developed a new type of superconducting qubit that increases stability by 50%.",
                        "timestamp": "2024-01-01 12:00:00",
                    },
                    {
                        "url": "https://arxiv.org/abs/2307.05230",
                        "title": "Arxiv paper: Enhancing Quantum Error Correction",
                        "snippet": "A novel quantum error correction technique reduces noise in superconducting circuits.",
                        "timestamp": "2024-01-02 09:30:00",
                    },
                ]
            }
        }


class ErrorResponse(BaseModel):
    """Model for API error responses.

    Mirrors FastAPI's default ``{"detail": ...}`` error body so the
    documented error schemas match what ``HTTPException`` actually emits.
    """

    detail: str = Field(
        ...,
        description="Detailed error message explaining what went wrong",
        example="No available miners found to process the request",
    )


class ChatMessage(BaseModel):
    """Model representing a single chat message."""

    # NOTE(review): `enum` is not a declared Field() parameter -- pydantic
    # passes it through as extra JSON-schema metadata only, so it documents
    # but does not validate the role value. Use typing.Literal if strict
    # validation is wanted.
    role: str = Field(
        ..., description="Role of the message sender", example="user", enum=["user", "assistant", "system"]
    )
    content: str = Field(..., description="The message content", example="What is the meaning of life?")


class ChatCompletionRequest(BaseModel):
    """Request model for chat completion.

    Carries the conversation history plus flags selecting the completion
    strategy (standard, mixture-of-miners, or test-time inference).
    """

    # NOTE(review): typed List[dict] rather than List[ChatMessage], so
    # message structure is not validated -- consider reusing ChatMessage.
    messages: List[dict] = Field(
        ...,
        description="List of chat messages containing user input and system responses",
        example=[
            {"role": "user", "content": "What is the meaning of life?"},
            {"role": "assistant", "content": "Philosophers have debated this question for centuries."},
        ],
    )
    # NOTE(review): `enum` here is schema metadata only (not a declared
    # Field() parameter); it does not restrict accepted model names.
    model: str = Field(
        default="gpt-4",
        description="Model to use for generating responses",
        example="gpt-4",
        enum=["gpt-4", "gpt-3.5-turbo", "custom-model"],
    )
    # None means the server generates a random seed (see the field description).
    seed: int | None = Field(
        default=None,
        description="Random seed for response generation. If not provided, a random seed will be generated.",
        example=42,
    )
    # Strategy flags; both default to the standard completion path.
    test_time_inference: bool = Field(
        default=False, description="When true, enables test-time inference mode", example=False
    )
    mixture: bool = Field(default=False, description="When true, enables mixture-of-miners strategy", example=False)
    # Explicit miner selection; None means miners are chosen automatically.
    uids: List[int] | None = Field(
        default=None,
        title="Miner UIDs",
        description="List of specific miner UIDs to query. If not provided, miners will be automatically selected.",
        example=[1, 5, 7],
    )

    class Config:
        # Pydantic v1-style OpenAPI example (v2 renames this to
        # `json_schema_extra`) -- NOTE(review): confirm the pydantic major
        # version in use.
        schema_extra = {
            "example": {
                "messages": [{"role": "user", "content": "What is quantum computing?"}],
                "model": "gpt-4",
                "seed": 42,
                "test_time_inference": False,
                "mixture": False,
                "uids": [1, 5, 7],
            }
        }
Loading