Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove local_ppois_to_compare, refactor diverged_subgraphs #81

Merged
merged 1 commit into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 0 additions & 18 deletions subgraph-radio/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,23 +113,6 @@ pub static RECEIVED_MESSAGES: Lazy<IntCounter> = Lazy::new(|| {
m
});

#[allow(dead_code)]
pub static LOCAL_PPOIS_TO_COMPARE: Lazy<IntGaugeVec> = Lazy::new(|| {
let m = IntGaugeVec::new(
Opts::new(
"local_ppois_to_compare",
"Number of pPOIs stored locally for each subgraph",
)
.namespace("graphcast")
.subsystem("subgraph_radio"),
&["deployment"],
)
.expect("Failed to create LOCAL_PPOIS_TO_COMPARE gauges");
prometheus::register(Box::new(m.clone()))
.expect("Failed to register local_ppois_to_compare gauge");
m
});

#[allow(dead_code)]
pub static REGISTRY: Lazy<prometheus::Registry> = Lazy::new(prometheus::Registry::new);

Expand All @@ -153,7 +136,6 @@ pub fn start_metrics() {
Box::new(CONNECTED_PEERS.clone()),
Box::new(GOSSIP_PEERS.clone()),
Box::new(RECEIVED_MESSAGES.clone()),
Box::new(LOCAL_PPOIS_TO_COMPARE.clone()),
],
);
}
Expand Down
13 changes: 1 addition & 12 deletions subgraph-radio/src/operator/attestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ use graphcast_sdk::{
};

use crate::{
messages::poi::PublicPoiMessage,
metrics::{ACTIVE_INDEXERS, DIVERGING_SUBGRAPHS, LOCAL_PPOIS_TO_COMPARE},
state::PersistedState,
messages::poi::PublicPoiMessage, metrics::ACTIVE_INDEXERS, state::PersistedState,
OperationError,
};

Expand Down Expand Up @@ -259,11 +257,6 @@ pub fn save_local_attestation(
.entry(block_number)
.and_modify(|existing_attestation| *existing_attestation = attestation.clone())
.or_insert(attestation);

let ppoi_gauge = LOCAL_PPOIS_TO_COMPARE.with_label_values(&[&ipfs_hash]);

// The value is the total number of senders that are attesting for that subgraph
ppoi_gauge.set(local_attestations.len().try_into().unwrap());
}

/// Clear the expired local attestations after comparing with remote results
Expand All @@ -279,9 +272,6 @@ pub fn clear_local_attestation(
let mut blocks_clone: HashMap<u64, Attestation> = HashMap::new();
blocks_clone.extend(blocks.clone());
blocks_clone.remove(&block_number);
let ppoi_gauge = LOCAL_PPOIS_TO_COMPARE.with_label_values(&[&ipfs_hash]);
// The value is the total number of senders that are attesting for that subgraph
ppoi_gauge.set(blocks_clone.len().try_into().unwrap());
local_attestations.insert(ipfs_hash, blocks_clone);
};
}
Expand Down Expand Up @@ -657,7 +647,6 @@ pub async fn process_comparison_results(
Err(e) => cmp_errors.push(e.to_string()),
}
}
DIVERGING_SUBGRAPHS.set(divergent_strings.len().try_into().unwrap());

info!(
chainhead_blocks = blocks_str,
Expand Down
9 changes: 7 additions & 2 deletions subgraph-radio/src/operator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ use tracing::{debug, error, info, trace, warn};
use crate::{
chainhead_block_str,
messages::poi::{process_valid_message, PublicPoiMessage},
metrics::{CONNECTED_PEERS, GOSSIP_PEERS, RECEIVED_MESSAGES, VALIDATED_MESSAGES},
operator::indexer_management::health_query,
metrics::{
CONNECTED_PEERS, DIVERGING_SUBGRAPHS, GOSSIP_PEERS, RECEIVED_MESSAGES, VALIDATED_MESSAGES,
},
operator::{attestation::ComparisonResultType, indexer_management::health_query},
};
use graphcast_sdk::{
graphcast_agent::{
Expand Down Expand Up @@ -239,6 +241,9 @@ impl RadioOperator {
CONNECTED_PEERS.set(connected_peer_count(&self.graphcast_agent().node_handle).unwrap_or_default().try_into().unwrap_or_default());
GOSSIP_PEERS.set(self.graphcast_agent.number_of_peers().try_into().unwrap_or_default());

let diverged_num = self.persisted_state.comparison_result_typed(ComparisonResultType::Divergent);
DIVERGING_SUBGRAPHS.set(diverged_num.len().try_into().unwrap());

// Save cache if path provided
let _ = &self.config.radio_infrastructure().persistence_file_path.as_ref().map(|path| {
self.persisted_state.update_cache(path);
Expand Down
95 changes: 95 additions & 0 deletions subgraph-radio/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,20 @@ impl PersistedState {
.cloned()
}

/// Getter for comparison results with a certain result type
pub fn comparison_result_typed(
&self,
result_type: ComparisonResultType,
) -> Vec<ComparisonResult> {
let mut matched_type = vec![];
for (_key, value) in self.comparison_results() {
if value.result_type == result_type {
matched_type.push(value.clone());
}
}
matched_type
}

/// Update local_attestations
pub async fn update_local(&mut self, local_attestations: Local) {
self.local_attestations = local_attestations;
Expand Down Expand Up @@ -721,4 +735,85 @@ mod tests {
"QmAAfLWowm1xkqc41vcygKNwFUvpsDSMbHdHghxmDVmH9x".to_string()
);
}

#[test]
fn test_comparison_result_typed_not_found() {
let mut comparison_results = HashMap::new();
comparison_results.insert(
"a".to_string(),
ComparisonResult {
result_type: ComparisonResultType::NotFound,
deployment: String::from("Qmhash"),
block_number: 100,
local_attestation: None,
attestations: vec![],
},
);
comparison_results.insert(
"b".to_string(),
ComparisonResult {
result_type: ComparisonResultType::Match,
deployment: String::from("Qmhash"),
block_number: 100,
local_attestation: None,
attestations: vec![],
},
);
comparison_results.insert(
"c".to_string(),
ComparisonResult {
result_type: ComparisonResultType::Match,
deployment: String::from("Qmhash"),
block_number: 100,
local_attestation: None,
attestations: vec![],
},
);
comparison_results.insert(
"d".to_string(),
ComparisonResult {
result_type: ComparisonResultType::Match,
deployment: String::from("Qmhash"),
block_number: 100,
local_attestation: None,
attestations: vec![],
},
);
comparison_results.insert(
"e".to_string(),
ComparisonResult {
result_type: ComparisonResultType::Divergent,
deployment: String::from("Qmhash"),
block_number: 100,
local_attestation: None,
attestations: vec![],
},
);
comparison_results.insert(
"f".to_string(),
ComparisonResult {
result_type: ComparisonResultType::NotFound,
deployment: String::from("Qmhash"),
block_number: 100,
local_attestation: None,
attestations: vec![],
},
);

let state = PersistedState {
comparison_results: Arc::new(SyncMutex::new(comparison_results)),
local_attestations: Arc::new(SyncMutex::new(HashMap::new())),
remote_ppoi_messages: Arc::new(SyncMutex::new(Vec::new())),
upgrade_intent_messages: Arc::new(SyncMutex::new(HashMap::new())),
};

let results = state.comparison_result_typed(ComparisonResultType::Match);
assert_eq!(results.len(), 3);
let results = state.comparison_result_typed(ComparisonResultType::NotFound);
assert_eq!(results.len(), 2);
let results = state.comparison_result_typed(ComparisonResultType::Divergent);
assert_eq!(results.len(), 1);
let results = state.comparison_result_typed(ComparisonResultType::BuildFailed);
assert_eq!(results.len(), 0);
}
}