From 8d4d76757ae135fe6417f525154bc6ad54b251a5 Mon Sep 17 00:00:00 2001 From: richwardle Date: Thu, 13 Feb 2025 10:51:07 +0000 Subject: [PATCH 1/7] Remove Unnecessary Logging --- neurons/miners/epistula_miner/miner.py | 6 +- .../miners/epistula_miner/web_retrieval.py | 5 +- neurons/validator.py | 29 +--------- prompting/api/api.py | 1 - prompting/api/miner_availabilities/api.py | 5 +- prompting/api/scoring/api.py | 5 -- prompting/datasets/huggingface_github.py | 2 - prompting/llms/apis/gpt_wrapper.py | 25 ++++----- prompting/llms/apis/llm_wrapper.py | 2 +- prompting/llms/model_manager.py | 10 ++-- .../miner_availability/miner_availability.py | 2 +- prompting/rewards/multi_choice.py | 3 +- prompting/rewards/scoring.py | 21 ++++--- prompting/rewards/web_retrieval.py | 15 ++--- prompting/tasks/base_task.py | 5 +- prompting/tasks/multi_step_reasoning.py | 10 ++-- prompting/tasks/task_creation.py | 4 +- prompting/tasks/task_sending.py | 55 +++++++++---------- prompting/weight_setting/weight_setter.py | 38 ++++++------- shared/config.py | 5 +- shared/loop_runner.py | 31 ++++++----- shared/misc.py | 16 +++--- validator_api/chat_completion.py | 5 +- validator_api/gpt_endpoints.py | 11 ++-- validator_api/mixture_of_miners.py | 2 - validator_api/scoring_queue.py | 9 +-- validator_api/test_time_inference.py | 1 - 27 files changed, 135 insertions(+), 188 deletions(-) diff --git a/neurons/miners/epistula_miner/miner.py b/neurons/miners/epistula_miner/miner.py index e8f32fa7a..8eaf0750f 100644 --- a/neurons/miners/epistula_miner/miner.py +++ b/neurons/miners/epistula_miner/miner.py @@ -162,9 +162,9 @@ def run(self): except Exception: logger.error("Failed to get external IP") - logger.info( - f"Serving miner endpoint {external_ip}:{shared_settings.AXON_PORT} on network: {shared_settings.SUBTENSOR_NETWORK} with netuid: {shared_settings.NETUID}" - ) + # logger.info( + # f"Serving miner endpoint {external_ip}:{shared_settings.AXON_PORT} on network: {shared_settings.SUBTENSOR_NETWORK} with netuid: {shared_settings.NETUID}" + # ) serve_success = serve_extrinsic( subtensor=shared_settings.SUBTENSOR, diff --git a/neurons/miners/epistula_miner/web_retrieval.py b/neurons/miners/epistula_miner/web_retrieval.py index 6a695d317..654a0b93c 100644 --- a/neurons/miners/epistula_miner/web_retrieval.py +++ b/neurons/miners/epistula_miner/web_retrieval.py @@ -3,7 +3,6 @@ import numpy as np import trafilatura -from loguru import logger from openai import OpenAI from prompting.base.duckduckgo_patch import PatchedDDGS @@ -55,10 +54,9 @@ async def get_websites_with_similarity( Returns: List of dictionaries containing website URLs and their best matching chunks """ - logger.debug("Getting results") ddgs = PatchedDDGS(proxy=settings.shared_settings.PROXY_URL, verify=False) results = list(ddgs.text(query)) - logger.debug(f"Got {len(results)} results") + # logger.debug(f"Got {len(results)} results") urls = [r["href"] for r in results][:n_results] # Fetch and extract content @@ -74,7 +72,6 @@ async def get_websites_with_similarity( if not text: # Skip if extraction failed continue - # logger.debug(f"TEXTS: {text}") chunks = create_chunks(text) chunk_embeddings = client.embeddings.create(model="text-embedding-ada-002", input=chunks).data diff --git a/neurons/validator.py b/neurons/validator.py index dbd229004..afb1cd65c 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -1,7 +1,6 @@ import asyncio import multiprocessing as mp import sys -import time import loguru import torch @@ -61,22 +60,6 @@ async def 
spawn_loops(task_queue, scoring_queue, reward_events): logger.info("Starting WeightSetter...") asyncio.create_task(weight_setter.start(reward_events)) - # Main monitoring loop - start = time.time() - - logger.info("Starting Main Monitoring Loop...") - while True: - await asyncio.sleep(5) - current_time = time.time() - time_diff = current_time - start - start = current_time - - # Check if all tasks are still running - logger.debug(f"Running {time_diff:.2f} seconds") - logger.debug(f"Number of tasks in Task Queue: {len(task_queue)}") - logger.debug(f"Number of tasks in Scoring Queue: {len(scoring_queue)}") - logger.debug(f"Number of tasks in Reward Events: {len(reward_events)}") - asyncio.run(spawn_loops(task_queue, scoring_queue, reward_events)) @@ -95,7 +78,6 @@ async def start(): while True: await asyncio.sleep(10) - logger.debug("Running API...") asyncio.run(start()) @@ -112,7 +94,6 @@ async def main(): try: # # Start checking the availability of miners at regular intervals - if settings.shared_settings.DEPLOY_SCORING_API: # Use multiprocessing to bypass API blocking issue api_process = mp.Process(target=start_api, args=(scoring_queue, reward_events), name="API_Process") @@ -122,13 +103,9 @@ async def main(): loop_process = mp.Process( target=create_loop_process, args=(task_queue, scoring_queue, reward_events), name="LoopProcess" ) - # task_loop_process = mp.Process( - # target=create_task_loop, args=(task_queue, scoring_queue), name="TaskLoopProcess" - # ) + loop_process.start() - # task_loop_process.start() processes.append(loop_process) - # processes.append(task_loop_process) GPUInfo.log_gpu_info() step = 0 @@ -143,9 +120,9 @@ async def main(): current_block = settings.shared_settings.SUBTENSOR.get_current_block() last_update_block = settings.shared_settings.METAGRAPH.last_update[settings.shared_settings.UID] logger.warning( - f"UPDATES HAVE STALED FOR {current_block - last_update_block} BLOCKS AND {step} STEPS" + f"Metagraph hasn't been updated for {current_block - last_update_block} blocks. 
" + f"Staled block: {current_block}, Last update: {last_update_block}" ) - logger.warning(f"STALED: {current_block}, {settings.shared_settings.METAGRAPH.block}") sys.exit(1) step += 1 diff --git a/prompting/api/api.py b/prompting/api/api.py index 34b5dadc7..fab7f94ee 100644 --- a/prompting/api/api.py +++ b/prompting/api/api.py @@ -13,7 +13,6 @@ @app.get("/health") def health(): - logger.info("Health endpoint accessed.") return {"status": "healthy"} diff --git a/prompting/api/miner_availabilities/api.py b/prompting/api/miner_availabilities/api.py index 44bee346c..48cf13b98 100644 --- a/prompting/api/miner_availabilities/api.py +++ b/prompting/api/miner_availabilities/api.py @@ -1,7 +1,6 @@ from typing import Literal from fastapi import APIRouter -from loguru import logger from prompting.miner_availability.miner_availability import miner_availabilities from prompting.tasks.task_registry import TaskRegistry @@ -13,7 +12,7 @@ async def get_miner_availabilities(uids: list[int] | None = None): if uids: return {uid: miner_availabilities.miners.get(uid) for uid in uids} - logger.info(f"Returning all miner availabilities for {len(miner_availabilities.miners)} miners") + # logger.info(f"Returning all miner availabilities for {len(miner_availabilities.miners)} miners") return miner_availabilities.miners @@ -23,7 +22,7 @@ async def get_available_miners( model: str | None = None, k: int = 10, ): - logger.info(f"Getting {k} available miners for task {task} and model {model}") + # logger.info(f"Getting {k} available miners for task {task} and model {model}") task_configs = [config for config in TaskRegistry.task_configs if config.task.__name__ == task] task_config = task_configs[0] if task_configs else None return miner_availabilities.get_available_miners(task=task_config, model=model, k=k) diff --git a/prompting/api/scoring/api.py b/prompting/api/scoring/api.py index d52ba74bc..b30305824 100644 --- a/prompting/api/scoring/api.py +++ b/prompting/api/scoring/api.py @@ -52,8 +52,6 @@ async def score_response( llm_model = None task_name = body.get("task") if task_name == "InferenceTask": - logger.info(f"Received Organic InferenceTask with body: {body}") - logger.info(f"With model of type {type(body.get('model'))}") organic_task = InferenceTask( messages=body.get("messages"), llm_model=llm_model, @@ -62,7 +60,6 @@ async def score_response( sampling_params=body.get("sampling_parameters", settings.shared_settings.SAMPLING_PARAMS), query=body.get("messages"), ) - logger.info(f"Task created: {organic_task}") task_scorer.add_to_queue( task=organic_task, response=DendriteResponseEvent( @@ -76,7 +73,6 @@ async def score_response( task_id=str(uuid.uuid4()), ) elif task_name == "WebRetrievalTask": - logger.info(f"Received Organic WebRetrievalTask with body: {body}") try: search_term = body.get("messages")[0].get("content") except Exception as ex: @@ -102,4 +98,3 @@ async def score_response( step=-1, task_id=str(uuid.uuid4()), ) - logger.info("Organic task appended to scoring queue") diff --git a/prompting/datasets/huggingface_github.py b/prompting/datasets/huggingface_github.py index b65e3fc20..21b7a0cf9 100644 --- a/prompting/datasets/huggingface_github.py +++ b/prompting/datasets/huggingface_github.py @@ -1,5 +1,4 @@ from datasets import load_dataset -from loguru import logger from pydantic import ConfigDict, model_validator from shared.base import BaseDataset, DatasetEntry @@ -61,7 +60,6 @@ def next(self) -> HuggingFaceGithubDatasetEntry: entry = next(self.iterator) return self._process_entry(entry) # Throws 
failed to get a valid file after multiple attempts except StopIteration: - logger.warning("Reached end of dataset. Resetting iterator.") self.reset() raise Exception("Failed to get a valid file after multiple attempts") diff --git a/prompting/llms/apis/gpt_wrapper.py b/prompting/llms/apis/gpt_wrapper.py index 05a58fb65..51fd0d04c 100644 --- a/prompting/llms/apis/gpt_wrapper.py +++ b/prompting/llms/apis/gpt_wrapper.py @@ -35,7 +35,6 @@ def test(self): model="gpt-3.5-turbo", ) assert response.choices[0].message.content.lower() == "hello" - logger.info("GPT test passed") except Exception as ex: logger.exception(f"Failed GPT test: {ex}") @@ -86,10 +85,10 @@ def chat_complete( break else: model = shared_settings.GPT_MODEL_CONFIG[model].get("upgrade") - logger.debug(f"INPUT TOKENS: {input_tokens}") - logger.warning( - f"Upgrading to model {model} because output tokens ({output_tokens}) < min tokens({min_tokens})" - ) + # logger.debug(f"INPUT TOKENS: {input_tokens}") + # logger.warning( + # f"Upgrading to model {model} because output tokens ({output_tokens}) < min tokens({min_tokens})" + # ) if model is None: raise ValueError( f"Minimum tokens ({min_tokens}) exceed the available output tokens ({output_tokens})" @@ -115,8 +114,8 @@ def chat_complete( input_cost = ( response.usage.prompt_tokens * shared_settings.GPT_MODEL_CONFIG[model]["input_token_cost"] ) / 1000 - logger.debug(f"MSG TOKENS: {messages.get_tokens(model)}") - logger.debug(f"USAGE: {response.usage}") + # logger.debug(f"MSG TOKENS: {messages.get_tokens(model)}") + # logger.debug(f"USAGE: {response.usage}") return response, output_cost + input_cost except Exception as ex: logger.exception(f"GPT call failed: {ex}") @@ -166,10 +165,10 @@ async def chat_complete_async( break else: model = shared_settings.GPT_MODEL_CONFIG[model].get("upgrade") - logger.debug(f"INPUT TOKENS: {input_tokens}") - logger.warning( - f"Upgrading to model {model} because output tokens ({output_tokens}) < min tokens({min_tokens})" - ) + # logger.debug(f"INPUT TOKENS: {input_tokens}") + # logger.warning( + # f"Upgrading to model {model} because output tokens ({output_tokens}) < min tokens({min_tokens})" + # ) if model is None: raise ValueError( f"Minimum tokens ({min_tokens}) exceed the available output tokens ({output_tokens})" @@ -195,8 +194,8 @@ async def chat_complete_async( input_cost = ( response.usage.prompt_tokens * shared_settings.GPT_MODEL_CONFIG[model]["input_token_cost"] ) / 1000 - logger.info(f"MSG TOKENS: {messages.get_tokens(model)}") - logger.info(f"USAGE: {response.usage}") + # logger.info(f"MSG TOKENS: {messages.get_tokens(model)}") + # logger.info(f"USAGE: {response.usage}") return response, output_cost + input_cost except Exception as ex: logger.exception(f"GPT call failed: {ex}") diff --git a/prompting/llms/apis/llm_wrapper.py b/prompting/llms/apis/llm_wrapper.py index 128af345a..20dacc338 100644 --- a/prompting/llms/apis/llm_wrapper.py +++ b/prompting/llms/apis/llm_wrapper.py @@ -50,5 +50,5 @@ def chat_complete( logprobs=logprobs, ) response = response.choices[0].message.content - logger.debug(f"Generated {len(response)} characters using {model}") + # logger.debug(f"Generated {len(response)} characters using {model}") return response diff --git a/prompting/llms/model_manager.py b/prompting/llms/model_manager.py index 4c02d8779..59927b46b 100644 --- a/prompting/llms/model_manager.py +++ b/prompting/llms/model_manager.py @@ -37,9 +37,9 @@ def load_model(self, model_config: ModelConfig, force: bool = True): # if force loading is enabled, unload 
models until there is enough RAM if force: - logger.debug(f"Forcing model {model_config.llm_model_id} to load.") + # logger.debug(f"Forcing model {model_config.llm_model_id} to load.") for active_model in list(self.active_models.keys()): - logger.debug(f"Checking if model {active_model.llm_model_id} can be unloaded.") + # logger.debug(f"Checking if model {active_model.llm_model_id} can be unloaded.") if active_model in self.always_active_models: logger.debug(f"Model {active_model.llm_model_id} is always active. Skipping.") continue @@ -63,9 +63,9 @@ def load_model(self, model_config: ModelConfig, force: bool = True): ) try: - logger.debug( - f"Loading model... {model_config.llm_model_id} with GPU Utilization: {model_config.min_ram / GPUInfo.free_memory}" - ) + # logger.debug( + # f"Loading model... {model_config.llm_model_id} with GPU Utilization: {model_config.min_ram / GPUInfo.free_memory}" + # ) GPUInfo.log_gpu_info() model = ReproducibleHF( diff --git a/prompting/miner_availability/miner_availability.py b/prompting/miner_availability/miner_availability.py index 1b2534f26..ec4e29763 100644 --- a/prompting/miner_availability/miner_availability.py +++ b/prompting/miner_availability/miner_availability.py @@ -87,7 +87,7 @@ async def run_step(self): llm_model_availabilities={model: False for model in model_config}, ) - logger.debug("Miner availabilities updated.") + # logger.debug("Miner availabilities updated.") self.current_index = end_index if self.current_index >= len(self.uids): diff --git a/prompting/rewards/multi_choice.py b/prompting/rewards/multi_choice.py index 13050d70f..442f0bc0c 100644 --- a/prompting/rewards/multi_choice.py +++ b/prompting/rewards/multi_choice.py @@ -3,7 +3,6 @@ import time import numpy as np -from loguru import logger from pydantic import Field, model_validator from prompting.rewards.reward import BaseRewardModel, BatchRewardOutput @@ -76,5 +75,5 @@ def reward(self, reference: str, response_event: DendriteResponseEvent, **kwargs timings.append(time.perf_counter() - start_time) rewards.append(reward) - logger.debug(f"Rewards: {rewards}") + # logger.debug(f"Rewards: {rewards}") return BatchRewardOutput(rewards=np.asarray(rewards), timings=np.asarray(timings)) diff --git a/prompting/rewards/scoring.py b/prompting/rewards/scoring.py index 2ae481434..6f100aae8 100644 --- a/prompting/rewards/scoring.py +++ b/prompting/rewards/scoring.py @@ -1,7 +1,6 @@ import asyncio import threading -from loguru import logger from pydantic import ConfigDict from prompting.llms.model_manager import model_manager, model_scheduler @@ -51,7 +50,7 @@ def add_to_queue( task_id=task_id, ) ) - logger.debug(f"Added to queue: {task.__class__.__name__}. Queue size: {len(self.scoring_queue)}") + # logger.debug(f"Added to queue: {task.__class__.__name__}. Queue size: {len(self.scoring_queue)}") async def run_step(self) -> RewardLoggingEvent: await asyncio.sleep(0.1) @@ -63,7 +62,7 @@ async def run_step(self) -> RewardLoggingEvent: or (scoring_config.task.llm_model is None) ] if len(scorable) == 0: - logger.debug("Nothing to score. Skipping scoring step.") + # logger.debug("Nothing to score. 
Skipping scoring step.") # Run a model_scheduler step to load a new model as there are no more tasks to be scored if len(self.scoring_queue) > 0: await model_scheduler.run_step() @@ -78,10 +77,10 @@ async def run_step(self) -> RewardLoggingEvent: # and there we then calculate the reward reward_pipeline = TaskRegistry.get_task_reward(scoring_config.task) - logger.debug( - f"{len(scoring_config.response.completions)} completions to score for task " - f"{scoring_config.task.__class__.__name__}" - ) + # logger.debug( + # f"{len(scoring_config.response.completions)} completions to score for task " + # f"{scoring_config.task.__class__.__name__}" + # ) reward_events = reward_pipeline.apply( response_event=scoring_config.response, challenge=scoring_config.task.query, @@ -90,10 +89,10 @@ async def run_step(self) -> RewardLoggingEvent: task=scoring_config.task, ) self.reward_events.append(reward_events) - logger.debug( - f"Scored {scoring_config.task.__class__.__name__} {scoring_config.task.task_id} with model " - f"{scoring_config.task.llm_model_id}" - ) + # logger.debug( + # f"Scored {scoring_config.task.__class__.__name__} {scoring_config.task.task_id} with model " + # f"{scoring_config.task.llm_model_id}" + # ) log_event( RewardLoggingEvent( response_event=scoring_config.response, diff --git a/prompting/rewards/web_retrieval.py b/prompting/rewards/web_retrieval.py index 6fa5f7bd3..e617b4363 100644 --- a/prompting/rewards/web_retrieval.py +++ b/prompting/rewards/web_retrieval.py @@ -2,7 +2,6 @@ import json import numpy as np -from loguru import logger from pydantic import BaseModel from scipy import spatial from thefuzz import fuzz @@ -39,18 +38,18 @@ def score_website_result( # Content scraped from the URL provided in the completion. reference_website_content = DDGDataset.extract_website_content(response_url) if not reference_website_content or len(reference_website_content) == 0: - logger.debug(f"Failed to extract miner's content from website: {response_url}") + # logger.debug(f"Failed to extract miner's content from website: {response_url}") return 0 if fuzz.ratio(response_content, reference_website_content) < MIN_MATCH_THRESHOLD: - logger.info("Miner returned text that doesn't match the website, scoring 0") + # logger.info("Miner returned text that doesn't match the website, scoring 0") return 0 if len(response_relevant) > len(response_content) or len(response_relevant) < MIN_RELEVANT_CHARS: - logger.info( - f"Relevant section is too short (<{MIN_RELEVANT_CHARS} chars) or longer than the whole website content " - f"{len(response_relevant)} > {len(response_content)}" - ) + # logger.info( + # f"Relevant section is too short (<{MIN_RELEVANT_CHARS} chars) or longer than the whole website content " + # f"{len(response_relevant)} > {len(response_content)}" + # ) return 0 if response_relevant not in response_content: @@ -95,8 +94,6 @@ def reward( rewards.append(self.score_miner_response(dataset_entry, completion, task=task)) timings.append(0) - logger.debug(f"REWARDWEBRETRIEVAL: {rewards}") - logger.debug(f"COMPLETIONS: {response_event.completions}") return BatchRewardOutput(rewards=np.array(rewards), timings=np.array(timings)) @staticmethod diff --git a/prompting/tasks/base_task.py b/prompting/tasks/base_task.py index ed202673d..cf8870415 100644 --- a/prompting/tasks/base_task.py +++ b/prompting/tasks/base_task.py @@ -4,7 +4,6 @@ from typing import Any, ClassVar from uuid import uuid4 -from loguru import logger from pydantic import BaseModel, ConfigDict, Field, model_validator from 
prompting.llms.apis.gpt_wrapper import LLMMessage, LLMMessages @@ -79,7 +78,7 @@ def make_reference(self, dataset_entry: DatasetEntry) -> str: def generate_reference(self, messages: list[str]) -> str: """Generates a reference answer to be used for scoring miner completions""" - logger.info("🤖 Generating reference...") + # logger.info("🤖 Generating reference...") self.reference = model_manager.get_model(settings.shared_settings.LLM_MODEL).generate( messages=messages ) # This should be a list of dict @@ -93,7 +92,7 @@ def generate_query( messages: list[str], ) -> str: """Generates a query to be used for generating the challenge""" - logger.info("🤖 Generating query...") + # logger.info("🤖 Generating query...") llm_messages = [LLMMessage(role="system", content=self.query_system_prompt)] if self.query_system_prompt else [] llm_messages.extend([LLMMessage(role="user", content=message) for message in messages]) diff --git a/prompting/tasks/multi_step_reasoning.py b/prompting/tasks/multi_step_reasoning.py index 503654e49..0e27c7ba5 100644 --- a/prompting/tasks/multi_step_reasoning.py +++ b/prompting/tasks/multi_step_reasoning.py @@ -231,11 +231,11 @@ def make_query(self, dataset_entry: Context): return self.query def make_reference(self, dataset_entry: Context): - logger.info(f"Generating reference for Multi Step Reasoning task with query: {self.query}") + # logger.info(f"Generating reference for Multi Step Reasoning task with query: {self.query}") steps, total_thinking_time = execute_multi_step_reasoning(user_query=self.query) - logger.info( - f"**Steps: {steps}**, **Total thinking time for multi step reasoning: {total_thinking_time} seconds**" - ) - logger.info(f"**Total thinking time for multi step reasoning: {total_thinking_time} seconds**") + # logger.info( + # f"**Steps: {steps}**, **Total thinking time for multi step reasoning: {total_thinking_time} seconds**" + # ) + # logger.info(f"**Total thinking time for multi step reasoning: {total_thinking_time} seconds**") self.reference = steps[-1][1] return self.reference diff --git a/prompting/tasks/task_creation.py b/prompting/tasks/task_creation.py index 624e0894a..89180280a 100644 --- a/prompting/tasks/task_creation.py +++ b/prompting/tasks/task_creation.py @@ -31,10 +31,10 @@ async def start(self, task_queue, scoring_queue): async def run_step(self): if len(self.task_queue) > shared_settings.TASK_QUEUE_LENGTH_THRESHOLD: - logger.debug("Task queue is full. Skipping task generation.") + # logger.debug("Task queue is full. Skipping task generation.") return None if len(self.scoring_queue) > shared_settings.SCORING_QUEUE_LENGTH_THRESHOLD: - logger.debug("Scoring queue is full. Skipping task generation.") + # logger.debug("Scoring queue is full. 
Skipping task generation.") return None await asyncio.sleep(0.1) try: diff --git a/prompting/tasks/task_sending.py b/prompting/tasks/task_sending.py index ae88b06e7..33c05a5aa 100644 --- a/prompting/tasks/task_sending.py +++ b/prompting/tasks/task_sending.py @@ -1,7 +1,6 @@ # ruff: noqa: E402 import asyncio import time -from typing import List import bittensor as bt from loguru import logger @@ -14,7 +13,7 @@ from prompting.tasks.inference import InferenceTask from prompting.tasks.web_retrieval import WebRetrievalTask from shared import settings -from shared.dendrite import DendriteResponseEvent, SynapseStreamResult +from shared.dendrite import DendriteResponseEvent from shared.epistula import query_miners from shared.logging import ErrorLoggingEvent, ValidatorLoggingEvent from shared.loop_runner import AsyncLoopRunner @@ -25,27 +24,26 @@ NEURON_SAMPLE_SIZE = 100 -# TODO: do we actually need this logging? -def log_stream_results(stream_results: List[SynapseStreamResult]): - failed_responses = [ - response for response in stream_results if response.exception is not None or response.completion is None - ] - empty_responses = [ - response for response in stream_results if response.exception is None and response.completion == "" - ] - non_empty_responses = [ - response for response in stream_results if response.exception is None and response.completion != "" - ] +# def log_stream_results(stream_results: List[SynapseStreamResult]): +# failed_responses = [ +# response for response in stream_results if response.exception is not None or response.completion is None +# ] +# empty_responses = [ +# response for response in stream_results if response.exception is None and response.completion == "" +# ] +# non_empty_responses = [ +# response for response in stream_results if response.exception is None and response.completion != "" +# ] - logger.debug(f"Total of non_empty responses: ({len(non_empty_responses)})") - logger.debug(f"Total of empty responses: ({len(empty_responses)})") - logger.debug(f"Total of failed responses: ({len(failed_responses)})") +# logger.debug(f"Total of non_empty responses: ({len(non_empty_responses)})") +# logger.debug(f"Total of empty responses: ({len(empty_responses)})") +# logger.debug(f"Total of failed responses: ({len(failed_responses)})") async def collect_responses(task: BaseTextTask) -> DendriteResponseEvent | None: # Get the list of uids and their axons to query for this step. uids = miner_availabilities.get_available_miners(task=task, model=task.llm_model_id, k=NEURON_SAMPLE_SIZE) - logger.debug(f"🔍 Querying uids: {uids}") + # logger.debug(f"🔍 Querying uids: {uids}") if len(uids) == 0: logger.warning("No available miners. 
This should already have been caught earlier.") return @@ -61,11 +59,10 @@ async def collect_responses(task: BaseTextTask) -> DendriteResponseEvent | None: body["target_results"] = task.target_results body["timeout"] = task.timeout - logger.info(f"🔍 Sending task {task.task_id} with body: {body}") + # logger.info(f"🔍 Sending task {task.task_id} with body: {body}") + # logger.debug(f"🔍 Collected responses from {len(stream_results)} miners") stream_results = await query_miners(uids, body) - logger.debug(f"🔍 Collected responses from {len(stream_results)} miners") - - log_stream_results(stream_results) + # log_stream_results(stream_results) response_event = DendriteResponseEvent( stream_results=stream_results, @@ -78,7 +75,7 @@ async def collect_responses(task: BaseTextTask) -> DendriteResponseEvent | None: shared_settings.INFERENCE_TIMEOUT if isinstance(task, InferenceTask) else shared_settings.NEURON_TIMEOUT ), ) - logger.debug("🔍 Response event created") + # logger.debug("🔍 Response event created") return response_event @@ -143,10 +140,10 @@ async def run_step(self) -> ValidatorLoggingEvent | ErrorLoggingEvent | None: exclude (list, optional): The list of uids to exclude from the query. Defaults to []. """ while len(self.scoring_queue) > shared_settings.SCORING_QUEUE_LENGTH_THRESHOLD: - logger.debug("Scoring queue is full. Waiting 1 second...") + # logger.debug("Scoring queue is full. Waiting 1 second...") await asyncio.sleep(1) while len(self.task_queue) == 0: - logger.warning("No tasks in queue. Waiting 1 second...") + # logger.warning("No tasks in queue. Waiting 1 second...") await asyncio.sleep(1) try: # get task from the task queue @@ -157,12 +154,12 @@ async def run_step(self) -> ValidatorLoggingEvent | ErrorLoggingEvent | None: with Timer() as timer: response_event = await collect_responses(task=task) if response_event is None: - logger.warning("No response event collected. This should not be happening.") + # logger.warning("No response event collected. This should not be happening.") return - logger.debug("🔍 Estimating block") + # logger.debug("🔍 Estimating block") estimated_block = self.estimate_block - logger.debug("🔍 Creating scoring config") + # logger.debug("🔍 Creating scoring config") scoring_config = ScoringConfig( task=task, @@ -172,9 +169,9 @@ async def run_step(self) -> ValidatorLoggingEvent | ErrorLoggingEvent | None: step=self.step, task_id=task.task_id, ) - logger.debug(f"Collected responses in {timer.final_time:.2f} seconds") + # logger.debug(f"Collected responses in {timer.final_time:.2f} seconds") self.scoring_queue.append(scoring_config) - logger.debug(f"SCORING: Added to queue: {task.__class__.__name__}. Queue size: {len(self.scoring_queue)}") + # logger.debug(f"SCORING: Added to queue: {task.__class__.__name__}. Queue size: {len(self.scoring_queue)}") # Log the step event. 
return ValidatorLoggingEvent( diff --git a/prompting/weight_setting/weight_setter.py b/prompting/weight_setting/weight_setter.py index fb99f7db4..3ba5ac570 100644 --- a/prompting/weight_setting/weight_setter.py +++ b/prompting/weight_setting/weight_setter.py @@ -40,7 +40,7 @@ def apply_reward_func(raw_rewards: np.ndarray, p=0.5): def save_weights(weights: list[np.ndarray]): """Saves the list of numpy arrays to a file.""" - logger.info("Saving validator state.") + logger.debug("Saving validator state.") # Save all arrays into a single .npz file np.savez_compressed(FILENAME, *weights) @@ -86,17 +86,17 @@ def set_weights( ) = bt.utils.weight_utils.convert_weights_and_uids_for_emit( uids=processed_weight_uids, weights=processed_weights ) - logger.debug("uint_weights", uint_weights) - logger.debug("uint_uids", uint_uids) + # logger.debug("uint_weights", uint_weights) + # logger.debug("uint_uids", uint_uids) except Exception as ex: logger.exception(f"Issue with setting weights: {ex}") # Create a dataframe from weights and uids and save it as a csv file, with the current step as the filename. if shared_settings.LOG_WEIGHTS: try: - logger.debug( - f"Lengths... UIDS: {len(uint_uids)}, WEIGHTS: {len(processed_weights.flatten())}, RAW_WEIGHTS: {len(weights.flatten())}, UINT_WEIGHTS: {len(uint_weights)}" - ) + # logger.debug( + # f"Lengths... UIDS: {len(uint_uids)}, WEIGHTS: {len(processed_weights.flatten())}, RAW_WEIGHTS: {len(weights.flatten())}, UINT_WEIGHTS: {len(uint_weights)}" + # ) weights_df = pd.DataFrame( { "step": step, @@ -129,10 +129,10 @@ def set_weights( version_key=__spec_version__, ) - if result[0] is True: - logger.info("set_weights on chain successfully!") + if result[0]: + logger.info("Successfully set weights on chain") else: - logger.error(f"set_weights failed: {result}") + logger.error(f"Failed to set weights on chain: {result}") class WeightSetter(AsyncLoopRunner): @@ -155,7 +155,7 @@ async def start(self, reward_events, name: str | None = None): try: with np.load(FILENAME) as data: PAST_WEIGHTS = [data[key] for key in data.files] - logger.debug(f"Loaded persistent weights of length: {len(PAST_WEIGHTS)}") + # logger.debug(f"Loaded persistent weights of length: {len(PAST_WEIGHTS)}") except FileNotFoundError: logger.info("No weights file found - this is expected on a new validator, starting with empty weights") PAST_WEIGHTS = [] @@ -169,7 +169,7 @@ async def run_step(self): if len(self.reward_events) == 0: logger.warning("No reward events in queue, skipping weight setting...") return - logger.debug(f"Found {len(self.reward_events)} reward events in queue") + # logger.debug(f"Found {len(self.reward_events)} reward events in queue") # reward_events is a list of lists of WeightedRewardEvents - the 'sublists' each contain the multiple reward events for a single task self.reward_events: list[list[WeightedRewardEvent]] = self.reward_events # to get correct typehinting @@ -217,7 +217,7 @@ async def run_step(self): for task_config, rewards in miner_rewards.items(): r = np.array([x["reward"] / max(1, x["count"]) for x in list(rewards.values())]) - logger.debug(f"Rewards for task {task_config.task.__name__}: {r}") + # logger.debug(f"Rewards for task {task_config.task.__name__}: {r}") u = np.array(list(rewards.keys())) if task_config.task == InferenceTask: processed_rewards = r / max(1, (np.sum(r[r > 0]) + 1e-10)) @@ -233,13 +233,13 @@ async def run_step(self): except Exception as ex: logger.exception(f"{ex}") - mean_value = final_rewards.mean() - min_value = final_rewards.min() - max_value 
= final_rewards.max() - length = len(final_rewards) - logger.debug( - f"Reward stats. Mean: {mean_value:.2f}; Min: {min_value:.4f}; Max: {max_value:.4f}; Count: {length}" - ) + # mean_value = final_rewards.mean() + # min_value = final_rewards.min() + # max_value = final_rewards.max() + # length = len(final_rewards) + # logger.debug( + # f"Reward stats. Mean: {mean_value:.2f}; Min: {min_value:.4f}; Max: {max_value:.4f}; Count: {length}" + # ) # set weights on chain set_weights( final_rewards, step=self.step, subtensor=shared_settings.SUBTENSOR, metagraph=shared_settings.METAGRAPH diff --git a/shared/config.py b/shared/config.py index aea859bd9..eaad4d858 100644 --- a/shared/config.py +++ b/shared/config.py @@ -1,7 +1,6 @@ import argparse import bittensor as bt -from loguru import logger def add_args(parser): @@ -32,8 +31,8 @@ def config() -> bt.config: # args = parser.parse_args() add_args(parser=parser) args, unknown = parser.parse_known_args() - logger.info(f"RUNNING WITH ARGS: {' '.join(f'{k}={v}' for k, v in vars(args).items())}") - logger.info(f"UNKNOWN ARGS: {unknown}") + # logger.info(f"RUNNING WITH ARGS: {' '.join(f'{k}={v}' for k, v in vars(args).items())}") + # logger.info(f"UNKNOWN ARGS: {unknown}") bt.wallet.add_args(parser) bt.subtensor.add_args(parser) bt.axon.add_args(parser) diff --git a/shared/loop_runner.py b/shared/loop_runner.py index 43d380bd4..70daec897 100644 --- a/shared/loop_runner.py +++ b/shared/loop_runner.py @@ -33,15 +33,15 @@ async def get_time(self): """Get the current time from the time server with a timeout.""" if not self.sync: time = datetime.datetime.now(datetime.timezone.utc) - logger.debug(f"Time: {time}") + # logger.debug(f"Time: {time}") return time try: async with aiohttp.ClientSession() as session: - logger.info("Waiting for response time") + # logger.info("Waiting for response time") async with session.get(self.time_server_url, timeout=5) as response: if response.status == 200: data = await response.json() - logger.info("Got response") + # logger.info("Got response") return datetime.datetime.fromisoformat(data["datetime"].replace("Z", "+00:00")) else: raise Exception(f"Failed to get server time. 
Status: {response.status}") @@ -64,7 +64,7 @@ async def wait_for_next_execution(self, last_run_time): next_run = self.next_sync_point(current_time) else: next_run = last_run_time + timedelta(seconds=self.interval) - logger.debug(f"Next run: {next_run}") + # logger.debug(f"Next run: {next_run}") wait_time = (next_run - current_time).total_seconds() if wait_time > 0: @@ -76,21 +76,22 @@ async def wait_for_next_execution(self, last_run_time): async def run_loop(self): """Run the loop periodically, optionally synchronizing across all instances.""" - logger.debug(f"Starting loop {self.__class__.__name__}; running: {self.running}") + # logger.debug(f"Starting loop {self.__class__.__name__}; running: {self.running}") last_run_time = await self.get_time() - logger.debug(f"Got time of last run: {last_run_time}") + # logger.debug(f"Got time of last run: {last_run_time}") try: while self.running: with profiler.measure(self.name): - logger.debug("Waiting...") + # logger.debug("Waiting...") next_run = await self.wait_for_next_execution(last_run_time) - logger.debug("Wait ended") + # logger.debug("Wait ended") try: - run_results = await self.run_step() - logger.debug(f"Run_results: {run_results}") + await self.run_step() + # run_results = await self.run_step() + # logger.debug(f"Run_results: {run_results}") self.step += 1 - logger.debug(f"{self.name}: Step {self.step} completed at {next_run}") + # logger.debug(f"{self.name}: Step {self.step} completed at {next_run}") except Exception as ex: logger.exception(f"Error in loop iteration: {ex}") last_run_time = next_run @@ -100,8 +101,8 @@ async def run_loop(self): logger.error(f"Fatal error in loop: {e}") finally: self.running = False - logger.info("Loop has been cleaned up.") - logger.debug("Exiting run_loop") + # logger.info("Loop has been cleaned up.") + # logger.debug("Exiting run_loop") async def start(self, name: str | None = None): """Start the loop.""" @@ -109,7 +110,7 @@ async def start(self, name: str | None = None): logger.warning("Loop is already running.") return self.running = True - logger.debug(f"{self.name}: Starting loop with {'synchronized' if self.sync else 'non-synchronized'} mode") + # logger.debug(f"{self.name}: Starting loop with {'synchronized' if self.sync else 'non-synchronized'} mode") self._task = asyncio.create_task(self.run_loop(), name=name) async def stop(self): @@ -120,4 +121,4 @@ async def stop(self): try: await self._task except asyncio.CancelledError: - logger.info("Loop task was cancelled.") + logger.debug("Loop task was cancelled.") diff --git a/shared/misc.py b/shared/misc.py index 858765b7c..5c66494da 100644 --- a/shared/misc.py +++ b/shared/misc.py @@ -1,4 +1,3 @@ -import asyncio import time import traceback from functools import lru_cache, update_wrapper @@ -6,7 +5,6 @@ from typing import Any, Callable import bittensor as bt -from loguru import logger from shared.exceptions import BittensorError @@ -112,17 +110,17 @@ def ttl_get_block(subtensor: bt.Subtensor | None = None) -> int: def async_log(func): async def wrapper(*args, **kwargs): - start_time = time.time() - task_id = id(asyncio.current_task()) - func_name = func.__name__ - logger.debug(f"Starting {func_name} on asyncio task {task_id} at {start_time}") + # start_time = time.time() + # task_id = id(asyncio.current_task()) + # func_name = func.__name__ + # logger.debug(f"Starting {func_name} on asyncio task {task_id} at {start_time}") # Execute the wrapped function result = await func(*args, **kwargs) - end_time = time.time() - execution_time = end_time - 
start_time - logger.debug(f"Completed {func_name} on asyncio task {task_id} in {execution_time} seconds") + # end_time = time.time() + # execution_time = end_time - start_time + # logger.debug(f"Completed {func_name} on asyncio task {task_id} in {execution_time} seconds") return result diff --git a/validator_api/chat_completion.py b/validator_api/chat_completion.py index b44c2b769..8dfa4ec1e 100644 --- a/validator_api/chat_completion.py +++ b/validator_api/chat_completion.py @@ -163,7 +163,7 @@ async def collect_remaining_responses( """Collect remaining responses for scoring without blocking the main response.""" try: responses = await remaining - logger.debug(f"responses to forward: {responses}") + # logger.debug(f"responses to forward: {responses}") for i, response in enumerate(responses): if isinstance(response, Exception): logger.error(f"Error collecting response from uid {uids[i+1]}: {response}") @@ -197,7 +197,6 @@ async def chat_completion( body: dict[str, any], uids: Optional[list[int]] = None, num_miners: int = 10 ) -> tuple | StreamingResponse: """Handle chat completion with multiple miners in parallel.""" - logger.debug(f"REQUEST_BODY: {body}") # Get multiple UIDs if none specified if uids is None: uids = list(get_uids(sampling_mode="random", k=100)) @@ -208,7 +207,7 @@ async def chat_completion( else: selected_uids = uids[:num_miners] # If UID is specified, only use that one - logger.debug(f"Querying uids {selected_uids}") + # logger.debug(f"Querying uids {selected_uids}") STREAM = body.get("stream", False) # Initialize chunks collection for each miner diff --git a/validator_api/gpt_endpoints.py b/validator_api/gpt_endpoints.py index 3c185d044..1b61b7606 100644 --- a/validator_api/gpt_endpoints.py +++ b/validator_api/gpt_endpoints.py @@ -63,7 +63,7 @@ async def web_retrieval(search_query: str, n_miners: int = 10, n_results: int = if not uids: raise HTTPException(status_code=500, detail="No available miners") uids = random.sample(uids, min(len(uids), n_miners)) - logger.debug(f"🔍 Querying uids: {uids}") + # logger.debug(f"🔍 Querying uids: {uids}") if len(uids) == 0: logger.warning("No available miners. This should already have been caught earlier.") return @@ -87,9 +87,9 @@ async def web_retrieval(search_query: str, n_miners: int = 10, n_results: int = if isinstance(res, SynapseStreamResult) and res.accumulated_chunks ] distinct_results = list(np.unique(results)) - logger.info( - f"🔍 Collected responses from {len(stream_results)} miners. {len(results)} responded successfully with a total of {len(distinct_results)} distinct results" - ) + # logger.info( + # f"🔍 Collected responses from {len(stream_results)} miners. 
{len(results)} responded successfully with a total of {len(distinct_results)} distinct results" + # ) loaded_results = [] for result in distinct_results: try: @@ -110,7 +110,8 @@ async def test_time_inference(messages: list[dict], model: str = None): async def create_response_stream(messages): async for steps, total_thinking_time in generate_response(messages, model=model): if total_thinking_time is not None: - logger.info(f"**Total thinking time: {total_thinking_time:.2f} seconds**") + # logger.info(f"**Total thinking time: {total_thinking_time:.2f} seconds**") + pass yield steps, total_thinking_time # Create a streaming response that yields each step diff --git a/validator_api/mixture_of_miners.py b/validator_api/mixture_of_miners.py index 8d797f38f..b5742e185 100644 --- a/validator_api/mixture_of_miners.py +++ b/validator_api/mixture_of_miners.py @@ -61,8 +61,6 @@ async def mixture_of_miners(body: dict[str, any], uids: list[int]) -> tuple | St # Extract completions from the responses. completions = ["".join(response[1]) for response in valid_responses if response and len(response) > 1] - logger.debug(f"Using Mixture of Miners with {len(completions)} miners") - new_messages = body["messages"] + [ { "role": "assistant", diff --git a/validator_api/scoring_queue.py b/validator_api/scoring_queue.py index 493bc55a8..7e7d864c6 100644 --- a/validator_api/scoring_queue.py +++ b/validator_api/scoring_queue.py @@ -45,7 +45,7 @@ async def run_step(self): scoring_payload = self._scoring_queue.popleft() payload = scoring_payload.payload uids = payload["uid"] - logger.info(f"Received new organic for scoring, uids: {uids}") + # logger.info(f"Received new organic for scoring, uids: {uids}") url = f"http://{shared_settings.VALIDATOR_API}/scoring" try: @@ -59,7 +59,7 @@ async def run_step(self): if response.status_code != 200: # Raise an exception so that the retry logic in the except block handles it. 
raise Exception(f"Non-200 response: {response.status_code} for uids {uids}") - logger.info(f"Forwarding response completed with status {response.status_code}") + # logger.info(f"Forwarding response completed with status {response.status_code}") except Exception as e: if scoring_payload.retries < self.max_scoring_retries: scoring_payload.retries += 1 @@ -74,7 +74,7 @@ async def append_response(self, uids: list[int], body: dict[str, Any], chunks: l return if body.get("task") != "InferenceTask" and body.get("task") != "WebRetrievalTask": - logger.debug(f"Skipping forwarding for non-inference/web retrieval task: {body.get('task')}") + # logger.debug(f"Skipping forwarding for non-inference/web retrieval task: {body.get('task')}") return uids = [int(u) for u in uids] @@ -85,9 +85,6 @@ async def append_response(self, uids: list[int], body: dict[str, Any], chunks: l async with self._scoring_lock: self._scoring_queue.append(scoring_item) - logger.info(f"Appended responses from uids {uids} into scoring queue with size: {self.size}") - logger.debug(f"Queued responses body: {body}, chunks: {chunks}") - @property def size(self) -> int: return len(self._scoring_queue) diff --git a/validator_api/test_time_inference.py b/validator_api/test_time_inference.py index 71af4e1e3..044baacc2 100644 --- a/validator_api/test_time_inference.py +++ b/validator_api/test_time_inference.py @@ -60,7 +60,6 @@ def parse_multiple_json(api_response): async def make_api_call(messages, max_tokens, model=None, is_final_answer=False): ATTEMPTS_PER_STEP = 10 - logger.info(f"Making API call with messages: {messages}") async def single_attempt(): try: From 764f48f10b9e6bb6952670e80012629d227811de Mon Sep 17 00:00:00 2001 From: richwardle Date: Fri, 14 Feb 2025 16:51:19 +0000 Subject: [PATCH 2/7] Precommit Fixes --- validator_api/gpt_endpoints.py | 2 +- validator_api/test_time_inference.py | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/validator_api/gpt_endpoints.py b/validator_api/gpt_endpoints.py index b1089c476..68e68e046 100644 --- a/validator_api/gpt_endpoints.py +++ b/validator_api/gpt_endpoints.py @@ -61,7 +61,7 @@ async def web_retrieval(search_query: str, n_miners: int = 10, n_results: int = uids = filter_available_uids(task="WebRetrievalTask", test=shared_settings.API_TEST_MODE, n_miners=n_miners) if not uids: raise HTTPException(status_code=500, detail="No available miners") - + uids = random.sample(uids, min(len(uids), n_miners)) # logger.debug(f"🔍 Querying uids: {uids}") if len(uids) == 0: diff --git a/validator_api/test_time_inference.py b/validator_api/test_time_inference.py index 0de578c65..241c4189e 100644 --- a/validator_api/test_time_inference.py +++ b/validator_api/test_time_inference.py @@ -58,8 +58,6 @@ def parse_multiple_json(api_response): async def make_api_call(messages, max_tokens, model=None, is_final_answer=False): - ATTEMPTS_PER_STEP = 10 - async def single_attempt(): try: response = await chat_completion( From 8ac0ee9144f99c4c5a6200d1312b2823b2b02872 Mon Sep 17 00:00:00 2001 From: richwardle Date: Mon, 17 Feb 2025 09:52:24 +0000 Subject: [PATCH 3/7] Isort Fix --- data/top100k_domains.csv | 2 +- prompting/rewards/web_retrieval.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/data/top100k_domains.csv b/data/top100k_domains.csv index bc1a6ebd9..8bfa68d9a 100644 --- a/data/top100k_domains.csv +++ b/data/top100k_domains.csv @@ -99997,4 +99997,4 @@ "99996","tankspotter.com","4.51" "99997","targetshootingapp.com","4.51" "99998","tastytalegame.com","4.51" 
-"99999","tbscan.com","4.51" \ No newline at end of file +"99999","tbscan.com","4.51" diff --git a/prompting/rewards/web_retrieval.py b/prompting/rewards/web_retrieval.py index ecfda2dd1..8993aced8 100644 --- a/prompting/rewards/web_retrieval.py +++ b/prompting/rewards/web_retrieval.py @@ -8,7 +8,6 @@ import numpy as np import pandas as pd from loguru import logger - from pydantic import BaseModel from scipy import spatial from thefuzz import fuzz From b5269f68bbac0d765ae09c88387e1feb918b5e67 Mon Sep 17 00:00:00 2001 From: Rich <34130474+richwardle@users.noreply.github.com> Date: Mon, 17 Feb 2025 10:58:42 +0000 Subject: [PATCH 4/7] Removing More Logs --- neurons/miners/epistula_miner/miner.py | 6 +-- .../miners/epistula_miner/web_retrieval.py | 1 - neurons/validator.py | 8 ++++ prompting/api/miner_availabilities/api.py | 2 - prompting/llms/apis/gpt_wrapper.py | 12 ------ prompting/llms/apis/llm_wrapper.py | 1 - prompting/llms/model_manager.py | 6 +-- .../miner_availability/miner_availability.py | 1 - prompting/rewards/multi_choice.py | 1 - prompting/rewards/scoring.py | 10 ----- prompting/tasks/base_task.py | 2 - prompting/tasks/multi_step_reasoning.py | 5 --- prompting/tasks/task_creation.py | 2 - prompting/tasks/task_sending.py | 41 +++++++------------ prompting/weight_setting/weight_setter.py | 18 -------- shared/config.py | 2 - shared/loop_runner.py | 14 ------- shared/misc.py | 10 ----- validator_api/chat_completion.py | 3 -- validator_api/gpt_endpoints.py | 7 +--- validator_api/scoring_queue.py | 2 +- 21 files changed, 28 insertions(+), 126 deletions(-) diff --git a/neurons/miners/epistula_miner/miner.py b/neurons/miners/epistula_miner/miner.py index 8eaf0750f..e8f32fa7a 100644 --- a/neurons/miners/epistula_miner/miner.py +++ b/neurons/miners/epistula_miner/miner.py @@ -162,9 +162,9 @@ def run(self): except Exception: logger.error("Failed to get external IP") - # logger.info( - # f"Serving miner endpoint {external_ip}:{shared_settings.AXON_PORT} on network: {shared_settings.SUBTENSOR_NETWORK} with netuid: {shared_settings.NETUID}" - # ) + logger.info( + f"Serving miner endpoint {external_ip}:{shared_settings.AXON_PORT} on network: {shared_settings.SUBTENSOR_NETWORK} with netuid: {shared_settings.NETUID}" + ) serve_success = serve_extrinsic( subtensor=shared_settings.SUBTENSOR, diff --git a/neurons/miners/epistula_miner/web_retrieval.py b/neurons/miners/epistula_miner/web_retrieval.py index 654a0b93c..b8dfffaa9 100644 --- a/neurons/miners/epistula_miner/web_retrieval.py +++ b/neurons/miners/epistula_miner/web_retrieval.py @@ -56,7 +56,6 @@ async def get_websites_with_similarity( """ ddgs = PatchedDDGS(proxy=settings.shared_settings.PROXY_URL, verify=False) results = list(ddgs.text(query)) - # logger.debug(f"Got {len(results)} results") urls = [r["href"] for r in results][:n_results] # Fetch and extract content diff --git a/neurons/validator.py b/neurons/validator.py index afb1cd65c..c16420f9b 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -60,6 +60,14 @@ async def spawn_loops(task_queue, scoring_queue, reward_events): logger.info("Starting WeightSetter...") asyncio.create_task(weight_setter.start(reward_events)) + # while True: + # await asyncio.sleep(5) + + # # Check if all tasks are still running + # logger.debug(f"Number of tasks in Task Queue: {len(task_queue)}") + # logger.debug(f"Number of tasks in Scoring Queue: {len(scoring_queue)}") + # logger.debug(f"Number of tasks in Reward Events: {len(reward_events)}") + asyncio.run(spawn_loops(task_queue, scoring_queue, 
reward_events)) diff --git a/prompting/api/miner_availabilities/api.py b/prompting/api/miner_availabilities/api.py index 48cf13b98..75a737c56 100644 --- a/prompting/api/miner_availabilities/api.py +++ b/prompting/api/miner_availabilities/api.py @@ -12,7 +12,6 @@ async def get_miner_availabilities(uids: list[int] | None = None): if uids: return {uid: miner_availabilities.miners.get(uid) for uid in uids} - # logger.info(f"Returning all miner availabilities for {len(miner_availabilities.miners)} miners") return miner_availabilities.miners @@ -22,7 +21,6 @@ async def get_available_miners( model: str | None = None, k: int = 10, ): - # logger.info(f"Getting {k} available miners for task {task} and model {model}") task_configs = [config for config in TaskRegistry.task_configs if config.task.__name__ == task] task_config = task_configs[0] if task_configs else None return miner_availabilities.get_available_miners(task=task_config, model=model, k=k) diff --git a/prompting/llms/apis/gpt_wrapper.py b/prompting/llms/apis/gpt_wrapper.py index 51fd0d04c..e8bf39d8e 100644 --- a/prompting/llms/apis/gpt_wrapper.py +++ b/prompting/llms/apis/gpt_wrapper.py @@ -85,10 +85,6 @@ def chat_complete( break else: model = shared_settings.GPT_MODEL_CONFIG[model].get("upgrade") - # logger.debug(f"INPUT TOKENS: {input_tokens}") - # logger.warning( - # f"Upgrading to model {model} because output tokens ({output_tokens}) < min tokens({min_tokens})" - # ) if model is None: raise ValueError( f"Minimum tokens ({min_tokens}) exceed the available output tokens ({output_tokens})" @@ -114,8 +110,6 @@ def chat_complete( input_cost = ( response.usage.prompt_tokens * shared_settings.GPT_MODEL_CONFIG[model]["input_token_cost"] ) / 1000 - # logger.debug(f"MSG TOKENS: {messages.get_tokens(model)}") - # logger.debug(f"USAGE: {response.usage}") return response, output_cost + input_cost except Exception as ex: logger.exception(f"GPT call failed: {ex}") @@ -165,10 +159,6 @@ async def chat_complete_async( break else: model = shared_settings.GPT_MODEL_CONFIG[model].get("upgrade") - # logger.debug(f"INPUT TOKENS: {input_tokens}") - # logger.warning( - # f"Upgrading to model {model} because output tokens ({output_tokens}) < min tokens({min_tokens})" - # ) if model is None: raise ValueError( f"Minimum tokens ({min_tokens}) exceed the available output tokens ({output_tokens})" @@ -194,8 +184,6 @@ async def chat_complete_async( input_cost = ( response.usage.prompt_tokens * shared_settings.GPT_MODEL_CONFIG[model]["input_token_cost"] ) / 1000 - # logger.info(f"MSG TOKENS: {messages.get_tokens(model)}") - # logger.info(f"USAGE: {response.usage}") return response, output_cost + input_cost except Exception as ex: logger.exception(f"GPT call failed: {ex}") diff --git a/prompting/llms/apis/llm_wrapper.py b/prompting/llms/apis/llm_wrapper.py index 20dacc338..631ab686c 100644 --- a/prompting/llms/apis/llm_wrapper.py +++ b/prompting/llms/apis/llm_wrapper.py @@ -50,5 +50,4 @@ def chat_complete( logprobs=logprobs, ) response = response.choices[0].message.content - # logger.debug(f"Generated {len(response)} characters using {model}") return response diff --git a/prompting/llms/model_manager.py b/prompting/llms/model_manager.py index 59927b46b..35171d453 100644 --- a/prompting/llms/model_manager.py +++ b/prompting/llms/model_manager.py @@ -37,9 +37,8 @@ def load_model(self, model_config: ModelConfig, force: bool = True): # if force loading is enabled, unload models until there is enough RAM if force: - # logger.debug(f"Forcing model {model_config.llm_model_id} 
to load.") + logger.debug(f"Forcing model {model_config.llm_model_id} to load.") for active_model in list(self.active_models.keys()): - # logger.debug(f"Checking if model {active_model.llm_model_id} can be unloaded.") if active_model in self.always_active_models: logger.debug(f"Model {active_model.llm_model_id} is always active. Skipping.") continue @@ -63,9 +62,6 @@ def load_model(self, model_config: ModelConfig, force: bool = True): ) try: - # logger.debug( - # f"Loading model... {model_config.llm_model_id} with GPU Utilization: {model_config.min_ram / GPUInfo.free_memory}" - # ) GPUInfo.log_gpu_info() model = ReproducibleHF( diff --git a/prompting/miner_availability/miner_availability.py b/prompting/miner_availability/miner_availability.py index ec4e29763..fd73e7497 100644 --- a/prompting/miner_availability/miner_availability.py +++ b/prompting/miner_availability/miner_availability.py @@ -87,7 +87,6 @@ async def run_step(self): llm_model_availabilities={model: False for model in model_config}, ) - # logger.debug("Miner availabilities updated.") self.current_index = end_index if self.current_index >= len(self.uids): diff --git a/prompting/rewards/multi_choice.py b/prompting/rewards/multi_choice.py index 442f0bc0c..d4ee2ee39 100644 --- a/prompting/rewards/multi_choice.py +++ b/prompting/rewards/multi_choice.py @@ -75,5 +75,4 @@ def reward(self, reference: str, response_event: DendriteResponseEvent, **kwargs timings.append(time.perf_counter() - start_time) rewards.append(reward) - # logger.debug(f"Rewards: {rewards}") return BatchRewardOutput(rewards=np.asarray(rewards), timings=np.asarray(timings)) diff --git a/prompting/rewards/scoring.py b/prompting/rewards/scoring.py index 6f100aae8..3eb0a2913 100644 --- a/prompting/rewards/scoring.py +++ b/prompting/rewards/scoring.py @@ -50,7 +50,6 @@ def add_to_queue( task_id=task_id, ) ) - # logger.debug(f"Added to queue: {task.__class__.__name__}. Queue size: {len(self.scoring_queue)}") async def run_step(self) -> RewardLoggingEvent: await asyncio.sleep(0.1) @@ -62,7 +61,6 @@ async def run_step(self) -> RewardLoggingEvent: or (scoring_config.task.llm_model is None) ] if len(scorable) == 0: - # logger.debug("Nothing to score. 
Skipping scoring step.") # Run a model_scheduler step to load a new model as there are no more tasks to be scored if len(self.scoring_queue) > 0: await model_scheduler.run_step() @@ -77,10 +75,6 @@ async def run_step(self) -> RewardLoggingEvent: # and there we then calculate the reward reward_pipeline = TaskRegistry.get_task_reward(scoring_config.task) - # logger.debug( - # f"{len(scoring_config.response.completions)} completions to score for task " - # f"{scoring_config.task.__class__.__name__}" - # ) reward_events = reward_pipeline.apply( response_event=scoring_config.response, challenge=scoring_config.task.query, @@ -89,10 +83,6 @@ async def run_step(self) -> RewardLoggingEvent: task=scoring_config.task, ) self.reward_events.append(reward_events) - # logger.debug( - # f"Scored {scoring_config.task.__class__.__name__} {scoring_config.task.task_id} with model " - # f"{scoring_config.task.llm_model_id}" - # ) log_event( RewardLoggingEvent( response_event=scoring_config.response, diff --git a/prompting/tasks/base_task.py b/prompting/tasks/base_task.py index cf8870415..f949aece3 100644 --- a/prompting/tasks/base_task.py +++ b/prompting/tasks/base_task.py @@ -78,7 +78,6 @@ def make_reference(self, dataset_entry: DatasetEntry) -> str: def generate_reference(self, messages: list[str]) -> str: """Generates a reference answer to be used for scoring miner completions""" - # logger.info("🤖 Generating reference...") self.reference = model_manager.get_model(settings.shared_settings.LLM_MODEL).generate( messages=messages ) # This should be a list of dict @@ -92,7 +91,6 @@ def generate_query( messages: list[str], ) -> str: """Generates a query to be used for generating the challenge""" - # logger.info("🤖 Generating query...") llm_messages = [LLMMessage(role="system", content=self.query_system_prompt)] if self.query_system_prompt else [] llm_messages.extend([LLMMessage(role="user", content=message) for message in messages]) diff --git a/prompting/tasks/multi_step_reasoning.py b/prompting/tasks/multi_step_reasoning.py index 0e27c7ba5..6b514ad26 100644 --- a/prompting/tasks/multi_step_reasoning.py +++ b/prompting/tasks/multi_step_reasoning.py @@ -231,11 +231,6 @@ def make_query(self, dataset_entry: Context): return self.query def make_reference(self, dataset_entry: Context): - # logger.info(f"Generating reference for Multi Step Reasoning task with query: {self.query}") steps, total_thinking_time = execute_multi_step_reasoning(user_query=self.query) - # logger.info( - # f"**Steps: {steps}**, **Total thinking time for multi step reasoning: {total_thinking_time} seconds**" - # ) - # logger.info(f"**Total thinking time for multi step reasoning: {total_thinking_time} seconds**") self.reference = steps[-1][1] return self.reference diff --git a/prompting/tasks/task_creation.py b/prompting/tasks/task_creation.py index 89180280a..7a651d685 100644 --- a/prompting/tasks/task_creation.py +++ b/prompting/tasks/task_creation.py @@ -31,10 +31,8 @@ async def start(self, task_queue, scoring_queue): async def run_step(self): if len(self.task_queue) > shared_settings.TASK_QUEUE_LENGTH_THRESHOLD: - # logger.debug("Task queue is full. Skipping task generation.") return None if len(self.scoring_queue) > shared_settings.SCORING_QUEUE_LENGTH_THRESHOLD: - # logger.debug("Scoring queue is full. 
Skipping task generation.") return None await asyncio.sleep(0.1) try: diff --git a/prompting/tasks/task_sending.py b/prompting/tasks/task_sending.py index 33c05a5aa..f7f9cd6ee 100644 --- a/prompting/tasks/task_sending.py +++ b/prompting/tasks/task_sending.py @@ -24,26 +24,25 @@ NEURON_SAMPLE_SIZE = 100 -# def log_stream_results(stream_results: List[SynapseStreamResult]): -# failed_responses = [ -# response for response in stream_results if response.exception is not None or response.completion is None -# ] -# empty_responses = [ -# response for response in stream_results if response.exception is None and response.completion == "" -# ] -# non_empty_responses = [ -# response for response in stream_results if response.exception is None and response.completion != "" -# ] - -# logger.debug(f"Total of non_empty responses: ({len(non_empty_responses)})") -# logger.debug(f"Total of empty responses: ({len(empty_responses)})") -# logger.debug(f"Total of failed responses: ({len(failed_responses)})") +def log_stream_results(stream_results: List[SynapseStreamResult]): + failed_responses = [ + response for response in stream_results if response.exception is not None or response.completion is None + ] + empty_responses = [ + response for response in stream_results if response.exception is None and response.completion == "" + ] + non_empty_responses = [ + response for response in stream_results if response.exception is None and response.completion != "" + ] + + logger.debug(f"Total of non_empty responses: ({len(non_empty_responses)})") + logger.debug(f"Total of empty responses: ({len(empty_responses)})") + logger.debug(f"Total of failed responses: ({len(failed_responses)})") async def collect_responses(task: BaseTextTask) -> DendriteResponseEvent | None: # Get the list of uids and their axons to query for this step. uids = miner_availabilities.get_available_miners(task=task, model=task.llm_model_id, k=NEURON_SAMPLE_SIZE) - # logger.debug(f"🔍 Querying uids: {uids}") if len(uids) == 0: logger.warning("No available miners. This should already have been caught earlier.") return @@ -58,9 +57,6 @@ async def collect_responses(task: BaseTextTask) -> DendriteResponseEvent | None: if isinstance(task, WebRetrievalTask): body["target_results"] = task.target_results body["timeout"] = task.timeout - - # logger.info(f"🔍 Sending task {task.task_id} with body: {body}") - # logger.debug(f"🔍 Collected responses from {len(stream_results)} miners") stream_results = await query_miners(uids, body) # log_stream_results(stream_results) @@ -75,7 +71,6 @@ async def collect_responses(task: BaseTextTask) -> DendriteResponseEvent | None: shared_settings.INFERENCE_TIMEOUT if isinstance(task, InferenceTask) else shared_settings.NEURON_TIMEOUT ), ) - # logger.debug("🔍 Response event created") return response_event @@ -140,10 +135,8 @@ async def run_step(self) -> ValidatorLoggingEvent | ErrorLoggingEvent | None: exclude (list, optional): The list of uids to exclude from the query. Defaults to []. """ while len(self.scoring_queue) > shared_settings.SCORING_QUEUE_LENGTH_THRESHOLD: - # logger.debug("Scoring queue is full. Waiting 1 second...") await asyncio.sleep(1) while len(self.task_queue) == 0: - # logger.warning("No tasks in queue. 
Waiting 1 second...") await asyncio.sleep(1) try: # get task from the task queue @@ -154,13 +147,9 @@ async def run_step(self) -> ValidatorLoggingEvent | ErrorLoggingEvent | None: with Timer() as timer: response_event = await collect_responses(task=task) if response_event is None: - # logger.warning("No response event collected. This should not be happening.") return - # logger.debug("🔍 Estimating block") estimated_block = self.estimate_block - # logger.debug("🔍 Creating scoring config") - scoring_config = ScoringConfig( task=task, response=response_event, @@ -169,9 +158,7 @@ async def run_step(self) -> ValidatorLoggingEvent | ErrorLoggingEvent | None: step=self.step, task_id=task.task_id, ) - # logger.debug(f"Collected responses in {timer.final_time:.2f} seconds") self.scoring_queue.append(scoring_config) - # logger.debug(f"SCORING: Added to queue: {task.__class__.__name__}. Queue size: {len(self.scoring_queue)}") # Log the step event. return ValidatorLoggingEvent( diff --git a/prompting/weight_setting/weight_setter.py b/prompting/weight_setting/weight_setter.py index 3ba5ac570..594891c89 100644 --- a/prompting/weight_setting/weight_setter.py +++ b/prompting/weight_setting/weight_setter.py @@ -40,7 +40,6 @@ def apply_reward_func(raw_rewards: np.ndarray, p=0.5): def save_weights(weights: list[np.ndarray]): """Saves the list of numpy arrays to a file.""" - logger.debug("Saving validator state.") # Save all arrays into a single .npz file np.savez_compressed(FILENAME, *weights) @@ -86,17 +85,12 @@ def set_weights( ) = bt.utils.weight_utils.convert_weights_and_uids_for_emit( uids=processed_weight_uids, weights=processed_weights ) - # logger.debug("uint_weights", uint_weights) - # logger.debug("uint_uids", uint_uids) except Exception as ex: logger.exception(f"Issue with setting weights: {ex}") # Create a dataframe from weights and uids and save it as a csv file, with the current step as the filename. if shared_settings.LOG_WEIGHTS: try: - # logger.debug( - # f"Lengths... 
UIDS: {len(uint_uids)}, WEIGHTS: {len(processed_weights.flatten())}, RAW_WEIGHTS: {len(weights.flatten())}, UINT_WEIGHTS: {len(uint_weights)}" - # ) weights_df = pd.DataFrame( { "step": step, @@ -155,7 +149,6 @@ async def start(self, reward_events, name: str | None = None): try: with np.load(FILENAME) as data: PAST_WEIGHTS = [data[key] for key in data.files] - # logger.debug(f"Loaded persistent weights of length: {len(PAST_WEIGHTS)}") except FileNotFoundError: logger.info("No weights file found - this is expected on a new validator, starting with empty weights") PAST_WEIGHTS = [] @@ -169,8 +162,6 @@ async def run_step(self): if len(self.reward_events) == 0: logger.warning("No reward events in queue, skipping weight setting...") return - # logger.debug(f"Found {len(self.reward_events)} reward events in queue") - # reward_events is a list of lists of WeightedRewardEvents - the 'sublists' each contain the multiple reward events for a single task self.reward_events: list[list[WeightedRewardEvent]] = self.reward_events # to get correct typehinting @@ -217,7 +208,6 @@ async def run_step(self): for task_config, rewards in miner_rewards.items(): r = np.array([x["reward"] / max(1, x["count"]) for x in list(rewards.values())]) - # logger.debug(f"Rewards for task {task_config.task.__name__}: {r}") u = np.array(list(rewards.keys())) if task_config.task == InferenceTask: processed_rewards = r / max(1, (np.sum(r[r > 0]) + 1e-10)) @@ -233,13 +223,6 @@ async def run_step(self): except Exception as ex: logger.exception(f"{ex}") - # mean_value = final_rewards.mean() - # min_value = final_rewards.min() - # max_value = final_rewards.max() - # length = len(final_rewards) - # logger.debug( - # f"Reward stats. Mean: {mean_value:.2f}; Min: {min_value:.4f}; Max: {max_value:.4f}; Count: {length}" - # ) # set weights on chain set_weights( final_rewards, step=self.step, subtensor=shared_settings.SUBTENSOR, metagraph=shared_settings.METAGRAPH @@ -249,5 +232,4 @@ async def run_step(self): await asyncio.sleep(0.01) return final_rewards - weight_setter = WeightSetter() diff --git a/shared/config.py b/shared/config.py index eaad4d858..f7eceb27b 100644 --- a/shared/config.py +++ b/shared/config.py @@ -31,8 +31,6 @@ def config() -> bt.config: # args = parser.parse_args() add_args(parser=parser) args, unknown = parser.parse_known_args() - # logger.info(f"RUNNING WITH ARGS: {' '.join(f'{k}={v}' for k, v in vars(args).items())}") - # logger.info(f"UNKNOWN ARGS: {unknown}") bt.wallet.add_args(parser) bt.subtensor.add_args(parser) bt.axon.add_args(parser) diff --git a/shared/loop_runner.py b/shared/loop_runner.py index 70daec897..a5e6a5a24 100644 --- a/shared/loop_runner.py +++ b/shared/loop_runner.py @@ -33,15 +33,12 @@ async def get_time(self): """Get the current time from the time server with a timeout.""" if not self.sync: time = datetime.datetime.now(datetime.timezone.utc) - # logger.debug(f"Time: {time}") return time try: async with aiohttp.ClientSession() as session: - # logger.info("Waiting for response time") async with session.get(self.time_server_url, timeout=5) as response: if response.status == 200: data = await response.json() - # logger.info("Got response") return datetime.datetime.fromisoformat(data["datetime"].replace("Z", "+00:00")) else: raise Exception(f"Failed to get server time. 
Status: {response.status}") @@ -64,7 +61,6 @@ async def wait_for_next_execution(self, last_run_time): next_run = self.next_sync_point(current_time) else: next_run = last_run_time + timedelta(seconds=self.interval) - # logger.debug(f"Next run: {next_run}") wait_time = (next_run - current_time).total_seconds() if wait_time > 0: @@ -76,22 +72,15 @@ async def wait_for_next_execution(self, last_run_time): async def run_loop(self): """Run the loop periodically, optionally synchronizing across all instances.""" - # logger.debug(f"Starting loop {self.__class__.__name__}; running: {self.running}") last_run_time = await self.get_time() - # logger.debug(f"Got time of last run: {last_run_time}") try: while self.running: with profiler.measure(self.name): - # logger.debug("Waiting...") next_run = await self.wait_for_next_execution(last_run_time) - # logger.debug("Wait ended") try: await self.run_step() - # run_results = await self.run_step() - # logger.debug(f"Run_results: {run_results}") self.step += 1 - # logger.debug(f"{self.name}: Step {self.step} completed at {next_run}") except Exception as ex: logger.exception(f"Error in loop iteration: {ex}") last_run_time = next_run @@ -101,8 +90,6 @@ async def run_loop(self): logger.error(f"Fatal error in loop: {e}") finally: self.running = False - # logger.info("Loop has been cleaned up.") - # logger.debug("Exiting run_loop") async def start(self, name: str | None = None): """Start the loop.""" @@ -110,7 +97,6 @@ async def start(self, name: str | None = None): logger.warning("Loop is already running.") return self.running = True - # logger.debug(f"{self.name}: Starting loop with {'synchronized' if self.sync else 'non-synchronized'} mode") self._task = asyncio.create_task(self.run_loop(), name=name) async def stop(self): diff --git a/shared/misc.py b/shared/misc.py index 5c66494da..5254f84c7 100644 --- a/shared/misc.py +++ b/shared/misc.py @@ -110,18 +110,8 @@ def ttl_get_block(subtensor: bt.Subtensor | None = None) -> int: def async_log(func): async def wrapper(*args, **kwargs): - # start_time = time.time() - # task_id = id(asyncio.current_task()) - # func_name = func.__name__ - # logger.debug(f"Starting {func_name} on asyncio task {task_id} at {start_time}") - # Execute the wrapped function result = await func(*args, **kwargs) - - # end_time = time.time() - # execution_time = end_time - start_time - # logger.debug(f"Completed {func_name} on asyncio task {task_id} in {execution_time} seconds") - return result return wrapper diff --git a/validator_api/chat_completion.py b/validator_api/chat_completion.py index 9dce10153..338ab4127 100644 --- a/validator_api/chat_completion.py +++ b/validator_api/chat_completion.py @@ -163,7 +163,6 @@ async def collect_remaining_responses( """Collect remaining responses for scoring without blocking the main response.""" try: responses = await remaining - # logger.debug(f"responses to forward: {responses}") for i, response in enumerate(responses): if isinstance(response, Exception): logger.error(f"Error collecting response from uid {uids[i+1]}: {response}") @@ -206,8 +205,6 @@ async def chat_completion( raise HTTPException(status_code=500, detail="No available miners") uids = random.sample(uids, min(len(uids), num_miners)) - logger.debug(f"Querying uids {uids}") - STREAM = body.get("stream", False) # Initialize chunks collection for each miner diff --git a/validator_api/gpt_endpoints.py b/validator_api/gpt_endpoints.py index 68e68e046..e4ae5e0c8 100644 --- a/validator_api/gpt_endpoints.py +++ b/validator_api/gpt_endpoints.py @@ 
-63,7 +63,6 @@ async def web_retrieval(search_query: str, n_miners: int = 10, n_results: int = raise HTTPException(status_code=500, detail="No available miners") uids = random.sample(uids, min(len(uids), n_miners)) - # logger.debug(f"🔍 Querying uids: {uids}") if len(uids) == 0: logger.warning("No available miners. This should already have been caught earlier.") return @@ -87,9 +86,6 @@ async def web_retrieval(search_query: str, n_miners: int = 10, n_results: int = if isinstance(res, SynapseStreamResult) and res.accumulated_chunks ] distinct_results = list(np.unique(results)) - # logger.info( - # f"🔍 Collected responses from {len(stream_results)} miners. {len(results)} responded successfully with a total of {len(distinct_results)} distinct results" - # ) loaded_results = [] for result in distinct_results: try: @@ -110,8 +106,7 @@ async def test_time_inference(messages: list[dict], model: str = None): async def create_response_stream(messages): async for steps, total_thinking_time in generate_response(messages, model=model): if total_thinking_time is not None: - # logger.info(f"**Total thinking time: {total_thinking_time:.2f} seconds**") - pass + logger.debug(f"**Total thinking time: {total_thinking_time:.2f} seconds**") yield steps, total_thinking_time # Create a streaming response that yields each step diff --git a/validator_api/scoring_queue.py b/validator_api/scoring_queue.py index 7e7d864c6..bed6f42a6 100644 --- a/validator_api/scoring_queue.py +++ b/validator_api/scoring_queue.py @@ -45,7 +45,7 @@ async def run_step(self): scoring_payload = self._scoring_queue.popleft() payload = scoring_payload.payload uids = payload["uid"] - # logger.info(f"Received new organic for scoring, uids: {uids}") + logger.info(f"Received new organic for scoring, uids: {uids}") url = f"http://{shared_settings.VALIDATOR_API}/scoring" try: From 9e12fcd8289ddb39260da8ca7d00acce089aed8b Mon Sep 17 00:00:00 2001 From: Rich <34130474+richwardle@users.noreply.github.com> Date: Mon, 17 Feb 2025 11:15:38 +0000 Subject: [PATCH 5/7] Fix Imports --- shared/misc.py | 1 + 1 file changed, 1 insertion(+) diff --git a/shared/misc.py b/shared/misc.py index 644c5f96c..a5cb6aa3b 100644 --- a/shared/misc.py +++ b/shared/misc.py @@ -1,4 +1,5 @@ import asyncio +import subprocess import time import traceback from functools import lru_cache, update_wrapper From 5fcb948a80b0be2af1f21667ce0b6fee7173ed7a Mon Sep 17 00:00:00 2001 From: richwardle Date: Mon, 17 Feb 2025 11:23:28 +0000 Subject: [PATCH 6/7] Linting --- prompting/rewards/scoring.py | 1 + prompting/tasks/task_sending.py | 2 +- prompting/weight_setting/weight_setter.py | 1 + shared/misc.py | 1 - 4 files changed, 3 insertions(+), 2 deletions(-) diff --git a/prompting/rewards/scoring.py b/prompting/rewards/scoring.py index d4b460b11..795f174f2 100644 --- a/prompting/rewards/scoring.py +++ b/prompting/rewards/scoring.py @@ -1,6 +1,7 @@ import asyncio import threading +from loguru import logger from pydantic import ConfigDict from prompting.llms.model_manager import model_manager, model_scheduler diff --git a/prompting/tasks/task_sending.py b/prompting/tasks/task_sending.py index f7f9cd6ee..42466c7a9 100644 --- a/prompting/tasks/task_sending.py +++ b/prompting/tasks/task_sending.py @@ -24,7 +24,7 @@ NEURON_SAMPLE_SIZE = 100 -def log_stream_results(stream_results: List[SynapseStreamResult]): +def log_stream_results(stream_results): failed_responses = [ response for response in stream_results if response.exception is not None or response.completion is None ] diff --git 
a/prompting/weight_setting/weight_setter.py b/prompting/weight_setting/weight_setter.py index 594891c89..977fa3303 100644 --- a/prompting/weight_setting/weight_setter.py +++ b/prompting/weight_setting/weight_setter.py @@ -232,4 +232,5 @@ async def run_step(self): await asyncio.sleep(0.01) return final_rewards + weight_setter = WeightSetter() diff --git a/shared/misc.py b/shared/misc.py index a5cb6aa3b..621d470f7 100644 --- a/shared/misc.py +++ b/shared/misc.py @@ -1,4 +1,3 @@ -import asyncio import subprocess import time import traceback From 6c56b324167dacd027d186e6e89555e84f67eb78 Mon Sep 17 00:00:00 2001 From: richwardle Date: Mon, 17 Feb 2025 11:46:46 +0000 Subject: [PATCH 7/7] Switch default hotkey to macrocosmos --- shared/settings.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shared/settings.py b/shared/settings.py index 52e605ec3..e2cb11d3b 100644 --- a/shared/settings.py +++ b/shared/settings.py @@ -88,7 +88,7 @@ class SharedSettings(BaseSettings): # ==== API ===== # Hotkey used to run api, defaults to Macrocosmos - API_HOTKEY: str = Field("5F4tQyWrhfGVcNhoqeiNsR6KjD4wMZ2kfhLj4oHYuyHbZAc3", env="API_HOTKEY") + API_HOTKEY: str = Field("5Cg5QgjMfRqBC6bh8X4PDbQi7UzVRn9eyWXsB8gkyfppFPPy", env="API_HOTKEY") # Scoring request rate limit in seconds. SCORING_RATE_LIMIT_SEC: float = Field(5, env="SCORING_RATE_LIMIT_SEC") # Scoring queue threshold when rate-limit start to kick in, used to query validator API with scoring requests.
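
Note on the final hunk: the `API_HOTKEY` default swapped here is only a fallback — since `SharedSettings` is a pydantic `BaseSettings` class and the field is declared with `Field(..., env="API_HOTKEY")`, a deployment's `API_HOTKEY` environment variable takes precedence over whatever default ships in code. A minimal, self-contained sketch of that resolution order (assuming pydantic v1-style `BaseSettings`, which the `Field(..., env=...)` pattern in the hunk suggests; the class name and values below are illustrative placeholders, not taken from the repo):

import os

from pydantic import BaseSettings, Field


class MiniSettings(BaseSettings):
    # Same pattern as SharedSettings.API_HOTKEY; the default is a placeholder, not a real hotkey.
    API_HOTKEY: str = Field("default-hotkey-from-code", env="API_HOTKEY")


if __name__ == "__main__":
    print(MiniSettings().API_HOTKEY)   # prints the in-code default
    os.environ["API_HOTKEY"] = "hotkey-from-environment"
    print(MiniSettings().API_HOTKEY)   # env var overrides the default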