
SN1-402 clean logging #586

Merged: 11 commits, Feb 17, 2025
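
This PR removes noisy debug/info log statements from the validator loop, the scoring and availability APIs, the GPT wrappers, and the reward pipeline, keeping only messages with operational signal. For reference only (not part of this PR), a minimal loguru sketch of how the remaining output could instead be filtered by level at runtime; the LOG_LEVEL variable name is an assumption for illustration:

import os
import sys

from loguru import logger

# Route loguru output through a single sink whose level is configurable,
# so logger.debug(...) calls are silenced in production without being deleted.
logger.remove()  # drop the default stderr sink
logger.add(sys.stderr, level=os.getenv("LOG_LEVEL", "INFO"))

logger.debug("Number of tasks in Task Queue: 0")  # suppressed at INFO
logger.info("Starting WeightSetter...")           # still emitted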
4 changes: 0 additions & 4 deletions neurons/miners/epistula_miner/web_retrieval.py
@@ -3,7 +3,6 @@

import numpy as np
import trafilatura
from loguru import logger
from openai import OpenAI

from prompting.base.duckduckgo_patch import PatchedDDGS
@@ -55,10 +54,8 @@ async def get_websites_with_similarity(
Returns:
List of dictionaries containing website URLs and their best matching chunks
"""
logger.debug("Getting results")
ddgs = PatchedDDGS(proxy=settings.shared_settings.PROXY_URL, verify=False)
results = list(ddgs.text(query))
logger.debug(f"Got {len(results)} results")
urls = [r["href"] for r in results][:n_results]

# Fetch and extract content
@@ -74,7 +71,6 @@ async def get_websites_with_similarity(
if not text: # Skip if extraction failed
continue

# logger.debug(f"TEXTS: {text}")
chunks = create_chunks(text)
chunk_embeddings = client.embeddings.create(model="text-embedding-ada-002", input=chunks).data

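The hunk above keeps the retrieval flow itself: DuckDuckGo results are fetched, extracted text is chunked, and each chunk is embedded with text-embedding-ada-002. The chunk-selection step is not shown in this diff; a minimal sketch of how the best-matching chunk per page might be picked by cosine similarity (the helper below is an assumption, not code from this PR):

import numpy as np

def best_chunk(query_embedding: list[float], chunk_embeddings: list[list[float]], chunks: list[str]) -> str:
    """Return the chunk whose embedding is closest to the query embedding (cosine similarity)."""
    q = np.asarray(query_embedding)
    c = np.asarray(chunk_embeddings)
    scores = (c @ q) / (np.linalg.norm(c, axis=1) * np.linalg.norm(q))
    return chunks[int(np.argmax(scores))]
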
33 changes: 9 additions & 24 deletions neurons/validator.py
@@ -1,7 +1,6 @@
import asyncio
import multiprocessing as mp
import sys
import time

import loguru
import netaddr
@@ -65,21 +64,13 @@ async def spawn_loops(task_queue, scoring_queue, reward_events):
logger.info("Starting WeightSetter...")
asyncio.create_task(weight_setter.start(reward_events))

# Main monitoring loop
start = time.time()
# while True:
# await asyncio.sleep(5)

logger.info("Starting Main Monitoring Loop...")
while True:
await asyncio.sleep(5)
current_time = time.time()
time_diff = current_time - start
start = current_time

# Check if all tasks are still running
logger.debug(f"Running {time_diff:.2f} seconds")
logger.debug(f"Number of tasks in Task Queue: {len(task_queue)}")
logger.debug(f"Number of tasks in Scoring Queue: {len(scoring_queue)}")
logger.debug(f"Number of tasks in Reward Events: {len(reward_events)}")
# # Check if all tasks are still running
# logger.debug(f"Number of tasks in Task Queue: {len(task_queue)}")
# logger.debug(f"Number of tasks in Scoring Queue: {len(scoring_queue)}")
# logger.debug(f"Number of tasks in Reward Events: {len(reward_events)}")

asyncio.run(spawn_loops(task_queue, scoring_queue, reward_events))

@@ -114,7 +105,6 @@ async def start():

while True:
await asyncio.sleep(10)
logger.debug("Running API...")

asyncio.run(start())

@@ -131,7 +121,6 @@ async def main():

try:
# # Start checking the availability of miners at regular intervals

if settings.shared_settings.DEPLOY_SCORING_API:
# Use multiprocessing to bypass API blocking issue
api_process = mp.Process(target=start_api, args=(scoring_queue, reward_events), name="API_Process")
@@ -141,13 +130,9 @@
loop_process = mp.Process(
target=create_loop_process, args=(task_queue, scoring_queue, reward_events), name="LoopProcess"
)
# task_loop_process = mp.Process(
# target=create_task_loop, args=(task_queue, scoring_queue), name="TaskLoopProcess"
# )

loop_process.start()
# task_loop_process.start()
processes.append(loop_process)
# processes.append(task_loop_process)
GPUInfo.log_gpu_info()

step = 0
Expand All @@ -162,9 +147,9 @@ async def main():
current_block = settings.shared_settings.SUBTENSOR.get_current_block()
last_update_block = settings.shared_settings.METAGRAPH.last_update[settings.shared_settings.UID]
logger.warning(
f"UPDATES HAVE STALED FOR {current_block - last_update_block} BLOCKS AND {step} STEPS"
f"Metagraph hasn't been updated for {current_block - last_update_block} blocks. "
f"Staled block: {current_block}, Last update: {last_update_block}"
)
logger.warning(f"STALED: {current_block}, {settings.shared_settings.METAGRAPH.block}")
sys.exit(1)
step += 1

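The validator hunks above trade the per-iteration queue-size debug logs for a single stall check: the loop compares the chain head against the metagraph's last update for the validator's UID and exits once the gap grows too large. A minimal sketch of that check (the threshold name and value are assumptions; the diff does not show them):

def is_stalled(current_block: int, last_update_block: int, max_stale_blocks: int = 300) -> bool:
    """Return True once the metagraph has gone too many blocks without an update."""
    return (current_block - last_update_block) > max_stale_blocks

# Mirrors the behaviour in the hunk: warn and exit so the process manager restarts the validator.
if is_stalled(current_block=1_000_500, last_update_block=1_000_000):
    raise SystemExit(1)
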
1 change: 0 additions & 1 deletion prompting/api/api.py
@@ -15,7 +15,6 @@

@app.get("/health")
def health():
logger.info("Health endpoint accessed.")
return {"status": "healthy"}


3 changes: 0 additions & 3 deletions prompting/api/miner_availabilities/api.py
@@ -1,7 +1,6 @@
from typing import Literal

from fastapi import APIRouter
from loguru import logger

from prompting.miner_availability.miner_availability import miner_availabilities
from prompting.tasks.task_registry import TaskRegistry
@@ -13,7 +12,6 @@
async def get_miner_availabilities(uids: list[int] | None = None):
if uids:
return {uid: miner_availabilities.miners.get(uid) for uid in uids}
logger.info(f"Returning all miner availabilities for {len(miner_availabilities.miners)} miners")
return miner_availabilities.miners


@@ -23,7 +21,6 @@ async def get_available_miners(
model: str | None = None,
k: int = 10,
):
logger.info(f"Getting {k} available miners for task {task} and model {model}")
task_configs = [config for config in TaskRegistry.task_configs if config.task.__name__ == task]
task_config = task_configs[0] if task_configs else None
return miner_availabilities.get_available_miners(task=task_config, model=model, k=k)
7 changes: 1 addition & 6 deletions prompting/api/scoring/api.py
@@ -82,8 +82,6 @@ async def score_response(
logger.debug(f"Task name set: {task_name}")
logger.debug(f"Length pre-insertion: {len(task_scorer.scoring_queue)}")
if task_name == "InferenceTask":
logger.info(f"Received Organic InferenceTask with body: {body}")
logger.info(f"With model of type {type(body.get('model'))}")
organic_task = InferenceTask(
messages=body.get("messages"),
llm_model=llm_model,
@@ -93,8 +91,6 @@
query=body.get("messages"),
organic=True,
)
logger.info(f"Task created: {organic_task}")

task_scorer.add_to_queue(
task=organic_task,
response=DendriteResponseEvent(
@@ -110,7 +106,6 @@
)

elif task_name == "WebRetrievalTask":
logger.info(f"Received Organic WebRetrievalTask with body: {body}")
try:
search_term = body.get("messages")[0].get("content")
except Exception as ex:
Expand All @@ -134,5 +129,5 @@ async def score_response(
step=-1,
task_id=str(uuid.uuid4()),
)
logger.debug(f"Length post-insertion: {len(task_scorer.scoring_queue)}")
logger.debug(f"Current Queue: {len(task_scorer.scoring_queue)}")
logger.info("Organic task appended to scoring queue")
2 changes: 0 additions & 2 deletions prompting/datasets/huggingface_github.py
@@ -1,5 +1,4 @@
from datasets import load_dataset
from loguru import logger
from pydantic import ConfigDict, model_validator

from shared.base import BaseDataset, DatasetEntry
@@ -61,7 +60,6 @@ def next(self) -> HuggingFaceGithubDatasetEntry:
entry = next(self.iterator)
return self._process_entry(entry)  # May raise "Failed to get a valid file after multiple attempts"
except StopIteration:
logger.warning("Reached end of dataset. Resetting iterator.")
self.reset()
raise Exception("Failed to get a valid file after multiple attempts")

13 changes: 0 additions & 13 deletions prompting/llms/apis/gpt_wrapper.py
@@ -35,7 +35,6 @@ def test(self):
model="gpt-3.5-turbo",
)
assert response.choices[0].message.content.lower() == "hello"
logger.info("GPT test passed")
except Exception as ex:
logger.exception(f"Failed GPT test: {ex}")

@@ -86,10 +85,6 @@ def chat_complete(
break
else:
model = shared_settings.GPT_MODEL_CONFIG[model].get("upgrade")
logger.debug(f"INPUT TOKENS: {input_tokens}")
logger.warning(
f"Upgrading to model {model} because output tokens ({output_tokens}) < min tokens({min_tokens})"
)
if model is None:
raise ValueError(
f"Minimum tokens ({min_tokens}) exceed the available output tokens ({output_tokens})"
@@ -115,8 +110,6 @@ def chat_complete(
input_cost = (
response.usage.prompt_tokens * shared_settings.GPT_MODEL_CONFIG[model]["input_token_cost"]
) / 1000
logger.debug(f"MSG TOKENS: {messages.get_tokens(model)}")
logger.debug(f"USAGE: {response.usage}")
return response, output_cost + input_cost
except Exception as ex:
logger.exception(f"GPT call failed: {ex}")
@@ -166,10 +159,6 @@ async def chat_complete_async(
break
else:
model = shared_settings.GPT_MODEL_CONFIG[model].get("upgrade")
logger.debug(f"INPUT TOKENS: {input_tokens}")
logger.warning(
f"Upgrading to model {model} because output tokens ({output_tokens}) < min tokens({min_tokens})"
)
if model is None:
raise ValueError(
f"Minimum tokens ({min_tokens}) exceed the available output tokens ({output_tokens})"
@@ -195,8 +184,6 @@
input_cost = (
response.usage.prompt_tokens * shared_settings.GPT_MODEL_CONFIG[model]["input_token_cost"]
) / 1000
logger.info(f"MSG TOKENS: {messages.get_tokens(model)}")
logger.info(f"USAGE: {response.usage}")
return response, output_cost + input_cost
except Exception as ex:
logger.exception(f"GPT call failed: {ex}")
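Both chat_complete variants above keep the model-upgrade loop while dropping its per-iteration logs: when the tokens left for output fall below min_tokens, the call is retried on the configured upgrade model, and a ValueError is raised once no upgrade remains. A minimal sketch of that loop (the config contents and context-window numbers below are assumptions, not taken from GPT_MODEL_CONFIG):

# Hypothetical stand-in for shared_settings.GPT_MODEL_CONFIG.
MODEL_CONFIG = {
    "gpt-3.5-turbo": {"context_window": 16_385, "upgrade": "gpt-4-turbo"},
    "gpt-4-turbo": {"context_window": 128_000, "upgrade": None},
}

def pick_model(model: str, input_tokens: int, min_tokens: int) -> str:
    """Walk the upgrade chain until the model leaves enough room for min_tokens of output."""
    while True:
        output_tokens = MODEL_CONFIG[model]["context_window"] - input_tokens
        if output_tokens >= min_tokens:
            return model
        model = MODEL_CONFIG[model].get("upgrade")
        if model is None:
            raise ValueError(f"Minimum tokens ({min_tokens}) exceed the available output tokens ({output_tokens})")
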
1 change: 0 additions & 1 deletion prompting/llms/apis/llm_wrapper.py
@@ -50,5 +50,4 @@ def chat_complete(
logprobs=logprobs,
)
response = response.choices[0].message.content
logger.debug(f"Generated {len(response)} characters using {model}")
return response
4 changes: 0 additions & 4 deletions prompting/llms/model_manager.py
@@ -39,7 +39,6 @@ def load_model(self, model_config: ModelConfig, force: bool = True):
if force:
logger.debug(f"Forcing model {model_config.llm_model_id} to load.")
for active_model in list(self.active_models.keys()):
logger.debug(f"Checking if model {active_model.llm_model_id} can be unloaded.")
if active_model in self.always_active_models:
logger.debug(f"Model {active_model.llm_model_id} is always active. Skipping.")
continue
@@ -63,9 +62,6 @@ def load_model(self, model_config: ModelConfig, force: bool = True):
)

try:
logger.debug(
f"Loading model... {model_config.llm_model_id} with GPU Utilization: {model_config.min_ram / GPUInfo.free_memory}"
)
GPUInfo.log_gpu_info()

model = ReproducibleHF(
1 change: 0 additions & 1 deletion prompting/miner_availability/miner_availability.py
@@ -87,7 +87,6 @@ async def run_step(self):
llm_model_availabilities={model: False for model in model_config},
)

logger.debug("Miner availabilities updated.")
self.current_index = end_index

if self.current_index >= len(self.uids):
2 changes: 0 additions & 2 deletions prompting/rewards/multi_choice.py
@@ -3,7 +3,6 @@
import time

import numpy as np
from loguru import logger
from pydantic import Field, model_validator

from prompting.rewards.reward import BaseRewardModel, BatchRewardOutput
@@ -76,5 +75,4 @@ def reward(self, reference: str, response_event: DendriteResponseEvent, **kwargs
timings.append(time.perf_counter() - start_time)
rewards.append(reward)

logger.debug(f"Rewards: {rewards}")
return BatchRewardOutput(rewards=np.asarray(rewards), timings=np.asarray(timings))
6 changes: 0 additions & 6 deletions prompting/rewards/scoring.py
@@ -51,7 +51,6 @@ def add_to_queue(
task_id=task_id,
)
)
logger.debug(f"Added to queue: {task.__class__.__name__}. Queue size: {len(self.scoring_queue)}")

async def run_step(self) -> RewardLoggingEvent:
await asyncio.sleep(0.1)
@@ -63,7 +62,6 @@ async def run_step(self) -> RewardLoggingEvent:
or (scoring_config.task.llm_model is None)
]
if len(scorable) == 0:
logger.debug("Nothing to score. Skipping scoring step.")
# Run a model_scheduler step to load a new model as there are no more tasks to be scored
if len(self.scoring_queue) > 0:
await model_scheduler.run_step()
Expand All @@ -78,10 +76,6 @@ async def run_step(self) -> RewardLoggingEvent:

# and then we calculate the reward
reward_pipeline = TaskRegistry.get_task_reward(scoring_config.task)
logger.debug(
f"{len(scoring_config.response.completions)} completions to score for task "
f"{scoring_config.task.__class__.__name__}"
)
reward_events = reward_pipeline.apply(
response_event=scoring_config.response,
challenge=scoring_config.task.query,
3 changes: 0 additions & 3 deletions prompting/tasks/base_task.py
@@ -4,7 +4,6 @@
from typing import Any, ClassVar
from uuid import uuid4

from loguru import logger
from pydantic import BaseModel, ConfigDict, Field, model_validator

from prompting.llms.apis.gpt_wrapper import LLMMessage, LLMMessages
@@ -81,7 +80,6 @@ def make_reference(self, dataset_entry: DatasetEntry) -> str:

def generate_reference(self, messages: list[str]) -> str:
"""Generates a reference answer to be used for scoring miner completions"""
logger.info("🤖 Generating reference...")
self.reference = model_manager.get_model(settings.shared_settings.LLM_MODEL).generate(
messages=messages
) # This should be a list of dict
@@ -95,7 +93,6 @@ def generate_query(
messages: list[str],
) -> str:
"""Generates a query to be used for generating the challenge"""
logger.info("🤖 Generating query...")
llm_messages = [LLMMessage(role="system", content=self.query_system_prompt)] if self.query_system_prompt else []
llm_messages.extend([LLMMessage(role="user", content=message) for message in messages])

5 changes: 0 additions & 5 deletions prompting/tasks/multi_step_reasoning.py
@@ -231,11 +231,6 @@ def make_query(self, dataset_entry: Context):
return self.query

def make_reference(self, dataset_entry: Context):
logger.info(f"Generating reference for Multi Step Reasoning task with query: {self.query}")
steps, total_thinking_time = execute_multi_step_reasoning(user_query=self.query)
logger.info(
f"**Steps: {steps}**, **Total thinking time for multi step reasoning: {total_thinking_time} seconds**"
)
logger.info(f"**Total thinking time for multi step reasoning: {total_thinking_time} seconds**")
self.reference = steps[-1][1]
return self.reference
2 changes: 0 additions & 2 deletions prompting/tasks/task_creation.py
@@ -31,10 +31,8 @@ async def start(self, task_queue, scoring_queue):

async def run_step(self):
if len(self.task_queue) > shared_settings.TASK_QUEUE_LENGTH_THRESHOLD:
logger.debug("Task queue is full. Skipping task generation.")
return None
if len(self.scoring_queue) > shared_settings.SCORING_QUEUE_LENGTH_THRESHOLD:
logger.debug("Scoring queue is full. Skipping task generation.")
return None
await asyncio.sleep(0.1)
try: