diff --git a/subgraph-radio/src/metrics/mod.rs b/subgraph-radio/src/metrics/mod.rs index 7d7fff0..222cd66 100644 --- a/subgraph-radio/src/metrics/mod.rs +++ b/subgraph-radio/src/metrics/mod.rs @@ -113,23 +113,6 @@ pub static RECEIVED_MESSAGES: Lazy = Lazy::new(|| { m }); -#[allow(dead_code)] -pub static LOCAL_PPOIS_TO_COMPARE: Lazy = Lazy::new(|| { - let m = IntGaugeVec::new( - Opts::new( - "local_ppois_to_compare", - "Number of pPOIs stored locally for each subgraph", - ) - .namespace("graphcast") - .subsystem("subgraph_radio"), - &["deployment"], - ) - .expect("Failed to create LOCAL_PPOIS_TO_COMPARE gauges"); - prometheus::register(Box::new(m.clone())) - .expect("Failed to register local_ppois_to_compare gauge"); - m -}); - #[allow(dead_code)] pub static REGISTRY: Lazy = Lazy::new(prometheus::Registry::new); @@ -153,7 +136,6 @@ pub fn start_metrics() { Box::new(CONNECTED_PEERS.clone()), Box::new(GOSSIP_PEERS.clone()), Box::new(RECEIVED_MESSAGES.clone()), - Box::new(LOCAL_PPOIS_TO_COMPARE.clone()), ], ); } diff --git a/subgraph-radio/src/operator/attestation.rs b/subgraph-radio/src/operator/attestation.rs index fccd72d..4dbae71 100644 --- a/subgraph-radio/src/operator/attestation.rs +++ b/subgraph-radio/src/operator/attestation.rs @@ -18,9 +18,7 @@ use graphcast_sdk::{ }; use crate::{ - messages::poi::PublicPoiMessage, - metrics::{ACTIVE_INDEXERS, DIVERGING_SUBGRAPHS, LOCAL_PPOIS_TO_COMPARE}, - state::PersistedState, + messages::poi::PublicPoiMessage, metrics::ACTIVE_INDEXERS, state::PersistedState, OperationError, }; @@ -259,11 +257,6 @@ pub fn save_local_attestation( .entry(block_number) .and_modify(|existing_attestation| *existing_attestation = attestation.clone()) .or_insert(attestation); - - let ppoi_gauge = LOCAL_PPOIS_TO_COMPARE.with_label_values(&[&ipfs_hash]); - - // The value is the total number of senders that are attesting for that subgraph - ppoi_gauge.set(local_attestations.len().try_into().unwrap()); } /// Clear the expired local attestations after comparing with remote results @@ -279,9 +272,6 @@ pub fn clear_local_attestation( let mut blocks_clone: HashMap = HashMap::new(); blocks_clone.extend(blocks.clone()); blocks_clone.remove(&block_number); - let ppoi_gauge = LOCAL_PPOIS_TO_COMPARE.with_label_values(&[&ipfs_hash]); - // The value is the total number of senders that are attesting for that subgraph - ppoi_gauge.set(blocks_clone.len().try_into().unwrap()); local_attestations.insert(ipfs_hash, blocks_clone); }; } @@ -657,7 +647,6 @@ pub async fn process_comparison_results( Err(e) => cmp_errors.push(e.to_string()), } } - DIVERGING_SUBGRAPHS.set(divergent_strings.len().try_into().unwrap()); info!( chainhead_blocks = blocks_str, diff --git a/subgraph-radio/src/operator/mod.rs b/subgraph-radio/src/operator/mod.rs index 9f6bb68..d1f5fe9 100644 --- a/subgraph-radio/src/operator/mod.rs +++ b/subgraph-radio/src/operator/mod.rs @@ -11,8 +11,10 @@ use tracing::{debug, error, info, trace, warn}; use crate::{ chainhead_block_str, messages::poi::{process_valid_message, PublicPoiMessage}, - metrics::{CONNECTED_PEERS, GOSSIP_PEERS, RECEIVED_MESSAGES, VALIDATED_MESSAGES}, - operator::indexer_management::health_query, + metrics::{ + CONNECTED_PEERS, DIVERGING_SUBGRAPHS, GOSSIP_PEERS, RECEIVED_MESSAGES, VALIDATED_MESSAGES, + }, + operator::{attestation::ComparisonResultType, indexer_management::health_query}, }; use graphcast_sdk::{ graphcast_agent::{ @@ -239,6 +241,9 @@ impl RadioOperator { CONNECTED_PEERS.set(connected_peer_count(&self.graphcast_agent().node_handle).unwrap_or_default().try_into().unwrap_or_default()); GOSSIP_PEERS.set(self.graphcast_agent.number_of_peers().try_into().unwrap_or_default()); + let diverged_num = self.persisted_state.comparison_result_typed(ComparisonResultType::Divergent); + DIVERGING_SUBGRAPHS.set(diverged_num.len().try_into().unwrap()); + // Save cache if path provided let _ = &self.config.radio_infrastructure().persistence_file_path.as_ref().map(|path| { self.persisted_state.update_cache(path); diff --git a/subgraph-radio/src/state.rs b/subgraph-radio/src/state.rs index a9a8d0f..5b91cc3 100644 --- a/subgraph-radio/src/state.rs +++ b/subgraph-radio/src/state.rs @@ -144,6 +144,20 @@ impl PersistedState { .cloned() } + /// Getter for comparison results with a certain result type + pub fn comparison_result_typed( + &self, + result_type: ComparisonResultType, + ) -> Vec { + let mut matched_type = vec![]; + for (_key, value) in self.comparison_results() { + if value.result_type == result_type { + matched_type.push(value.clone()); + } + } + matched_type + } + /// Update local_attestations pub async fn update_local(&mut self, local_attestations: Local) { self.local_attestations = local_attestations; @@ -721,4 +735,85 @@ mod tests { "QmAAfLWowm1xkqc41vcygKNwFUvpsDSMbHdHghxmDVmH9x".to_string() ); } + + #[test] + fn test_comparison_result_typed_not_found() { + let mut comparison_results = HashMap::new(); + comparison_results.insert( + "a".to_string(), + ComparisonResult { + result_type: ComparisonResultType::NotFound, + deployment: String::from("Qmhash"), + block_number: 100, + local_attestation: None, + attestations: vec![], + }, + ); + comparison_results.insert( + "b".to_string(), + ComparisonResult { + result_type: ComparisonResultType::Match, + deployment: String::from("Qmhash"), + block_number: 100, + local_attestation: None, + attestations: vec![], + }, + ); + comparison_results.insert( + "c".to_string(), + ComparisonResult { + result_type: ComparisonResultType::Match, + deployment: String::from("Qmhash"), + block_number: 100, + local_attestation: None, + attestations: vec![], + }, + ); + comparison_results.insert( + "d".to_string(), + ComparisonResult { + result_type: ComparisonResultType::Match, + deployment: String::from("Qmhash"), + block_number: 100, + local_attestation: None, + attestations: vec![], + }, + ); + comparison_results.insert( + "e".to_string(), + ComparisonResult { + result_type: ComparisonResultType::Divergent, + deployment: String::from("Qmhash"), + block_number: 100, + local_attestation: None, + attestations: vec![], + }, + ); + comparison_results.insert( + "f".to_string(), + ComparisonResult { + result_type: ComparisonResultType::NotFound, + deployment: String::from("Qmhash"), + block_number: 100, + local_attestation: None, + attestations: vec![], + }, + ); + + let state = PersistedState { + comparison_results: Arc::new(SyncMutex::new(comparison_results)), + local_attestations: Arc::new(SyncMutex::new(HashMap::new())), + remote_ppoi_messages: Arc::new(SyncMutex::new(Vec::new())), + upgrade_intent_messages: Arc::new(SyncMutex::new(HashMap::new())), + }; + + let results = state.comparison_result_typed(ComparisonResultType::Match); + assert_eq!(results.len(), 3); + let results = state.comparison_result_typed(ComparisonResultType::NotFound); + assert_eq!(results.len(), 2); + let results = state.comparison_result_typed(ComparisonResultType::Divergent); + assert_eq!(results.len(), 1); + let results = state.comparison_result_typed(ComparisonResultType::BuildFailed); + assert_eq!(results.len(), 0); + } }