
SN1-402 clean logging #586

Merged
merged 11 commits on Feb 17, 2025
Removing More Logs
richwardle committed Feb 17, 2025
commit b5269f68bbac0d765ae09c88387e1feb918b5e67
6 changes: 3 additions & 3 deletions neurons/miners/epistula_miner/miner.py
@@ -162,9 +162,9 @@ def run(self):
except Exception:
logger.error("Failed to get external IP")

# logger.info(
# f"Serving miner endpoint {external_ip}:{shared_settings.AXON_PORT} on network: {shared_settings.SUBTENSOR_NETWORK} with netuid: {shared_settings.NETUID}"
# )
logger.info(
f"Serving miner endpoint {external_ip}:{shared_settings.AXON_PORT} on network: {shared_settings.SUBTENSOR_NETWORK} with netuid: {shared_settings.NETUID}"
)

serve_success = serve_extrinsic(
subtensor=shared_settings.SUBTENSOR,
1 change: 0 additions & 1 deletion neurons/miners/epistula_miner/web_retrieval.py
@@ -56,7 +56,6 @@ async def get_websites_with_similarity(
"""
ddgs = PatchedDDGS(proxy=settings.shared_settings.PROXY_URL, verify=False)
results = list(ddgs.text(query))
# logger.debug(f"Got {len(results)} results")
urls = [r["href"] for r in results][:n_results]

# Fetch and extract content
8 changes: 8 additions & 0 deletions neurons/validator.py
@@ -60,6 +60,14 @@ async def spawn_loops(task_queue, scoring_queue, reward_events):
logger.info("Starting WeightSetter...")
asyncio.create_task(weight_setter.start(reward_events))

# while True:
# await asyncio.sleep(5)

# # Check if all tasks are still running
# logger.debug(f"Number of tasks in Task Queue: {len(task_queue)}")
# logger.debug(f"Number of tasks in Scoring Queue: {len(scoring_queue)}")
# logger.debug(f"Number of tasks in Reward Events: {len(reward_events)}")

asyncio.run(spawn_loops(task_queue, scoring_queue, reward_events))


2 changes: 0 additions & 2 deletions prompting/api/miner_availabilities/api.py
@@ -12,7 +12,6 @@
async def get_miner_availabilities(uids: list[int] | None = None):
if uids:
return {uid: miner_availabilities.miners.get(uid) for uid in uids}
# logger.info(f"Returning all miner availabilities for {len(miner_availabilities.miners)} miners")
return miner_availabilities.miners


@@ -22,7 +21,6 @@ async def get_available_miners(
model: str | None = None,
k: int = 10,
):
# logger.info(f"Getting {k} available miners for task {task} and model {model}")
task_configs = [config for config in TaskRegistry.task_configs if config.task.__name__ == task]
task_config = task_configs[0] if task_configs else None
return miner_availabilities.get_available_miners(task=task_config, model=model, k=k)
12 changes: 0 additions & 12 deletions prompting/llms/apis/gpt_wrapper.py
@@ -85,10 +85,6 @@ def chat_complete(
break
else:
model = shared_settings.GPT_MODEL_CONFIG[model].get("upgrade")
# logger.debug(f"INPUT TOKENS: {input_tokens}")
# logger.warning(
# f"Upgrading to model {model} because output tokens ({output_tokens}) < min tokens({min_tokens})"
# )
if model is None:
raise ValueError(
f"Minimum tokens ({min_tokens}) exceed the available output tokens ({output_tokens})"
@@ -114,8 +110,6 @@ def chat_complete(
input_cost = (
response.usage.prompt_tokens * shared_settings.GPT_MODEL_CONFIG[model]["input_token_cost"]
) / 1000
# logger.debug(f"MSG TOKENS: {messages.get_tokens(model)}")
# logger.debug(f"USAGE: {response.usage}")
return response, output_cost + input_cost
except Exception as ex:
logger.exception(f"GPT call failed: {ex}")
@@ -165,10 +159,6 @@ async def chat_complete_async(
break
else:
model = shared_settings.GPT_MODEL_CONFIG[model].get("upgrade")
# logger.debug(f"INPUT TOKENS: {input_tokens}")
# logger.warning(
# f"Upgrading to model {model} because output tokens ({output_tokens}) < min tokens({min_tokens})"
# )
if model is None:
raise ValueError(
f"Minimum tokens ({min_tokens}) exceed the available output tokens ({output_tokens})"
@@ -194,8 +184,6 @@ async def chat_complete_async(
input_cost = (
response.usage.prompt_tokens * shared_settings.GPT_MODEL_CONFIG[model]["input_token_cost"]
) / 1000
# logger.info(f"MSG TOKENS: {messages.get_tokens(model)}")
# logger.info(f"USAGE: {response.usage}")
return response, output_cost + input_cost
except Exception as ex:
logger.exception(f"GPT call failed: {ex}")
1 change: 0 additions & 1 deletion prompting/llms/apis/llm_wrapper.py
@@ -50,5 +50,4 @@ def chat_complete(
logprobs=logprobs,
)
response = response.choices[0].message.content
# logger.debug(f"Generated {len(response)} characters using {model}")
return response
6 changes: 1 addition & 5 deletions prompting/llms/model_manager.py
@@ -37,9 +37,8 @@ def load_model(self, model_config: ModelConfig, force: bool = True):

# if force loading is enabled, unload models until there is enough RAM
if force:
# logger.debug(f"Forcing model {model_config.llm_model_id} to load.")
logger.debug(f"Forcing model {model_config.llm_model_id} to load.")
for active_model in list(self.active_models.keys()):
# logger.debug(f"Checking if model {active_model.llm_model_id} can be unloaded.")
if active_model in self.always_active_models:
logger.debug(f"Model {active_model.llm_model_id} is always active. Skipping.")
continue
@@ -63,9 +62,6 @@ def load_model(self, model_config: ModelConfig, force: bool = True):
)

try:
# logger.debug(
# f"Loading model... {model_config.llm_model_id} with GPU Utilization: {model_config.min_ram / GPUInfo.free_memory}"
# )
GPUInfo.log_gpu_info()

model = ReproducibleHF(
1 change: 0 additions & 1 deletion prompting/miner_availability/miner_availability.py
@@ -87,7 +87,6 @@ async def run_step(self):
llm_model_availabilities={model: False for model in model_config},
)

# logger.debug("Miner availabilities updated.")
self.current_index = end_index

if self.current_index >= len(self.uids):
1 change: 0 additions & 1 deletion prompting/rewards/multi_choice.py
@@ -75,5 +75,4 @@ def reward(self, reference: str, response_event: DendriteResponseEvent, **kwargs
timings.append(time.perf_counter() - start_time)
rewards.append(reward)

# logger.debug(f"Rewards: {rewards}")
return BatchRewardOutput(rewards=np.asarray(rewards), timings=np.asarray(timings))
10 changes: 0 additions & 10 deletions prompting/rewards/scoring.py
@@ -50,7 +50,6 @@ def add_to_queue(
task_id=task_id,
)
)
# logger.debug(f"Added to queue: {task.__class__.__name__}. Queue size: {len(self.scoring_queue)}")

async def run_step(self) -> RewardLoggingEvent:
await asyncio.sleep(0.1)
@@ -62,7 +61,6 @@ async def run_step(self) -> RewardLoggingEvent:
or (scoring_config.task.llm_model is None)
]
if len(scorable) == 0:
# logger.debug("Nothing to score. Skipping scoring step.")
# Run a model_scheduler step to load a new model as there are no more tasks to be scored
if len(self.scoring_queue) > 0:
await model_scheduler.run_step()
@@ -77,10 +75,6 @@

# and there we then calculate the reward
reward_pipeline = TaskRegistry.get_task_reward(scoring_config.task)
# logger.debug(
# f"{len(scoring_config.response.completions)} completions to score for task "
# f"{scoring_config.task.__class__.__name__}"
# )
reward_events = reward_pipeline.apply(
response_event=scoring_config.response,
challenge=scoring_config.task.query,
@@ -89,10 +83,6 @@
task=scoring_config.task,
)
self.reward_events.append(reward_events)
# logger.debug(
# f"Scored {scoring_config.task.__class__.__name__} {scoring_config.task.task_id} with model "
# f"{scoring_config.task.llm_model_id}"
# )
log_event(
RewardLoggingEvent(
response_event=scoring_config.response,
2 changes: 0 additions & 2 deletions prompting/tasks/base_task.py
@@ -78,7 +78,6 @@ def make_reference(self, dataset_entry: DatasetEntry) -> str:

def generate_reference(self, messages: list[str]) -> str:
"""Generates a reference answer to be used for scoring miner completions"""
# logger.info("🤖 Generating reference...")
self.reference = model_manager.get_model(settings.shared_settings.LLM_MODEL).generate(
messages=messages
) # This should be a list of dict
@@ -92,7 +91,6 @@ def generate_query(
messages: list[str],
) -> str:
"""Generates a query to be used for generating the challenge"""
# logger.info("🤖 Generating query...")
llm_messages = [LLMMessage(role="system", content=self.query_system_prompt)] if self.query_system_prompt else []
llm_messages.extend([LLMMessage(role="user", content=message) for message in messages])

5 changes: 0 additions & 5 deletions prompting/tasks/multi_step_reasoning.py
@@ -231,11 +231,6 @@ def make_query(self, dataset_entry: Context):
return self.query

def make_reference(self, dataset_entry: Context):
# logger.info(f"Generating reference for Multi Step Reasoning task with query: {self.query}")
steps, total_thinking_time = execute_multi_step_reasoning(user_query=self.query)
# logger.info(
# f"**Steps: {steps}**, **Total thinking time for multi step reasoning: {total_thinking_time} seconds**"
# )
# logger.info(f"**Total thinking time for multi step reasoning: {total_thinking_time} seconds**")
self.reference = steps[-1][1]
return self.reference
2 changes: 0 additions & 2 deletions prompting/tasks/task_creation.py
@@ -31,10 +31,8 @@ async def start(self, task_queue, scoring_queue):

async def run_step(self):
if len(self.task_queue) > shared_settings.TASK_QUEUE_LENGTH_THRESHOLD:
# logger.debug("Task queue is full. Skipping task generation.")
return None
if len(self.scoring_queue) > shared_settings.SCORING_QUEUE_LENGTH_THRESHOLD:
# logger.debug("Scoring queue is full. Skipping task generation.")
return None
await asyncio.sleep(0.1)
try:
41 changes: 14 additions & 27 deletions prompting/tasks/task_sending.py
@@ -24,26 +24,25 @@
NEURON_SAMPLE_SIZE = 100


# def log_stream_results(stream_results: List[SynapseStreamResult]):
# failed_responses = [
# response for response in stream_results if response.exception is not None or response.completion is None
# ]
# empty_responses = [
# response for response in stream_results if response.exception is None and response.completion == ""
# ]
# non_empty_responses = [
# response for response in stream_results if response.exception is None and response.completion != ""
# ]

# logger.debug(f"Total of non_empty responses: ({len(non_empty_responses)})")
# logger.debug(f"Total of empty responses: ({len(empty_responses)})")
# logger.debug(f"Total of failed responses: ({len(failed_responses)})")
def log_stream_results(stream_results: List[SynapseStreamResult]):
failed_responses = [
response for response in stream_results if response.exception is not None or response.completion is None
]
empty_responses = [
response for response in stream_results if response.exception is None and response.completion == ""
]
non_empty_responses = [
response for response in stream_results if response.exception is None and response.completion != ""
]

logger.debug(f"Total of non_empty responses: ({len(non_empty_responses)})")
logger.debug(f"Total of empty responses: ({len(empty_responses)})")
logger.debug(f"Total of failed responses: ({len(failed_responses)})")


async def collect_responses(task: BaseTextTask) -> DendriteResponseEvent | None:
# Get the list of uids and their axons to query for this step.
uids = miner_availabilities.get_available_miners(task=task, model=task.llm_model_id, k=NEURON_SAMPLE_SIZE)
# logger.debug(f"🔍 Querying uids: {uids}")
if len(uids) == 0:
logger.warning("No available miners. This should already have been caught earlier.")
return
@@ -58,9 +57,6 @@ async def collect_responses(task: BaseTextTask) -> DendriteResponseEvent | None:
if isinstance(task, WebRetrievalTask):
body["target_results"] = task.target_results
body["timeout"] = task.timeout

# logger.info(f"🔍 Sending task {task.task_id} with body: {body}")
# logger.debug(f"🔍 Collected responses from {len(stream_results)} miners")
stream_results = await query_miners(uids, body)
# log_stream_results(stream_results)

@@ -75,7 +71,6 @@ async def collect_responses(task: BaseTextTask) -> DendriteResponseEvent | None:
shared_settings.INFERENCE_TIMEOUT if isinstance(task, InferenceTask) else shared_settings.NEURON_TIMEOUT
),
)
# logger.debug("🔍 Response event created")
return response_event


@@ -140,10 +135,8 @@ async def run_step(self) -> ValidatorLoggingEvent | ErrorLoggingEvent | None:
exclude (list, optional): The list of uids to exclude from the query. Defaults to [].
"""
while len(self.scoring_queue) > shared_settings.SCORING_QUEUE_LENGTH_THRESHOLD:
# logger.debug("Scoring queue is full. Waiting 1 second...")
await asyncio.sleep(1)
while len(self.task_queue) == 0:
# logger.warning("No tasks in queue. Waiting 1 second...")
await asyncio.sleep(1)
try:
# get task from the task queue
@@ -154,13 +147,9 @@ async def run_step(self) -> ValidatorLoggingEvent | ErrorLoggingEvent | None:
with Timer() as timer:
response_event = await collect_responses(task=task)
if response_event is None:
# logger.warning("No response event collected. This should not be happening.")
return

# logger.debug("🔍 Estimating block")
estimated_block = self.estimate_block
# logger.debug("🔍 Creating scoring config")

scoring_config = ScoringConfig(
task=task,
response=response_event,
@@ -169,9 +158,7 @@ async def run_step(self) -> ValidatorLoggingEvent | ErrorLoggingEvent | None:
step=self.step,
task_id=task.task_id,
)
# logger.debug(f"Collected responses in {timer.final_time:.2f} seconds")
self.scoring_queue.append(scoring_config)
# logger.debug(f"SCORING: Added to queue: {task.__class__.__name__}. Queue size: {len(self.scoring_queue)}")

# Log the step event.
return ValidatorLoggingEvent(
18 changes: 0 additions & 18 deletions prompting/weight_setting/weight_setter.py
@@ -40,7 +40,6 @@ def apply_reward_func(raw_rewards: np.ndarray, p=0.5):

def save_weights(weights: list[np.ndarray]):
"""Saves the list of numpy arrays to a file."""
logger.debug("Saving validator state.")
# Save all arrays into a single .npz file
np.savez_compressed(FILENAME, *weights)

@@ -86,17 +85,12 @@ def set_weights(
) = bt.utils.weight_utils.convert_weights_and_uids_for_emit(
uids=processed_weight_uids, weights=processed_weights
)
# logger.debug("uint_weights", uint_weights)
# logger.debug("uint_uids", uint_uids)
except Exception as ex:
logger.exception(f"Issue with setting weights: {ex}")

# Create a dataframe from weights and uids and save it as a csv file, with the current step as the filename.
if shared_settings.LOG_WEIGHTS:
try:
# logger.debug(
# f"Lengths... UIDS: {len(uint_uids)}, WEIGHTS: {len(processed_weights.flatten())}, RAW_WEIGHTS: {len(weights.flatten())}, UINT_WEIGHTS: {len(uint_weights)}"
# )
weights_df = pd.DataFrame(
{
"step": step,
@@ -155,7 +149,6 @@ async def start(self, reward_events, name: str | None = None):
try:
with np.load(FILENAME) as data:
PAST_WEIGHTS = [data[key] for key in data.files]
# logger.debug(f"Loaded persistent weights of length: {len(PAST_WEIGHTS)}")
except FileNotFoundError:
logger.info("No weights file found - this is expected on a new validator, starting with empty weights")
PAST_WEIGHTS = []
@@ -169,8 +162,6 @@ async def run_step(self):
if len(self.reward_events) == 0:
logger.warning("No reward events in queue, skipping weight setting...")
return
# logger.debug(f"Found {len(self.reward_events)} reward events in queue")

# reward_events is a list of lists of WeightedRewardEvents - the 'sublists' each contain the multiple reward events for a single task
self.reward_events: list[list[WeightedRewardEvent]] = self.reward_events # to get correct typehinting

@@ -217,7 +208,6 @@ async def run_step(self):

for task_config, rewards in miner_rewards.items():
r = np.array([x["reward"] / max(1, x["count"]) for x in list(rewards.values())])
# logger.debug(f"Rewards for task {task_config.task.__name__}: {r}")
u = np.array(list(rewards.keys()))
if task_config.task == InferenceTask:
processed_rewards = r / max(1, (np.sum(r[r > 0]) + 1e-10))
@@ -233,13 +223,6 @@
except Exception as ex:
logger.exception(f"{ex}")

# mean_value = final_rewards.mean()
# min_value = final_rewards.min()
# max_value = final_rewards.max()
# length = len(final_rewards)
# logger.debug(
# f"Reward stats. Mean: {mean_value:.2f}; Min: {min_value:.4f}; Max: {max_value:.4f}; Count: {length}"
# )
# set weights on chain
set_weights(
final_rewards, step=self.step, subtensor=shared_settings.SUBTENSOR, metagraph=shared_settings.METAGRAPH
Expand All @@ -249,5 +232,4 @@ async def run_step(self):
await asyncio.sleep(0.01)
return final_rewards


weight_setter = WeightSetter()
2 changes: 0 additions & 2 deletions shared/config.py
@@ -31,8 +31,6 @@ def config() -> bt.config:
# args = parser.parse_args()
add_args(parser=parser)
args, unknown = parser.parse_known_args()
# logger.info(f"RUNNING WITH ARGS: {' '.join(f'{k}={v}' for k, v in vars(args).items())}")
# logger.info(f"UNKNOWN ARGS: {unknown}")
bt.wallet.add_args(parser)
bt.subtensor.add_args(parser)
bt.axon.add_args(parser)