miner.py (generated from opentensor/bittensor-subnet-template)
# ruff: noqa: E402
from shared import settings

settings.shared_settings = settings.SharedSettings.load(mode="miner")
shared_settings = settings.shared_settings

import asyncio
import json
import time

import httpx
import netaddr
import requests
import uvicorn
from bittensor.core.axon import FastAPIThreadedServer
from bittensor.core.extrinsics.serving import serve_extrinsic
from fastapi import APIRouter, Depends, FastAPI, HTTPException, Request
from loguru import logger
from starlette.background import BackgroundTask
from starlette.responses import StreamingResponse

from prompting.llms.hf_llm import ReproducibleHF
from shared.epistula import verify_signature
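
# MODEL_ID is the model name sent to the upstream OpenAI API for proxied completions;
# LOCAL_MODEL_ID is only loaded (through ReproducibleHF) when SHOULD_SERVE_LLM is True.
# The NEURON_* constants below are generation-related defaults and are not referenced
# elsewhere in this file.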
MODEL_ID: str = "gpt-3.5-turbo"
NEURON_MAX_TOKENS: int = 256
NEURON_TEMPERATURE: float = 0.7
NEURON_TOP_K: int = 50
NEURON_TOP_P: float = 0.95
NEURON_STREAMING_BATCH_SIZE: int = 12
NEURON_STOP_ON_FORWARD_EXCEPTION: bool = False
SHOULD_SERVE_LLM: bool = False
LOCAL_MODEL_ID = "casperhansen/llama-3-8b-instruct-awq"


class OpenAIMiner:
    def __init__(self):
        self.should_exit = False
        # Async client used to proxy chat completions to the OpenAI API.
        self.client = httpx.AsyncClient(
            base_url="https://api.openai.com/v1",
            headers={
                "Authorization": f"Bearer {shared_settings.OPENAI_API_KEY}",
                "Content-Type": "application/json",
            },
        )
        # Optionally load a local model for seeded inference tasks.
        if SHOULD_SERVE_LLM:
            self.llm = ReproducibleHF(model_id=LOCAL_MODEL_ID)
        else:
            self.llm = None

    async def format_openai_query(self, request: Request):
        data = await request.json()

        # Extract the required fields
        openai_request = {}
        for key in ["messages", "model", "stream"]:
            if key in data:
                openai_request[key] = data[key]
        openai_request["model"] = MODEL_ID

        return openai_request

    async def create_chat_completion(self, request: Request):
        # Route seeded inference tasks to the local model (if loaded); proxy everything
        # else to the OpenAI API and stream the raw response back to the caller.
        if self.llm and request.headers.get("task", None) == "inference":
            return await self.create_inference_completion(request)
        req = self.client.build_request("POST", "chat/completions", json=await self.format_openai_query(request))
        r = await self.client.send(req, stream=True)
        return StreamingResponse(r.aiter_raw(), background=BackgroundTask(r.aclose), headers=r.headers)

    async def create_inference_completion(self, request: Request):
        async def word_stream():
            inference = await self.run_inference(request)
            words = inference.split()
            for word in words:
                # Simulate the OpenAI streaming response format
                data = {"choices": [{"delta": {"content": word + " "}, "index": 0, "finish_reason": None}]}
                yield f"data: {json.dumps(data)}\n\n"
                await asyncio.sleep(0.1)  # Simulate a delay between words
            # Indicate the end of the stream
            data = {"choices": [{"delta": {}, "index": 0, "finish_reason": "stop"}]}
            yield f"data: {json.dumps(data)}\n\n"
            yield "data: [DONE]\n\n"

        return StreamingResponse(word_stream(), media_type="text/event-stream")
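
    # Each yielded line above follows the OpenAI server-sent-events chunk shape, e.g.
    #   data: {"choices": [{"delta": {"content": "hello "}, "index": 0, "finish_reason": null}]}
    # with a final chunk carrying finish_reason "stop" and a closing "data: [DONE]" line.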

    async def check_availability(self, request: Request):
        logger.debug("Checking availability")
        data = await request.json()
        task_availabilities = data.get("task_availabilities", {})
        llm_model_availabilities = data.get("llm_model_availabilities", {})

        # Set all task availabilities to True
        task_response = {key: True for key in task_availabilities}

        # Set all model availabilities to False except the local model
        # (the OpenAI API cannot handle seeded inference).
        model_response = {key: key == LOCAL_MODEL_ID for key in llm_model_availabilities}

        response = {"task_availabilities": task_response, "llm_model_availabilities": model_response}
        return response
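
    # Illustrative request/response pair for the handler above (task and model names are
    # hypothetical and come from the querying validator):
    #   request:  {"task_availabilities": {"ExampleTask": false},
    #              "llm_model_availabilities": {"casperhansen/llama-3-8b-instruct-awq": false}}
    #   response: {"task_availabilities": {"ExampleTask": true},
    #              "llm_model_availabilities": {"casperhansen/llama-3-8b-instruct-awq": true}}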

    async def verify_request(
        self,
        request: Request,
    ):
        now = round(time.time() * 1000)
        signed_by = request.headers.get("Epistula-Signed-By")
        signed_for = request.headers.get("Epistula-Signed-For")
        if signed_for != shared_settings.WALLET.hotkey.ss58_address:
            raise HTTPException(status_code=400, detail="Bad Request, message is not intended for self")
        if signed_by not in shared_settings.METAGRAPH.hotkeys:
            raise HTTPException(status_code=401, detail="Signer not in metagraph")

        uid = shared_settings.METAGRAPH.hotkeys.index(signed_by)
        stake = shared_settings.METAGRAPH.S[uid].item()
        # Outside of netuid 61, reject callers with less than 10,000 stake.
        if shared_settings.NETUID != 61 and stake < 10000:
            logger.warning(f"Blacklisting request from {signed_by} [uid={uid}], not enough stake -- {stake}")
            raise HTTPException(status_code=401, detail=f"Stake below minimum: {stake}")

        body = await request.body()
        err = verify_signature(
            request.headers.get("Epistula-Request-Signature"),
            body,
            request.headers.get("Epistula-Timestamp"),
            request.headers.get("Epistula-Uuid"),
            signed_for,
            signed_by,
            now,
        )
        if err:
            logger.error(err)
            raise HTTPException(status_code=400, detail=err)
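
    # verify_request is attached as a FastAPI dependency on /v1/chat/completions (see run()
    # below), so callers must supply the Epistula-Signed-By, Epistula-Signed-For,
    # Epistula-Request-Signature, Epistula-Timestamp and Epistula-Uuid headers, with the
    # request signed for this miner's hotkey by a hotkey registered in the metagraph
    # (and, outside netuid 61, holding at least 10,000 stake).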

    def run(self):
        # Resolve the miner's public IP so it can be announced on-chain.
        external_ip = None  # shared_settings.EXTERNAL_IP
        if not external_ip or external_ip == "[::]":
            try:
                external_ip = requests.get("https://checkip.amazonaws.com").text.strip()
                netaddr.IPAddress(external_ip)
            except Exception:
                logger.error("Failed to get external IP")

        logger.info(
            f"Serving miner endpoint {external_ip}:{shared_settings.AXON_PORT} on network: {shared_settings.SUBTENSOR_NETWORK} with netuid: {shared_settings.NETUID}"
        )

        # Announce the endpoint on-chain so validators can discover it.
        serve_success = serve_extrinsic(
            subtensor=shared_settings.SUBTENSOR,
            wallet=shared_settings.WALLET,
            ip=external_ip,
            port=shared_settings.AXON_PORT,
            protocol=4,
            netuid=shared_settings.NETUID,
        )
        if not serve_success:
            logger.error("Failed to serve endpoint")
            return

        # Expose the chat completion and availability routes over FastAPI.
        app = FastAPI()
        router = APIRouter()
        router.add_api_route(
            "/v1/chat/completions",
            self.create_chat_completion,
            dependencies=[Depends(self.verify_request)],
            methods=["POST"],
        )
        router.add_api_route(
            "/availability",
            self.check_availability,
            methods=["POST"],
        )
        app.include_router(router)
        fast_config = uvicorn.Config(
            app,
            host="0.0.0.0",
            port=shared_settings.AXON_PORT,
            log_level="info",
            loop="asyncio",
            workers=4,
        )
        self.fast_api = FastAPIThreadedServer(config=fast_config)
        self.fast_api.start()

        logger.info(f"Miner starting at block: {shared_settings.SUBTENSOR.block}")

        # Main execution loop.
        try:
            while not self.should_exit:
                time.sleep(1)
        except Exception as e:
            logger.error(str(e))
            self.fast_api.stop()

    async def run_inference(self, request: Request) -> str:
        data = await request.json()
        try:
            response = self.llm.generate(
                data.get("messages"), sampling_params=data.get("sampling_parameters"), seed=data.get("seed")
            )
            return response
        except Exception as e:
            logger.error(f"An error occurred during text generation: {e}")
            return str(e)
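

# A minimal local smoke test, included only as an illustrative sketch. It assumes a miner
# is already running on this machine and that /availability is reachable without Epistula
# signatures, which matches the router setup in run() (only /v1/chat/completions has the
# verify_request dependency). It is not called anywhere; run it manually, for example with
# asyncio.run(_example_availability_check(shared_settings.AXON_PORT)).
async def _example_availability_check(port: int) -> dict:
    async with httpx.AsyncClient(base_url=f"http://127.0.0.1:{port}") as client:
        payload = {
            "task_availabilities": {"ExampleTask": False},  # hypothetical task name
            "llm_model_availabilities": {LOCAL_MODEL_ID: False},
        }
        response = await client.post("/availability", json=payload)
        response.raise_for_status()
        return response.json()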


if __name__ == "__main__":
    miner = OpenAIMiner()
    miner.run()
    logger.warning("Ending miner...")
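
# Typical launch (assuming the shared settings provide OPENAI_API_KEY, WALLET, SUBTENSOR,
# AXON_PORT and NETUID for the target network):
#   python miner.py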