diff --git a/crates/mordor/src/lib.rs b/crates/mordor/src/lib.rs index 20d6156..494603f 100644 --- a/crates/mordor/src/lib.rs +++ b/crates/mordor/src/lib.rs @@ -9,6 +9,8 @@ pub const SLOT_DURATION: u64 = 12; pub const SLOTS_PER_EPOCH: u64 = 32; /// Duration of each epoch in seconds pub const EPOCH_DURATION: u64 = SLOT_DURATION * SLOTS_PER_EPOCH; +/// Number of slots in a period +pub const SLOTS_PER_PERIOD: u64 = 8192; pub type SlotTiming = (u64, u8, u8); diff --git a/crates/parser/Cargo.toml b/crates/parser/Cargo.toml index 76c86c5..47edd0c 100644 --- a/crates/parser/Cargo.toml +++ b/crates/parser/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] - +mordor = { path = "../mordor" } alloy.workspace = true alloy-primitives.workspace = true serde.workspace = true diff --git a/crates/parser/src/beacon/beacon_finality_checkpoints.rs b/crates/parser/src/beacon/beacon_finality_checkpoints.rs index ba4d0b6..340887d 100644 --- a/crates/parser/src/beacon/beacon_finality_checkpoints.rs +++ b/crates/parser/src/beacon/beacon_finality_checkpoints.rs @@ -32,7 +32,7 @@ impl<'a> BeaconFinalityData { } else { let code = find_field(input, b"\"code\":", b",")?; let code_str = std::str::from_utf8(&input[code.0..code.1]).ok()?; - Some(Self{ + Some(Self { previous_justified: FinalityCheckpoint::default(), current_justified: FinalityCheckpoint::default(), finalized: FinalityCheckpoint::default(), diff --git a/crates/parser/src/beacon/beacon_genesis_parser.rs b/crates/parser/src/beacon/beacon_genesis_parser.rs index 68c712d..cd325dc 100644 --- a/crates/parser/src/beacon/beacon_genesis_parser.rs +++ b/crates/parser/src/beacon/beacon_genesis_parser.rs @@ -50,7 +50,6 @@ impl<'a> RawJsonResponse<'a> { }); } - let pos = data_start + data_marker.len(); let mut bracket_depth = 0; let mut data_end = pos; diff --git a/crates/parser/src/beacon/light_client_bootstrap.rs b/crates/parser/src/beacon/light_client_bootstrap.rs index b00fb98..b576761 100644 --- a/crates/parser/src/beacon/light_client_bootstrap.rs +++ b/crates/parser/src/beacon/light_client_bootstrap.rs @@ -1,40 +1,28 @@ use alloy_primitives::{B256, U64}; -use crate::{find_field, hex_to_b256}; - -#[derive(Debug, Default)] -pub struct LightClientBootstrap<'a> { - pub version: &'a str, +use crate::{ + find_field, hex_to_b256, + types::{Beacon, SyncCommittee}, +}; + +#[derive(Debug, Default, Clone)] +pub struct LightClientBootstrap { + pub version: String, pub header: Header, - pub current_sync_committee: CurrentSyncCommittee, + pub current_sync_committee: SyncCommittee, pub current_sync_committee_branch: Vec, pub code: Option, } -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct Header { pub beacon: Beacon, } -#[derive(Debug, Default)] -pub struct Beacon { - pub slot: U64, - pub proposer_index: U64, - pub parent_root: B256, - pub state_root: B256, - pub body_root: B256, -} - -#[derive(Debug, Default)] -pub struct CurrentSyncCommittee { - pub pub_keys: Vec, - pub aggregate_pubkey: B256, -} - -impl<'a> LightClientBootstrap<'a> { - pub fn parse(input: &'a [u8]) -> Option { - if memchr::memmem::find(input, b"\"code\":").is_some() { - let code = find_field(input, b"\"code\":", b",")?; +impl LightClientBootstrap { + pub fn parse(input: &[u8]) -> Option { + if let Some(_pos) = memchr::memmem::find(input, b"\"code\":") { + let code = find_field(input, b"\"code\":", b"}")?; let code_str = std::str::from_utf8(&input[code.0..code.1]).ok()?; return Some(Self { code: Some(code_str.parse().ok()?), @@ -42,6 +30,7 @@ impl<'a> LightClientBootstrap<'a> { }); } + let version = find_field(input, b"\"version\":\"", b"\"")?; let slot = find_field(input, b"\"slot\":\"", b"\"")?; let proposer_index = find_field(input, b"\"proposer_index\":\"", b"\"")?; @@ -64,7 +53,7 @@ impl<'a> LightClientBootstrap<'a> { body_root: hex_to_b256(&input[body_root.0..body_root.1]), }; - let current_sync_committee = CurrentSyncCommittee { + let current_sync_committee = SyncCommittee { pub_keys: Self::parse_pub_keys_array(input)? .iter() .map(|&(start, end)| hex_to_b256(&input[start..end])) @@ -80,7 +69,9 @@ impl<'a> LightClientBootstrap<'a> { let header = Header { beacon }; Some(LightClientBootstrap { - version: std::str::from_utf8(&input[version.0..version.1]).ok()?, + version: std::str::from_utf8(&input[version.0..version.1]) + .ok()? + .to_string(), header, current_sync_committee, current_sync_committee_branch, diff --git a/crates/parser/src/beacon/light_client_update.rs b/crates/parser/src/beacon/light_client_update.rs new file mode 100644 index 0000000..03dc5e9 --- /dev/null +++ b/crates/parser/src/beacon/light_client_update.rs @@ -0,0 +1,212 @@ +use alloy_primitives::{B256, U64}; + +use crate::{ + find_field, hex_to_b256, hex_to_u64, + types::{Beacon, SyncCommittee}, +}; + +#[derive(Debug, Default, Clone)] +pub struct Updates { + pub version: String, + pub attested_header: Beacon, + pub next_sync_committee_branch: Vec, + pub next_sync_committee: SyncCommittee, + pub finalized_header: Beacon, + pub finality_branch: Vec, + pub sync_aggregate: SyncAggregate, + pub signature_slot: U64, + pub code: Option, +} + +#[derive(Debug, Default, Clone, Copy)] +pub struct SyncAggregate { + pub sync_committee_bits: U64, + pub sync_committee_signature: B256, +} + +impl<'a> Updates { + pub fn parse(input: &'a [u8]) -> Option { + if let Some(_pos) = memchr::memmem::find(input, b"\"code\":") { + let code = find_field(input, b"\"code\":", b"}")?; + let code_str = std::str::from_utf8(&input[code.0..code.1]).ok()?; + return Some(Self { + code: Some(code_str.parse().ok()?), + ..Default::default() + }); + } + + let version = find_field(input, b"\"version\":\"", b"\"")?; + + let signature_key = find_field(input, b"\"signature_slot\":\"", b"\"")?; + + let finalized_header = Self::parse_header(input, b"\"finalized_header\":")?; + let attested_header = Self::parse_header(input, b"\"attested_header\":")?; + let sync_committee_bits = find_field(input, b"\"sync_committee_bits\":\"", b"\"")?; + let sync_committee_signatures = + find_field(input, b"\"sync_committee_signatures\":\"", b"\"")?; + + let finality_branch: Vec = Self::finality_branch(input)? + .iter() + .map(|&(start, end)| hex_to_b256(&input[start..end])) + .collect(); + + let next_sync_committee_branch: Vec = Self::next_sync_committee_branch(input)? + .iter() + .map(|&(start, end)| hex_to_b256(&input[start..end])) + .collect(); + + let pubkeys: Vec = Self::pubkeys(input)? + .iter() + .map(|&(start, end)| hex_to_b256(&input[start..end])) + .collect(); + + let beacon_a = Self::parse_beacon(&input[attested_header.0..attested_header.1])?; + + let aggregate_pub_key = find_field(input, b"\"aggregate_pubkey\":\"", b"\"")?; + + let beacon_f = Self::parse_beacon(&input[finalized_header.0..finalized_header.1])?; + + let sync_aggregate = SyncAggregate { + sync_committee_bits: hex_to_u64(&input[sync_committee_bits.0..sync_committee_bits.1]), + sync_committee_signature: hex_to_b256( + &input[sync_committee_signatures.0..sync_committee_signatures.1], + ), + }; + + let next_sync_committee = SyncCommittee { + pub_keys: pubkeys, + aggregate_pubkey: hex_to_b256(&input[aggregate_pub_key.0..aggregate_pub_key.1]), + }; + + Some(Updates { + version: std::str::from_utf8(&input[version.0..version.1]) + .ok()? + .to_string(), + attested_header: beacon_a, + finalized_header: beacon_f, + finality_branch, + sync_aggregate, + signature_slot: hex_to_u64(&input[signature_key.0..signature_key.1]), + next_sync_committee, + next_sync_committee_branch, + code: None, + }) + } + + fn parse_header(input: &[u8], key: &[u8]) -> Option<(usize, usize)> { + let start = memchr::memmem::find(input, key)? + key.len(); + let mut depth = 0; + let mut end = start; + + for (i, &b) in input[start..].iter().enumerate() { + match b { + b'{' => depth += 1, + b'}' => { + depth -= 1; + if depth == 0 { + end = start + i + 1; + break; + } + } + _ => {} + } + } + Some((start, end)) + } + + fn parse_beacon(input: &[u8]) -> Option { + let slot = find_field(input, b"\"slot\":\"", b"\"")?; + let proposer_index = find_field(input, b"\"proposer_index\":\"", b"\"")?; + let parent_root = find_field(input, b"\"parent_root\":\"", b"\"")?; + let state_root = find_field(input, b"\"state_root\":\"", b"\"")?; + let body_root = find_field(input, b"\"body_root\":\"", b"\"")?; + + let slot = &input[slot.0..slot.1]; + let proposer_index = &input[proposer_index.0..proposer_index.1]; + Some(Beacon { + slot: std::str::from_utf8(slot).unwrap().parse().unwrap(), + proposer_index: std::str::from_utf8(proposer_index) + .unwrap() + .parse() + .unwrap(), + parent_root: hex_to_b256(&input[parent_root.0..parent_root.1]), + state_root: hex_to_b256(&input[state_root.0..state_root.1]), + body_root: hex_to_b256(&input[body_root.0..body_root.1]), + }) + } + + pub fn finality_branch(data: &[u8]) -> Option> { + let start = memchr::memmem::find(data, b"\"finality_branch\":[")?; + let mut pos = start + b"\"finality_branch\":[".len(); + let mut result = Vec::new(); + + while data[pos] != b']' { + while data[pos] != b'"' && data[pos] != b']' { + pos += 1; + } + if data[pos] == b']' { + break; + } + pos += 1; + let committee_start = pos; + + while data[pos] != b'"' { + pos += 1; + } + result.push((committee_start, pos)); + pos += 1; + } + + Some(result) + } + + pub fn pubkeys(data: &[u8]) -> Option> { + let start = memchr::memmem::find(data, b"\"pubkeys\":[")?; + let mut pos = start + b"\"pubkeys\":[".len(); + let mut result = Vec::new(); + + while data[pos] != b']' { + while data[pos] != b'"' && data[pos] != b']' { + pos += 1; + } + if data[pos] == b']' { + break; + } + pos += 1; + let committee_start = pos; + + while data[pos] != b'"' { + pos += 1; + } + result.push((committee_start, pos)); + pos += 1; + } + + Some(result) + } + + pub fn next_sync_committee_branch(data: &[u8]) -> Option> { + let start = memchr::memmem::find(data, b"\"next_sync_committee_branch\":[")?; + let mut pos = start + b"\"next_sync_committee_branch\":[".len(); + let mut result = Vec::new(); + + while data[pos] != b']' { + while data[pos] != b'"' && data[pos] != b']' { + pos += 1; + } + if data[pos] == b']' { + break; + } + pos += 1; + let committee_start = pos; + + while data[pos] != b'"' { + pos += 1; + } + result.push((committee_start, pos)); + pos += 1; + } + + Some(result) + } +} diff --git a/crates/parser/src/beacon/light_finality_update.rs b/crates/parser/src/beacon/light_finality_update.rs new file mode 100644 index 0000000..6e2a5ce --- /dev/null +++ b/crates/parser/src/beacon/light_finality_update.rs @@ -0,0 +1,139 @@ +use crate::{find_field, hex_to_b256, types::Beacon}; +use alloy_primitives::{B256, U64}; + +#[derive(Debug, Default, Clone)] +pub struct FinalityUpdate { + pub version: String, + pub attested_header: Beacon, + pub finalized_header: Beacon, + pub finality_branch: Vec, + pub sync_aggregate: SyncAggregate, + pub signature_slot: U64, + pub code: Option, +} + +#[derive(Debug, Default, Clone, Copy)] +pub struct SyncAggregate { + pub sync_committee_bits: B256, + pub sync_committee_signature: B256, +} + +impl FinalityUpdate { + pub fn parse(input: &[u8]) -> Option { + if let Some(_pos) = memchr::memmem::find(input, b"\"code\":") { + let code = find_field(input, b"\"code\":", b"}")?; + let code_str = std::str::from_utf8(&input[code.0..code.1]).ok()?; + return Some(Self { + code: Some(code_str.parse().ok()?), + ..Default::default() + }); + } + + let version = find_field(input, b"\"version\":\"", b"\"")?; + let signature_slot = find_field(input, b"\"signature_slot\":\"", b"\"")?; + let sync_committee_bits = find_field(input, b"\"sync_committee_bits\":\"", b"\"")?; + let sync_committee_signature = + find_field(input, b"\"sync_committee_signature\":\"", b"\"")?; + + let finalized_header = Self::parse_header(input, b"\"finalized_header\":")?; + let attested_header = Self::parse_header(input, b"\"attested_header\":")?; + + let finality_branch: Vec = Self::finality_branch(input)? + .iter() + .map(|&(start, end)| hex_to_b256(&input[start..end])) + .collect(); + + let beacon_f = Self::parse_beacon(&input[finalized_header.0..finalized_header.1])?; + let beacon_a = Self::parse_beacon(&input[attested_header.0..attested_header.1])?; + + let sync_committee_bits = &input[sync_committee_bits.0..sync_committee_bits.1]; + let sync_aggregate = SyncAggregate { + sync_committee_bits: hex_to_b256(sync_committee_bits), + sync_committee_signature: hex_to_b256( + &input[sync_committee_signature.0..sync_committee_signature.1], + ), + }; + + Some(FinalityUpdate { + version: std::str::from_utf8(&input[version.0..version.1]) + .ok()? + .to_string(), + attested_header: beacon_a, + finalized_header: beacon_f, + finality_branch, + sync_aggregate, + signature_slot: std::str::from_utf8(&input[signature_slot.0..signature_slot.1]) + .unwrap() + .parse() + .unwrap(), + code: None, + }) + } + + fn parse_header(input: &[u8], key: &[u8]) -> Option<(usize, usize)> { + let start = memchr::memmem::find(input, key)? + key.len(); + let mut depth = 0; + let mut end = start; + + for (i, &b) in input[start..].iter().enumerate() { + match b { + b'{' => depth += 1, + b'}' => { + depth -= 1; + if depth == 0 { + end = start + i + 1; + break; + } + } + _ => {} + } + } + Some((start, end)) + } + + fn parse_beacon(input: &[u8]) -> Option { + let slot = find_field(input, b"\"slot\":\"", b"\"")?; + let proposer_index = find_field(input, b"\"proposer_index\":\"", b"\"")?; + let parent_root = find_field(input, b"\"parent_root\":\"", b"\"")?; + let state_root = find_field(input, b"\"state_root\":\"", b"\"")?; + let body_root = find_field(input, b"\"body_root\":\"", b"\"")?; + + let slot = &input[slot.0..slot.1]; + let proposer_index = &input[proposer_index.0..proposer_index.1]; + Some(Beacon { + slot: std::str::from_utf8(slot).unwrap().parse().unwrap(), + proposer_index: std::str::from_utf8(proposer_index) + .unwrap() + .parse() + .unwrap(), + parent_root: hex_to_b256(&input[parent_root.0..parent_root.1]), + state_root: hex_to_b256(&input[state_root.0..state_root.1]), + body_root: hex_to_b256(&input[body_root.0..body_root.1]), + }) + } + + fn finality_branch(data: &[u8]) -> Option> { + let start = memchr::memmem::find(data, b"\"finality_branch\":[")?; + let mut pos = start + b"\"finality_branch\":[".len(); + let mut result = Vec::new(); + + while data[pos] != b']' { + while data[pos] != b'"' && data[pos] != b']' { + pos += 1; + } + if data[pos] == b']' { + break; + } + pos += 1; + let committee_start = pos; + + while data[pos] != b'"' { + pos += 1; + } + result.push((committee_start, pos)); + pos += 1; + } + + Some(result) + } +} diff --git a/crates/parser/src/beacon/light_optimistic_update.rs b/crates/parser/src/beacon/light_optimistic_update.rs new file mode 100644 index 0000000..48a6bf6 --- /dev/null +++ b/crates/parser/src/beacon/light_optimistic_update.rs @@ -0,0 +1,125 @@ +use alloy_primitives::{B256, U64}; + +use crate::{find_field, hex_to_b256, types::Beacon}; + +#[derive(Debug, Default, Clone)] +pub struct LightOptimisticUpdate { + pub version: String, + pub attested_header: Beacon, + pub sync_aggregate: SyncAggregate, + pub signature_slot: U64, + pub code: Option, +} + +#[derive(Debug, Default, Clone, Copy)] +pub struct SyncAggregate { + pub sync_committee_bits: B256, + pub sync_committee_signature: B256, +} + +impl LightOptimisticUpdate { + pub fn parse(input: &[u8]) -> Option { + if let Some(_pos) = memchr::memmem::find(input, b"\"code\":") { + let code = find_field(input, b"\"code\":", b"}")?; + let code_str = std::str::from_utf8(&input[code.0..code.1]).ok()?; + return Some(Self { + code: Some(code_str.parse().ok()?), + ..Default::default() + }); + } + + + let version = find_field(input, b"\"version\":\"", b"\"")?; + let slot = find_field(input, b"\"slot\":\"", b"\"")?; + let proposer_index = find_field(input, b"\"proposer_index\":\"", b"\"")?; + let parent_root = find_field(input, b"\"parent_root\":\"", b"\"")?; + let state_root = find_field(input, b"\"state_root\":\"", b"\"")?; + let body_root = find_field(input, b"\"body_root\":\"", b"\"")?; + let sync_committee_bits = find_field(input, b"\"sync_committee_bits\":\"", b"\"")?; + let sync_committee_signatures = + find_field(input, b"\"sync_committee_signature\":\"", b"\"")?; + + let sync_aggregate = SyncAggregate { + sync_committee_bits: hex_to_b256(&input[sync_committee_bits.0..sync_committee_bits.1]), + sync_committee_signature: hex_to_b256( + &input[sync_committee_signatures.0..sync_committee_signatures.1], + ), + }; + + let signature_slot = find_field(&input, b"\"signature_slot\":\"", b"\"")?; + + let beacon = Beacon { + slot: std::str::from_utf8(&input[slot.0..slot.1]) + .ok()? + .parse() + .ok()?, + proposer_index: std::str::from_utf8(&input[proposer_index.0..proposer_index.1]) + .ok()? + .parse() + .ok()?, + parent_root: hex_to_b256(&input[parent_root.0..parent_root.1]), + state_root: hex_to_b256(&input[state_root.0..state_root.1]), + body_root: hex_to_b256(&input[body_root.0..body_root.1]), + }; + + Some(LightOptimisticUpdate { + version: std::str::from_utf8(&input[version.0..version.1]) + .ok()? + .to_string(), + attested_header: beacon, + sync_aggregate, + signature_slot: std::str::from_utf8(&input[signature_slot.0..signature_slot.1]).unwrap().parse().unwrap(), + code: None, + }) + } + + pub fn parse_pub_keys_array(data: &[u8]) -> Option> { + let start = memchr::memmem::find(data, b"\"pubkeys\":[")?; + let mut pos = start + b"\"pubkeys\":[".len(); + let mut result = Vec::new(); + + while data[pos] != b']' { + while data[pos] != b'"' && data[pos] != b']' { + pos += 1; + } + if data[pos] == b']' { + break; + } + pos += 1; + let committee_start = pos; + + while data[pos] != b'"' { + pos += 1; + } + result.push((committee_start, pos)); + pos += 1; + } + + Some(result) + } + + pub fn parse_committee_branch_array(data: &[u8]) -> Option> { + let start = memchr::memmem::find(data, b"\"current_sync_committee_branch\":[")?; + let mut pos = start + b"\"current_sync_committee_branch\":[".len(); + let mut result = Vec::new(); + + while data[pos] != b']' { + while data[pos] != b'"' && data[pos] != b']' { + pos += 1; + } + if data[pos] == b']' { + break; + } + pos += 1; + let committee_start = pos; + + while data[pos] != b'"' { + pos += 1; + } + result.push((committee_start, pos)); + pos += 1; + } + + Some(result) + } +} diff --git a/crates/parser/src/beacon/mod.rs b/crates/parser/src/beacon/mod.rs index 2b04f14..947226c 100644 --- a/crates/parser/src/beacon/mod.rs +++ b/crates/parser/src/beacon/mod.rs @@ -1,5 +1,8 @@ pub mod beacon_attestation; +pub mod beacon_finality_checkpoints; pub mod beacon_genesis_parser; pub mod beacon_state_root_parser; -pub mod beacon_finality_checkpoints; -pub mod light_client_bootstrap; \ No newline at end of file +pub mod light_client_bootstrap; +pub mod light_client_update; +pub mod light_finality_update; +pub mod light_optimistic_update; diff --git a/crates/parser/src/lib.rs b/crates/parser/src/lib.rs index dc2ccbf..2d57483 100644 --- a/crates/parser/src/lib.rs +++ b/crates/parser/src/lib.rs @@ -1,11 +1,11 @@ use alloy_primitives::{Address, B256, U256, U64}; +pub mod beacon; pub mod block_parser; pub mod log_parser; pub mod parser_for_small_response; pub mod tx_parser; pub mod types; -pub mod beacon; #[inline] pub fn hex_to_address(hex: &[u8]) -> Address { @@ -143,4 +143,3 @@ pub fn find_field(data: &[u8], prefix: &[u8], suffix: &[u8]) -> Option<(usize, u let end = start + memchr::memmem::find(&data[start..], suffix)?; Some((start, end)) } - diff --git a/crates/parser/src/types.rs b/crates/parser/src/types.rs index 86ab79b..e3fa255 100644 --- a/crates/parser/src/types.rs +++ b/crates/parser/src/types.rs @@ -119,8 +119,6 @@ pub struct Block { pub uncles: Vec, } - - #[derive(Debug)] pub struct RawJsonResponse<'a> { pub data: &'a [u8], @@ -269,7 +267,7 @@ pub struct Header { pub beacon: Beacon, } -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct Beacon { pub slot: U64, pub proposer_index: U64, @@ -278,7 +276,7 @@ pub struct Beacon { pub body_root: B256, } -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct SyncCommittee { pub pub_keys: Vec, pub aggregate_pubkey: B256, @@ -331,7 +329,6 @@ pub struct LightClientStore { pub current_max_active_participants: U64, } - pub static MAINNET_BOOTNODES : [&str; 4] = [ "enode://d860a01f9722d78051619d1e2351aba3f43f943f6f00718d1b9baa4101932a1f5011f16bb2b1bb35db20d6fe28fa0bf09636d26a87d31de9ec6203eeedb1f666@18.138.108.67:30303", // bootnode-aws-ap-southeast-1-001 "enode://22a8232c3abc76a16ae9d6c3b164f98775fe226f0917b0ca871128a74a8e9630b458460865bab457221f1d448dd9791d24c4e5d88786180ac185df813a68d4de@3.209.45.79:30303", // bootnode-aws-us-east-1-001 diff --git a/crates/shire/Cargo.toml b/crates/shire/Cargo.toml index 1ebe5fa..1378dd2 100644 --- a/crates/shire/Cargo.toml +++ b/crates/shire/Cargo.toml @@ -15,4 +15,6 @@ async-trait.workspace = true serde.workspace = true serde_json.workspace = true libp2p.workspace = true -tokio.workspace = true \ No newline at end of file +tokio.workspace = true +futures.workspace = true +reqwest.workspace = true \ No newline at end of file diff --git a/crates/shire/src/checkpoint.rs b/crates/shire/src/checkpoint.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/shire/src/checkpoint.rs @@ -0,0 +1 @@ + diff --git a/crates/shire/src/concensus.rs b/crates/shire/src/concensus.rs index b016cf6..e19a8bf 100644 --- a/crates/shire/src/concensus.rs +++ b/crates/shire/src/concensus.rs @@ -76,6 +76,12 @@ pub enum ConsensusError { SyncError(String), #[error("Invalid Signature")] InvalidSignature, + #[error("Invalid response: {0}")] + Response(String), + #[error("No consensus")] + NoConsensus, + #[error("Parse error")] + Parse, } #[async_trait::async_trait] diff --git a/crates/shire/src/concensus_rpc.rs b/crates/shire/src/concensus_rpc.rs deleted file mode 100644 index 0e6cf7b..0000000 --- a/crates/shire/src/concensus_rpc.rs +++ /dev/null @@ -1,71 +0,0 @@ -use std::sync::Arc; - -use alloy_primitives::U256; -use async_trait::async_trait; -use palantiri::RpcError; -use parser::{hex_to_u256, parser_for_small_response::Generic}; -use serde::Serialize; -use serde_json::json; - -#[async_trait] -pub trait Transport: Send + Sync + std::fmt::Debug { - async fn execute_raw(&self, request: String) -> Result, RpcError>; - async fn execute(&self, request: String) -> Result; -} - -//: Real time data no need for cache -#[derive(Debug, Clone)] -pub struct RpcClient { - pub transport: Arc, -} - -/// Represents an RPC request to a Ethereum node -#[derive(Debug, Serialize)] -pub struct RpcRequest { - pub jsonrpc: &'static str, - pub method: &'static str, - pub params: serde_json::Value, - pub id: u64, -} - -impl RpcClient { - pub async fn get_gas_price(&self) -> Result { - let request = RpcRequest { - jsonrpc: "2.0", - method: "eth_gasPrice", - params: json!([]), - id: 1, - }; - - let response = self.execute_raw(request).await?; - - match Generic::parse(&response) { - Some(generic) => { - let bytes = &response[generic.result_start.0..generic.result_start.1]; - Ok(hex_to_u256(&bytes[2..])) - } - None => Err(RpcError::Response("Failed to parse gas price".into())), - } - } - - pub async fn execute_raw(&self, request: RpcRequest) -> Result, RpcError> { - let response = self - .transport - .execute_raw(serde_json::to_string(&request).expect("convert to string")) - .await?; - - Ok(response) - } - - pub async fn execute( - &self, - request: RpcRequest, - ) -> Result { - let response = self - .transport - .execute(serde_json::to_string(&request).expect("convert to string")) - .await?; - - serde_json::from_str(&response).map_err(|e| RpcError::Parse(e.to_string())) - } -} diff --git a/crates/shire/src/lib.rs b/crates/shire/src/lib.rs index 3b11e6a..e6cf0b0 100644 --- a/crates/shire/src/lib.rs +++ b/crates/shire/src/lib.rs @@ -1,8 +1,9 @@ #![feature(trivial_bounds)] +pub mod checkpoint; pub mod concensus; -pub mod concensus_rpc; pub mod libp2p; +pub mod light_client; // Single cache-line optimized fork bitmap || 5 bits per fork || const FORK_BITS: u32 = 0b11111; diff --git a/crates/shire/src/libp2p.rs b/crates/shire/src/libp2p.rs index 3d436f5..1858c21 100644 --- a/crates/shire/src/libp2p.rs +++ b/crates/shire/src/libp2p.rs @@ -1,260 +1,265 @@ -use libp2p::{ - futures::StreamExt, - gossipsub::{self, IdentTopic}, - identity, kad, noise, ping, - swarm::{NetworkBehaviour, SwarmEvent}, - tcp, yamux, Multiaddr, PeerId, Swarm, SwarmBuilder, -}; -use std::error::Error; -use std::time::Duration; -use tokio::{signal, sync::oneshot}; - -/// Ethereum Foundation Go Bootnodes -pub static MAINNET_BOOTNODES : [&str; 4] = [ - "enode://d860a01f9722d78051619d1e2351aba3f43f943f6f00718d1b9baa4101932a1f5011f16bb2b1bb35db20d6fe28fa0bf09636d26a87d31de9ec6203eeedb1f666@18.138.108.67:30303", // bootnode-aws-ap-southeast-1-001 - "enode://22a8232c3abc76a16ae9d6c3b164f98775fe226f0917b0ca871128a74a8e9630b458460865bab457221f1d448dd9791d24c4e5d88786180ac185df813a68d4de@3.209.45.79:30303", // bootnode-aws-us-east-1-001 - "enode://2b252ab6a1d0f971d9722cb839a42cb81db019ba44c08754628ab4a823487071b5695317c8ccd085219c3a03af063495b2f1da8d18218da2d6a82981b45e6ffc@65.108.70.101:30303", // bootnode-hetzner-hel - "enode://4aeb4ab6c14b23e2c4cfdce879c04b0748a20d8e9b59e25ded2a08143e265c6c25936e74cbc8e641e3312ca288673d91f2f93f8e277de3cfa444ecdaaf982052@157.90.35.166:30303", // bootnode-hetzner-fsn -]; - -#[derive(Debug)] -enum LightClientEvent { - Ping(ping::Event), - Gossipsub(gossipsub::Event), - Kademlia(kad::Event), -} - -impl From for LightClientEvent { - fn from(event: ping::Event) -> Self { - LightClientEvent::Ping(event) - } -} - -impl From for LightClientEvent { - fn from(event: gossipsub::Event) -> Self { - LightClientEvent::Gossipsub(event) - } -} - -impl From for LightClientEvent { - fn from(event: kad::Event) -> Self { - LightClientEvent::Kademlia(event) - } -} - -#[derive(NetworkBehaviour)] -#[behaviour(out_event = "LightClientEvent")] -struct LightClientBehaviour { - gossipsub: gossipsub::Behaviour, - kademlia: kad::Behaviour, - ping: ping::Behaviour, -} - -pub struct LightClient { - swarm: Swarm, -} - -impl LightClientBehaviour { - fn new() -> Result> { - let local_key = identity::Keypair::generate_ed25519(); - let local_peer_id = PeerId::from(local_key.public()); - - // Setup Kademlia - let mut kad_config = kad::Config::default(); - kad_config.set_query_timeout(Duration::from_secs(5 * 60)); - - let store = kad::store::MemoryStore::new(local_peer_id); - let mut kademlia = kad::Behaviour::with_config(local_peer_id, store, kad_config); - - // Add bootnodes - for bootnode in MAINNET_BOOTNODES.iter() { - if let Ok(multiaddr) = convert_enode_to_multiaddr(bootnode) { - kademlia.add_address(&local_peer_id, multiaddr); - } - } - - // Setup gossipsub - let gossipsub_config = gossipsub::ConfigBuilder::default() - .flood_publish(true) - .history_length(5) - .validation_mode(gossipsub::ValidationMode::Anonymous) - .build()?; - - let mut gossipsub = - gossipsub::Behaviour::new(gossipsub::MessageAuthenticity::Anonymous, gossipsub_config)?; - - // Subscribe to light client topics - gossipsub.subscribe(&IdentTopic::new( - "/eth2/beacon_chain/light_client/finality_update/1/", - ))?; - gossipsub.subscribe(&IdentTopic::new( - "/eth2/beacon_chain/light_client/optimistic_update/1/", - ))?; - - Ok(Self { - gossipsub, - kademlia, - ping: ping::Behaviour::default(), - }) - } -} - -impl LightClient { - pub fn new() -> Result> { - let behaviour = LightClientBehaviour::new()?; - - let mut swarm = SwarmBuilder::with_new_identity() - .with_tokio() - .with_tcp( - tcp::Config::default(), - noise::Config::new, - yamux::Config::default, - )? - .with_behaviour(|_| Ok(behaviour))? - .build(); - - swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; - - Ok(Self { swarm }) - } - - pub async fn start(&mut self, mut shutdown_rx: oneshot::Receiver<()>) -> Result<(), Box> { - loop { - tokio::select! { - event = self.swarm.select_next_some() => { - match event { - SwarmEvent::NewListenAddr { address, .. } => { - println!("Listening on {:?}", address); - } - SwarmEvent::Behaviour(LightClientEvent::Kademlia(kad::Event::OutboundQueryProgressed { - result: kad::QueryResult::GetClosestPeers(Ok(ok)), - .. - })) => { - println!("Found closest peers: {:?}", ok.peers); - } - SwarmEvent::Behaviour(event) => { - println!("Other event: {:?}", event); - } - _ => {} - } - } - _ = &mut shutdown_rx => { - println!("Received shutdown signal. Stopping..."); - break; - } - } - } - Ok(()) - } -} - -fn convert_enode_to_multiaddr(enode: &str) -> Result> { - // Parse enode URL format: enode://pubkey@ip:port - let parts: Vec<&str> = enode.strip_prefix("enode://").unwrap().split('@').collect(); - - let addr_port: Vec<&str> = parts[1].split(':').collect(); - let ip = addr_port[0]; - let port = addr_port[1]; - - // Convert to multiaddr format - let multiaddr = format!("/ip4/{}/tcp/{}", ip, port).parse()?; - - Ok(multiaddr) -} - -#[cfg(test)] -mod tests { - use tokio::sync::broadcast; - - use super::*; - - #[tokio::test] - async fn test_light_client_network() -> Result<(), Box> { - println!("Initializing Swarm..."); - - let mut client1 = LightClient::new()?; - let mut client2 = LightClient::new()?; - - // Store peer IDs before moving clients - let client1_id = client1.swarm.local_peer_id().clone(); - let client2_id = client2.swarm.local_peer_id().clone(); - - // Create broadcast channel for shutdown signal - let (shutdown_tx, _) = broadcast::channel::<()>(1); - let mut shutdown_rx1 = shutdown_tx.subscribe(); - let mut shutdown_rx2 = shutdown_tx.subscribe(); - - // Create channels to receive peer discovery events - let (peers_tx1, mut peers_rx1) = tokio::sync::mpsc::channel(1); - let (peers_tx2, mut peers_rx2) = tokio::sync::mpsc::channel(1); - - // Start both clients with peer discovery reporting - let client1_handle = tokio::spawn(async move { - loop { - tokio::select! { - event = client1.swarm.select_next_some() => { - if let SwarmEvent::Behaviour(LightClientEvent::Kademlia( - - kad::Event::RoutingUpdated { peer, .. } - - )) = event { - let _ = peers_tx1.send(peer).await; - } - } - Ok(_) = shutdown_rx1.recv() => break, - else => break, - } - } - }); - - - let client2_handle = tokio::spawn(async move { - loop { - tokio::select! { - event = client2.swarm.select_next_some() => { - if let SwarmEvent::Behaviour(LightClientEvent::Kademlia( - kad::Event::RoutingUpdated { peer, .. } - )) = event { - let _ = peers_tx2.send(peer).await; - } - } - Ok(_) = shutdown_rx2.recv() => break, // Call recv() to get a Future - else => break, - } - } - }); - // Wait for peer discovery - let mut client1_found = false; - let mut client2_found = false; - - tokio::select! { - _ = async { - while let Some(peer) = peers_rx1.recv().await { - if peer == client2_id { - client2_found = true; - break; - } - } - while let Some(peer) = peers_rx2.recv().await { - if peer == client1_id { - client1_found = true; - break; - } - } - } => {}, - _ = tokio::time::sleep(Duration::from_secs(5)) => {}, - } - - // Send shutdown signals - let _ = shutdown_tx.send(()); - - // Wait for tasks to exit cleanly - let _ = client1_handle.await; - let _ = client2_handle.await; - - println!("Client 1 found client 2: {}", client1_found); - - assert!(client1_found && client2_found, "Peers should have discovered each other"); - Ok(()) - } -} \ No newline at end of file +// use libp2p::{ +// futures::StreamExt, +// gossipsub::{self, IdentTopic}, +// identity, kad, noise, ping, +// swarm::{NetworkBehaviour, SwarmEvent}, +// tcp, yamux, Multiaddr, PeerId, Swarm, SwarmBuilder, +// }; +// use std::error::Error; +// use std::time::Duration; +// use tokio::{signal, sync::oneshot}; + +// /// Ethereum Foundation Go Bootnodes +// pub static MAINNET_BOOTNODES : [&str; 4] = [ +// "enode://d860a01f9722d78051619d1e2351aba3f43f943f6f00718d1b9baa4101932a1f5011f16bb2b1bb35db20d6fe28fa0bf09636d26a87d31de9ec6203eeedb1f666@18.138.108.67:30303", // bootnode-aws-ap-southeast-1-001 +// "enode://22a8232c3abc76a16ae9d6c3b164f98775fe226f0917b0ca871128a74a8e9630b458460865bab457221f1d448dd9791d24c4e5d88786180ac185df813a68d4de@3.209.45.79:30303", // bootnode-aws-us-east-1-001 +// "enode://2b252ab6a1d0f971d9722cb839a42cb81db019ba44c08754628ab4a823487071b5695317c8ccd085219c3a03af063495b2f1da8d18218da2d6a82981b45e6ffc@65.108.70.101:30303", // bootnode-hetzner-hel +// "enode://4aeb4ab6c14b23e2c4cfdce879c04b0748a20d8e9b59e25ded2a08143e265c6c25936e74cbc8e641e3312ca288673d91f2f93f8e277de3cfa444ecdaaf982052@157.90.35.166:30303", // bootnode-hetzner-fsn +// ]; + +// #[derive(Debug)] +// enum LightClientEvent { +// Ping(ping::Event), +// Gossipsub(gossipsub::Event), +// Kademlia(kad::Event), +// } + +// impl From for LightClientEvent { +// fn from(event: ping::Event) -> Self { +// LightClientEvent::Ping(event) +// } +// } + +// impl From for LightClientEvent { +// fn from(event: gossipsub::Event) -> Self { +// LightClientEvent::Gossipsub(event) +// } +// } + +// impl From for LightClientEvent { +// fn from(event: kad::Event) -> Self { +// LightClientEvent::Kademlia(event) +// } +// } + +// #[derive(NetworkBehaviour)] +// #[behaviour(out_event = "LightClientEvent")] +// struct LightClientBehaviour { +// gossipsub: gossipsub::Behaviour, +// kademlia: kad::Behaviour, +// ping: ping::Behaviour, +// } + +// pub struct LightClient { +// swarm: Swarm, +// } + +// impl LightClientBehaviour { +// fn new() -> Result> { +// let local_key = identity::Keypair::generate_ed25519(); +// let local_peer_id = PeerId::from(local_key.public()); + +// // Setup Kademlia +// let mut kad_config = kad::Config::default(); +// kad_config.set_query_timeout(Duration::from_secs(5 * 60)); + +// let store = kad::store::MemoryStore::new(local_peer_id); +// let mut kademlia = kad::Behaviour::with_config(local_peer_id, store, kad_config); + +// // Add bootnodes +// for bootnode in MAINNET_BOOTNODES.iter() { +// if let Ok(multiaddr) = convert_enode_to_multiaddr(bootnode) { +// kademlia.add_address(&local_peer_id, multiaddr); +// } +// } + +// // Setup gossipsub +// let gossipsub_config = gossipsub::ConfigBuilder::default() +// .flood_publish(true) +// .history_length(5) +// .validation_mode(gossipsub::ValidationMode::Anonymous) +// .build()?; + +// let mut gossipsub = +// gossipsub::Behaviour::new(gossipsub::MessageAuthenticity::Anonymous, gossipsub_config)?; + +// // Subscribe to light client topics +// gossipsub.subscribe(&IdentTopic::new( +// "/eth2/beacon_chain/light_client/finality_update/1/", +// ))?; +// gossipsub.subscribe(&IdentTopic::new( +// "/eth2/beacon_chain/light_client/optimistic_update/1/", +// ))?; + +// Ok(Self { +// gossipsub, +// kademlia, +// ping: ping::Behaviour::default(), +// }) +// } +// } + +// impl LightClient { +// pub fn new() -> Result> { +// let behaviour = LightClientBehaviour::new()?; + +// let mut swarm = SwarmBuilder::with_new_identity() +// .with_tokio() +// .with_tcp( +// tcp::Config::default(), +// noise::Config::new, +// yamux::Config::default, +// )? +// .with_behaviour(|_| Ok(behaviour))? +// .build(); + +// swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; + +// Ok(Self { swarm }) +// } + +// pub async fn start( +// &mut self, +// mut shutdown_rx: oneshot::Receiver<()>, +// ) -> Result<(), Box> { +// loop { +// tokio::select! { +// event = self.swarm.select_next_some() => { +// match event { +// SwarmEvent::NewListenAddr { address, .. } => { +// println!("Listening on {:?}", address); +// } +// SwarmEvent::Behaviour(LightClientEvent::Kademlia(kad::Event::OutboundQueryProgressed { +// result: kad::QueryResult::GetClosestPeers(Ok(ok)), +// .. +// })) => { +// println!("Found closest peers: {:?}", ok.peers); +// } +// SwarmEvent::Behaviour(event) => { +// println!("Other event: {:?}", event); +// } +// _ => {} +// } +// } +// _ = &mut shutdown_rx => { +// println!("Received shutdown signal. Stopping..."); +// break; +// } +// } +// } +// Ok(()) +// } +// } + +// fn convert_enode_to_multiaddr(enode: &str) -> Result> { +// // Parse enode URL format: enode://pubkey@ip:port +// let parts: Vec<&str> = enode.strip_prefix("enode://").unwrap().split('@').collect(); + +// let addr_port: Vec<&str> = parts[1].split(':').collect(); +// let ip = addr_port[0]; +// let port = addr_port[1]; + +// // Convert to multiaddr format +// let multiaddr = format!("/ip4/{}/tcp/{}", ip, port).parse()?; + +// Ok(multiaddr) +// } + +// #[cfg(test)] +// mod tests { +// use tokio::sync::broadcast; + +// use super::*; + +// #[tokio::test] +// async fn test_light_client_network() -> Result<(), Box> { +// println!("Initializing Swarm..."); + +// let mut client1 = LightClient::new()?; +// let mut client2 = LightClient::new()?; + +// // Store peer IDs before moving clients +// let client1_id = client1.swarm.local_peer_id().clone(); +// let client2_id = client2.swarm.local_peer_id().clone(); + +// // Create broadcast channel for shutdown signal +// let (shutdown_tx, _) = broadcast::channel::<()>(1); +// let mut shutdown_rx1 = shutdown_tx.subscribe(); +// let mut shutdown_rx2 = shutdown_tx.subscribe(); + +// // Create channels to receive peer discovery events +// let (peers_tx1, mut peers_rx1) = tokio::sync::mpsc::channel(1); +// let (peers_tx2, mut peers_rx2) = tokio::sync::mpsc::channel(1); + +// // Start both clients with peer discovery reporting +// let client1_handle = tokio::spawn(async move { +// loop { +// tokio::select! { +// event = client1.swarm.select_next_some() => { +// if let SwarmEvent::Behaviour(LightClientEvent::Kademlia( + +// kad::Event::RoutingUpdated { peer, .. } + +// )) = event { +// let _ = peers_tx1.send(peer).await; +// } +// } +// Ok(_) = shutdown_rx1.recv() => break, +// else => break, +// } +// } +// }); + +// let client2_handle = tokio::spawn(async move { +// loop { +// tokio::select! { +// event = client2.swarm.select_next_some() => { +// if let SwarmEvent::Behaviour(LightClientEvent::Kademlia( +// kad::Event::RoutingUpdated { peer, .. } +// )) = event { +// let _ = peers_tx2.send(peer).await; +// } +// } +// Ok(_) = shutdown_rx2.recv() => break, // Call recv() to get a Future +// else => break, +// } +// } +// }); +// // Wait for peer discovery +// let mut client1_found = false; +// let mut client2_found = false; + +// tokio::select! { +// _ = async { +// while let Some(peer) = peers_rx1.recv().await { +// if peer == client2_id { +// client2_found = true; +// break; +// } +// } +// while let Some(peer) = peers_rx2.recv().await { +// if peer == client1_id { +// client1_found = true; +// break; +// } +// } +// } => {}, +// _ = tokio::time::sleep(Duration::from_secs(5)) => {}, +// } + +// // Send shutdown signals +// let _ = shutdown_tx.send(()); + +// // Wait for tasks to exit cleanly +// let _ = client1_handle.await; +// let _ = client2_handle.await; + +// println!("Client 1 found client 2: {}", client1_found); + +// assert!( +// client1_found && client2_found, +// "Peers should have discovered each other" +// ); +// Ok(()) +// } +// } diff --git a/crates/shire/src/light_client.rs b/crates/shire/src/light_client.rs index e69de29..1890323 100644 --- a/crates/shire/src/light_client.rs +++ b/crates/shire/src/light_client.rs @@ -0,0 +1,338 @@ +use std::time::Duration; + +use alloy_primitives::B256; +use futures::future::join_all; +use mordor::{SlotSynchronizer, SLOTS_PER_PERIOD}; +use parser::{ + beacon::{ + light_client_bootstrap::LightClientBootstrap, + light_client_update::Updates, + light_finality_update::{self, FinalityUpdate}, + light_optimistic_update::LightOptimisticUpdate, + }, + types::{Beacon, LightClientOptimisticUpdate, LightClientUpdate, SyncCommittee}, +}; +use reqwest::Client; +use thiserror::Error; +use tokio::task; + +use crate::concensus::ConsensusError; + +/// Stores the current state of a light client, including finalized and optimistic headers, +/// and sync committee information + +#[derive(Debug, Default, Clone)] +pub struct LightClientStore { + pub finalized_header: Beacon, + pub optimistic_header: Beacon, + pub attested_header: Beacon, + pub current_sync_committee: SyncCommittee, + pub next_sync_committee: Option, +} + +/// Manages the synchronization process for a light client by coordinating updates +/// and maintaining the client state + +#[derive(Debug, Default, Clone)] +pub struct LightClientSyncer { + pub client: LightClient, + pub slot_sync: SlotSynchronizer, + pub store: Option, +} + +/// A light client implementation for interacting with Ethereum beacon chain endpoints. +/// This client supports concurrent querying of multiple endpoints for redundancy and +/// consensus verification. + +#[derive(Debug, Default, Clone)] +pub struct LightClient { + pub endpoints: Vec, + pub client: Client, +} + +impl LightClientSyncer { + pub fn new(endpoints: Vec) -> Self { + let client = Client::builder() + .timeout(Duration::from_secs(5)) + .build() + .unwrap(); + + Self { + client: LightClient { endpoints, client }, + slot_sync: SlotSynchronizer::default(), + store: None, + } + } + + /// Retrieves the latest finality update from the beacon chain. + /// + /// Queries all configured endpoints concurrently and selects the popular response + /// + /// # Returns + /// - `Result`: The latest finality update or error + pub async fn get_latest_finality_update(&self) -> Result { + let mut responses = Vec::new(); + + // Query all endpoints concurrently + let results = futures::future::join_all(self.client.endpoints.iter().map(|endpoint| { + self.client + .client + .get(format!( + "{}/eth/v1/beacon/light_client/finality_update", + endpoint + )) + .send() + })) + .await; + + // Collect responses with signatures + for response in results { + if let Ok(resp) = response { + let input = resp + .bytes() + .await + .map(|b| b.to_vec()) + .map_err(|_| ConsensusError::Parse)?; + + let update = FinalityUpdate::parse(&input).ok_or(ConsensusError::Parse)?; + + responses.push((update.sync_aggregate.sync_committee_signature, update)); + } + } + + responses.sort_by(|a, b| a.0.cmp(&b.0)); + Ok(responses[0].1.clone()) + } + + /// Retrieves light client updates for a specific period. + /// + /// # Arguments + /// * `period` - The sync committee period to query + /// * `count` - Number of updates to retrieve + /// + /// # Returns + /// - `Result`: The requested updates or error + pub async fn get_latest_update( + &self, + period: u64, + count: u64, + ) -> Result { + let mut responses = Vec::new(); + + // Query all endpoints concurrently + let results = futures::future::join_all(self.client.endpoints.iter().map(|endpoint| { + self.client + .client + .get(format!( + "{}/eth/v1/beacon/light_client/updates?period={}&count={}", + endpoint, period, count + )) + .send() + })) + .await; + + // Collect responses with signatures + for response in results { + if let Ok(resp) = response { + let input = resp + .bytes() + .await + .map(|b| b.to_vec()) + .map_err(|_| ConsensusError::Parse)?; + + let update = Updates::parse(&input).ok_or(ConsensusError::Parse)?; + + responses.push((update.sync_aggregate.sync_committee_signature, update)); + } + } + + responses.sort_by(|a, b| a.0.cmp(&b.0)); + Ok(responses[0].1.clone()) + } + + /// Retrieves the light client bootstrap data for a specific block. + /// + /// # Arguments + /// * `block_root` - The root hash of the block to bootstrap from + /// + /// # Returns + /// - `Result`: Bootstrap data or error + pub async fn get_bootstrap( + &self, + block_root: B256, + ) -> Result { + let mut responses = Vec::new(); + + // Query all endpoints concurrently + let results = futures::future::join_all(self.client.endpoints.iter().map(|endpoint| { + self.client + .client + .get(format!( + "{}/eth/v1/beacon/light_client/bootstrap/{}", + endpoint, block_root, + )) + .send() + })) + .await; + + // Collect responses with signatures + for response in results { + if let Ok(resp) = response { + let input = resp + .bytes() + .await + .map(|b| b.to_vec()) + .map_err(|_| ConsensusError::Parse)?; + + let update = LightClientBootstrap::parse(&input).ok_or(ConsensusError::Parse)?; + + responses.push((update.current_sync_committee.aggregate_pubkey, update)); + } + } + + responses.sort_by(|a, b| a.0.cmp(&b.0)); + Ok(responses[0].1.clone()) + } + + /// SIGNICANT ISSUE: WRONG RETURN TYPE + /// Retrieves the latest optimistic update from the beacon chain. + /// + /// Similar to finality update but for optimistic sync data. + /// + /// # Returns + /// - `Result`: Latest optimistic update or error + pub async fn get_optimistic_update(&self) -> Result { + let mut responses = Vec::new(); + + // Query all endpoints concurrently + let results = futures::future::join_all(self.client.endpoints.iter().map(|endpoint| { + self.client + .client + .get(format!( + "{}/eth/v1/beacon/light_client/optimistic_update", + endpoint, + )) + .send() + })) + .await; + + // Collect responses with signatures + for response in results { + if let Ok(resp) = response { + let input = resp + .bytes() + .await + .map(|b| b.to_vec()) + .map_err(|_| ConsensusError::Parse)?; + + let update = LightOptimisticUpdate::parse(&input).ok_or(ConsensusError::Parse)?; + + responses.push((update.sync_aggregate.sync_committee_signature, update)); + } + } + + responses.sort_by(|a, b| a.0.cmp(&b.0)); + Ok(responses[0].1.clone()) + } + + pub fn get_sync_committee_period(&self, slot: u64) -> u64 { + slot / SLOTS_PER_PERIOD + } + + pub fn is_next_sync_committee_known(&self) -> bool { + self.store + .as_ref() + .map(|store| store.next_sync_committee.is_some()) + .unwrap_or(false) + } + + /// Initialize a new light client store from bootstrap data + /// ISSUE: since it's the same header why not combine it instead of clone + pub async fn initialize_store( + &mut self, + trusted_block_root: B256, + ) -> Result<(), ConsensusError> { + let bootstrap = self.get_bootstrap(trusted_block_root).await?; + + self.store = Some(LightClientStore { + finalized_header: bootstrap.header.beacon.clone(), + optimistic_header: bootstrap.header.beacon.clone(), + attested_header: bootstrap.header.beacon, + current_sync_committee: bootstrap.current_sync_committee, + next_sync_committee: None, + }); + + Ok(()) + } + + /// Main sync loop that keeps the light client in sync with the network + pub async fn sync(&mut self, trusted_block_root: B256) -> Result<(), ConsensusError> { + // Initialize store if needed + if self.store.is_none() { + self.initialize_store(trusted_block_root).await?; + } + let mut store = self.store.clone().ok_or(ConsensusError::Parse)?; + + loop { + // apply_light_client_update // + let current_slot = self + .slot_sync + .current_slot() + .map_err(|_| ConsensusError::Parse)?; + let finalized_period = self.get_sync_committee_period(store.finalized_header.slot.to()); + let optimistic_period = + self.get_sync_committee_period(store.optimistic_header.slot.to()); + let current_period = self.get_sync_committee_period(current_slot); + + if finalized_period == optimistic_period && !self.is_next_sync_committee_known() { + let updates = self.get_latest_update(finalized_period, 1).await?; + store.next_sync_committee = Some(updates.next_sync_committee); + } + + if finalized_period + 1 < current_period { + for period in (finalized_period + 1)..current_period { + let updates = self.get_latest_update(period, 1).await?; + + // Apply the update to move forward + store.finalized_header = updates.finalized_header; + store.optimistic_header = updates.attested_header; + store.current_sync_committee = updates.next_sync_committee; + store.next_sync_committee = None; + } + } + + // Case 3: Stay up to date with latest updates + if finalized_period + 1 >= current_period { + // Process finality update + if let Ok(finality_update) = self.get_latest_finality_update().await { + store.finalized_header = finality_update.finalized_header; + store.optimistic_header = finality_update.attested_header; + } + + // Process optimistic update + if let Ok(optimistic_update) = self.get_optimistic_update().await { + store.optimistic_header = optimistic_update.attested_header; + } + } + + let wait_time = self + .slot_sync + .time_until_next_slot() + .map_err(|_| ConsensusError::Parse)?; + tokio::time::sleep(wait_time).await; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_sync() { + let a = LightClientSyncer::new(vec!["https://eth-beacon-chain.drpc.org/rest/".to_string()]); + // println!("{:?}", a.get_latest_finality_update().await); + println!("{:?}", a.get_latest_update(1, 1).await); + // println!("{:?}", a.get_optimistic_update().await); + } +} diff --git a/rustc-ice-2025-02-11T12_59_39-34292.txt b/rustc-ice-2025-02-11T12_59_39-34292.txt new file mode 100644 index 0000000..4873966 --- /dev/null +++ b/rustc-ice-2025-02-11T12_59_39-34292.txt @@ -0,0 +1,44 @@ +thread 'rustc' panicked at compiler/rustc_metadata/src/rmeta/decoder/cstore_impl.rs:716:32: +Box +stack backtrace: + 0: 0x10feafb64 - std::backtrace::Backtrace::create::h38331fefa18b97b9 + 1: 0x10dd9f904 - as core[65ac7e790faddaa0]::ops::function::Fn<(&dyn for<'a, 'b> core[65ac7e790faddaa0]::ops::function::Fn<(&'a std[4265e78211a8ccfd]::panic::PanicHookInfo<'b>,), Output = ()> + core[65ac7e790faddaa0]::marker::Sync + core[65ac7e790faddaa0]::marker::Send, &std[4265e78211a8ccfd]::panic::PanicHookInfo)>>::call + 2: 0x10feca2f8 - std::panicking::rust_panic_with_hook::h6965295fc494c3ec + 3: 0x10de48628 - std[4265e78211a8ccfd]::panicking::begin_panic::::{closure#0} + 4: 0x10de4834c - std[4265e78211a8ccfd]::sys::backtrace::__rust_end_short_backtrace::::{closure#0}, !> + 5: 0x1125cfff8 - std[4265e78211a8ccfd]::panicking::begin_panic:: + 6: 0x10de624dc - ::emit_producing_guarantee + 7: 0x10ea7a1d0 - rustc_middle[99ab855d0efbbcf5]::util::bug::opt_span_bug_fmt::::{closure#0} + 8: 0x10ea72cb8 - rustc_middle[99ab855d0efbbcf5]::ty::context::tls::with_opt::::{closure#0}, !>::{closure#0} + 9: 0x10ea72c84 - rustc_middle[99ab855d0efbbcf5]::ty::context::tls::with_context_opt::::{closure#0}, !>::{closure#0}, !> + 10: 0x112673584 - rustc_middle[99ab855d0efbbcf5]::util::bug::bug_fmt + 11: 0x10e9bdce4 - >::call_once + 12: 0x10ea4e434 - ::def_path_hash_to_def_id + 13: 0x10f3f5ba8 - ::{closure#0} as core[65ac7e790faddaa0]::ops::function::FnOnce<(rustc_middle[99ab855d0efbbcf5]::ty::context::TyCtxt, rustc_query_system[30ffef2ab3376262]::dep_graph::dep_node::DepNode)>>::call_once + 14: 0x10f5e1b94 - >::try_mark_previous_green:: + 15: 0x10f5e1924 - >::try_mark_green:: + 16: 0x10f31c774 - rustc_query_system[30ffef2ab3376262]::query::plumbing::ensure_must_run::>, false, false, false>, rustc_query_impl[fee6ea5a1897d0d4]::plumbing::QueryCtxt> + 17: 0x10f63f030 - rustc_query_impl[fee6ea5a1897d0d4]::query_impl::proc_macro_decls_static::get_query_incr::__rust_end_short_backtrace + 18: 0x10e6a1438 - ::time::<(), rustc_interface[d3777d291916c9e7]::passes::run_required_analyses::{closure#0}::{closure#1}::{closure#0}::{closure#6}::{closure#1}> + 19: 0x10e65d0e4 - ::run::<(), rustc_interface[d3777d291916c9e7]::passes::run_required_analyses::{closure#0}::{closure#0}::{closure#0}> + 20: 0x10e6a1a44 - ::time::<(), rustc_interface[d3777d291916c9e7]::passes::run_required_analyses::{closure#0}> + 21: 0x10e69baa0 - rustc_interface[d3777d291916c9e7]::passes::analysis + 22: 0x10f425bf0 - rustc_query_impl[fee6ea5a1897d0d4]::plumbing::__rust_begin_short_backtrace::> + 23: 0x10f578f5c - >::call_once + 24: 0x10f342270 - rustc_query_system[30ffef2ab3376262]::query::plumbing::try_execute_query::>, false, false, false>, rustc_query_impl[fee6ea5a1897d0d4]::plumbing::QueryCtxt, true> + 25: 0x10f617908 - rustc_query_impl[fee6ea5a1897d0d4]::query_impl::analysis::get_query_incr::__rust_end_short_backtrace + 26: 0x10dd883b0 - >::enter::, rustc_driver_impl[818fc7bc80f3f00c]::run_compiler::{closure#0}::{closure#1}::{closure#5}> + 27: 0x10dd59fd0 - ::enter::, rustc_span[5f3f762fc4fb81dd]::ErrorGuaranteed>> + 28: 0x10ddd0e5c - rustc_span[5f3f762fc4fb81dd]::create_session_globals_then::, rustc_interface[d3777d291916c9e7]::util::run_in_thread_with_globals, rustc_driver_impl[818fc7bc80f3f00c]::run_compiler::{closure#0}>::{closure#1}, core[65ac7e790faddaa0]::result::Result<(), rustc_span[5f3f762fc4fb81dd]::ErrorGuaranteed>>::{closure#0}, core[65ac7e790faddaa0]::result::Result<(), rustc_span[5f3f762fc4fb81dd]::ErrorGuaranteed>>::{closure#0}::{closure#0}::{closure#0}> + 29: 0x10dd8a0f8 - std[4265e78211a8ccfd]::sys::backtrace::__rust_begin_short_backtrace::, rustc_driver_impl[818fc7bc80f3f00c]::run_compiler::{closure#0}>::{closure#1}, core[65ac7e790faddaa0]::result::Result<(), rustc_span[5f3f762fc4fb81dd]::ErrorGuaranteed>>::{closure#0}, core[65ac7e790faddaa0]::result::Result<(), rustc_span[5f3f762fc4fb81dd]::ErrorGuaranteed>>::{closure#0}::{closure#0}, core[65ac7e790faddaa0]::result::Result<(), rustc_span[5f3f762fc4fb81dd]::ErrorGuaranteed>> + 30: 0x10dd8013c - <::spawn_unchecked_, rustc_driver_impl[818fc7bc80f3f00c]::run_compiler::{closure#0}>::{closure#1}, core[65ac7e790faddaa0]::result::Result<(), rustc_span[5f3f762fc4fb81dd]::ErrorGuaranteed>>::{closure#0}, core[65ac7e790faddaa0]::result::Result<(), rustc_span[5f3f762fc4fb81dd]::ErrorGuaranteed>>::{closure#0}::{closure#0}, core[65ac7e790faddaa0]::result::Result<(), rustc_span[5f3f762fc4fb81dd]::ErrorGuaranteed>>::{closure#1} as core[65ac7e790faddaa0]::ops::function::FnOnce<()>>::call_once::{shim:vtable#0} + 31: 0x10fed4724 - std::sys::pal::unix::thread::Thread::new::thread_start::h6b98e9ae71cc0684 + 32: 0x184cc42e4 - __pthread_deallocate + + +rustc version: 1.84.0-nightly (da935398d 2024-10-19) +platform: aarch64-apple-darwin + +query stack during panic: +#0 [analysis] running analysis passes on this crate +end of query stack