diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml index a5e9263d60f..88caac75b51 100644 --- a/.github/workflows/test-suite.yml +++ b/.github/workflows/test-suite.yml @@ -29,6 +29,31 @@ env: # Enable portable to prevent issues with caching `blst` for the wrong CPU type TEST_FEATURES: portable jobs: + check-labels: + runs-on: ubuntu-latest + name: Check for 'skip-ci' label + outputs: + skip_ci: ${{ steps.set-output.outputs.SKIP_CI }} + steps: + - name: check for skip-ci label + id: set-output + env: + LABELS: ${{ toJson(github.event.pull_request.labels) }} + run: | + SKIP_CI="false" + if [ -z "${LABELS}" ]; then + LABELS="none"; + else + LABELS=$(echo ${LABELS} | jq -r '.[].name') + fi + for label in ${LABELS}; do + if [ "$label" = "skip-ci" ]; then + SKIP_CI="true" + break + fi + done + echo "::set-output name=skip_ci::$SKIP_CI" + target-branch-check: name: target-branch-check runs-on: ubuntu-latest @@ -38,6 +63,8 @@ jobs: run: test ${{ github.base_ref }} != "stable" release-tests-ubuntu: name: release-tests-ubuntu + needs: [check-labels] + if: needs.check-labels.outputs.skip_ci != 'true' # Use self-hosted runners only on the sigp repo. runs-on: ${{ github.repository == 'sigp/lighthouse' && fromJson('["self-hosted", "linux", "CI", "large"]') || 'ubuntu-latest' }} steps: @@ -61,43 +88,47 @@ jobs: - name: Show cache stats if: env.SELF_HOSTED_RUNNERS == 'true' run: sccache --show-stats - # FIXME(das): disabled for now as the c-kzg-4844 `das` branch doesn't build on windows. - # release-tests-windows: - # name: release-tests-windows - # runs-on: ${{ github.repository == 'sigp/lighthouse' && fromJson('["self-hosted", "windows", "CI"]') || 'windows-2019' }} - # steps: - # - uses: actions/checkout@v4 - # - name: Get latest version of stable Rust - # if: env.SELF_HOSTED_RUNNERS == 'false' - # uses: moonrepo/setup-rust@v1 - # with: - # channel: stable - # cache-target: release - # bins: cargo-nextest - # env: - # GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - # - name: Install Foundry (anvil) - # if: env.SELF_HOSTED_RUNNERS == 'false' - # uses: foundry-rs/foundry-toolchain@v1 - # with: - # version: nightly-ca67d15f4abd46394b324c50e21e66f306a1162d - # - name: Install make - # if: env.SELF_HOSTED_RUNNERS == 'false' - # run: choco install -y make - ## - uses: KyleMayes/install-llvm-action@v1 - ## if: env.SELF_HOSTED_RUNNERS == 'false' - ## with: - ## version: "16.0" - ## directory: ${{ runner.temp }}/llvm - # - name: Set LIBCLANG_PATH - # run: echo "LIBCLANG_PATH=$((gcm clang).source -replace "clang.exe")" >> $env:GITHUB_ENV - # - name: Run tests in release - # run: make nextest-release - # - name: Show cache stats - # if: env.SELF_HOSTED_RUNNERS == 'true' - # run: sccache --show-stats +# FIXME(das): disabled for now as the c-kzg-4844 `das` branch doesn't build on windows. 
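# NOTE: `::set-output` (used by the check-labels step above) is deprecated in
# GitHub Actions. A minimal sketch of the replacement for that step's final
# line, assuming the rest of the script stays unchanged:
#
#   echo "skip_ci=$SKIP_CI" >> "$GITHUB_OUTPUT"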
+# release-tests-windows: +# name: release-tests-windows +# needs: [check-labels] +# if: needs.check-labels.outputs.skip_ci != 'true' +# runs-on: ${{ github.repository == 'sigp/lighthouse' && fromJson('["self-hosted", "windows", "CI"]') || 'windows-2019' }} +# steps: +# - uses: actions/checkout@v4 +# - name: Get latest version of stable Rust +# if: env.SELF_HOSTED_RUNNERS == 'false' +# uses: moonrepo/setup-rust@v1 +# with: +# channel: stable +# cache-target: release +# bins: cargo-nextest +# env: +# GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} +# - name: Install Foundry (anvil) +# if: env.SELF_HOSTED_RUNNERS == 'false' +# uses: foundry-rs/foundry-toolchain@v1 +# with: +# version: nightly-ca67d15f4abd46394b324c50e21e66f306a1162d +# - name: Install make +# if: env.SELF_HOSTED_RUNNERS == 'false' +# run: choco install -y make +## - uses: KyleMayes/install-llvm-action@v1 +## if: env.SELF_HOSTED_RUNNERS == 'false' +## with: +## version: "16.0" +## directory: ${{ runner.temp }}/llvm +# - name: Set LIBCLANG_PATH +# run: echo "LIBCLANG_PATH=$((gcm clang).source -replace "clang.exe")" >> $env:GITHUB_ENV +# - name: Run tests in release +# run: make nextest-release +# - name: Show cache stats +# if: env.SELF_HOSTED_RUNNERS == 'true' +# run: sccache --show-stats beacon-chain-tests: name: beacon-chain-tests + needs: [check-labels] + if: needs.check-labels.outputs.skip_ci != 'true' # Use self-hosted runners only on the sigp repo. runs-on: ${{ github.repository == 'sigp/lighthouse' && fromJson('["self-hosted", "linux", "CI", "large"]') || 'ubuntu-latest' }} env: @@ -118,6 +149,8 @@ jobs: run: sccache --show-stats op-pool-tests: name: op-pool-tests + needs: [check-labels] + if: needs.check-labels.outputs.skip_ci != 'true' runs-on: ubuntu-latest env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} @@ -133,6 +166,8 @@ jobs: run: make test-op-pool network-tests: name: network-tests + needs: [check-labels] + if: needs.check-labels.outputs.skip_ci != 'true' runs-on: ubuntu-latest env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} @@ -148,6 +183,8 @@ jobs: run: make test-network slasher-tests: name: slasher-tests + needs: [check-labels] + if: needs.check-labels.outputs.skip_ci != 'true' runs-on: ubuntu-latest env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} @@ -163,6 +200,8 @@ jobs: run: make test-slasher debug-tests-ubuntu: name: debug-tests-ubuntu + needs: [check-labels] + if: needs.check-labels.outputs.skip_ci != 'true' # Use self-hosted runners only on the sigp repo. runs-on: ${{ github.repository == 'sigp/lighthouse' && fromJson('["self-hosted", "linux", "CI", "large"]') || 'ubuntu-latest' }} env: @@ -187,6 +226,8 @@ jobs: run: sccache --show-stats state-transition-vectors-ubuntu: name: state-transition-vectors-ubuntu + needs: [check-labels] + if: needs.check-labels.outputs.skip_ci != 'true' runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 @@ -199,6 +240,8 @@ jobs: run: make run-state-transition-tests ef-tests-ubuntu: name: ef-tests-ubuntu + needs: [check-labels] + if: needs.check-labels.outputs.skip_ci != 'true' # Use self-hosted runners only on the sigp repo. 
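# Every job below is gated the same way: depend on check-labels, then skip when
# the label was found. A sketch of the pattern (the job name here is
# hypothetical):
#
#   some-job:
#     needs: [check-labels]
#     if: needs.check-labels.outputs.skip_ci != 'true'
#
# Because a skipped job also skips jobs that `needs` it (absent an `always()`
# condition), the final `test-suite-success` job inherits the skip as well.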
runs-on: ${{ github.repository == 'sigp/lighthouse' && fromJson('["self-hosted", "linux", "CI", "small"]') || 'ubuntu-latest' }} env: @@ -219,6 +262,8 @@ jobs: run: sccache --show-stats dockerfile-ubuntu: name: dockerfile-ubuntu + needs: [check-labels] + if: needs.check-labels.outputs.skip_ci != 'true' runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 @@ -228,6 +273,8 @@ jobs: run: docker run -t lighthouse:local lighthouse --version basic-simulator-ubuntu: name: basic-simulator-ubuntu + needs: [check-labels] + if: needs.check-labels.outputs.skip_ci != 'true' runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 @@ -240,6 +287,8 @@ jobs: run: cargo run --release --bin simulator basic-sim fallback-simulator-ubuntu: name: fallback-simulator-ubuntu + needs: [check-labels] + if: needs.check-labels.outputs.skip_ci != 'true' runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 @@ -252,6 +301,8 @@ jobs: run: cargo run --release --bin simulator fallback-sim doppelganger-protection-test: name: doppelganger-protection-test + needs: [check-labels] + if: needs.check-labels.outputs.skip_ci != 'true' runs-on: ${{ github.repository == 'sigp/lighthouse' && fromJson('["self-hosted", "linux", "CI", "small"]') || 'ubuntu-latest' }} env: # Enable portable to prevent issues with caching `blst` for the wrong CPU type @@ -286,6 +337,8 @@ jobs: ./doppelganger_protection.sh success genesis.json execution-engine-integration-ubuntu: name: execution-engine-integration-ubuntu + needs: [check-labels] + if: needs.check-labels.outputs.skip_ci != 'true' runs-on: ${{ github.repository == 'sigp/lighthouse' && fromJson('["self-hosted", "linux", "CI", "small"]') || 'ubuntu-latest' }} steps: - uses: actions/checkout@v4 @@ -345,6 +398,8 @@ jobs: run: cargo check --workspace cargo-udeps: name: cargo-udeps + needs: [check-labels] + if: needs.check-labels.outputs.skip_ci != 'true' runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 @@ -367,6 +422,8 @@ jobs: RUSTFLAGS: "" compile-with-beta-compiler: name: compile-with-beta-compiler + needs: [check-labels] + if: needs.check-labels.outputs.skip_ci != 'true' runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 @@ -378,6 +435,8 @@ jobs: run: make cli-check: name: cli-check + needs: [check-labels] + if: needs.check-labels.outputs.skip_ci != 'true' runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 @@ -392,8 +451,10 @@ jobs: # a PR is safe to merge. New jobs should be added here. test-suite-success: name: test-suite-success + if: needs.check-labels.outputs.skip_ci != 'true' runs-on: ubuntu-latest needs: [ + 'check-labels', 'target-branch-check', 'release-tests-ubuntu', # FIXME(das): disabled for now as the c-kzg-4844 `das` branch doesn't build on windows. diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 87e79924d3c..6b7e37f27ea 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -273,15 +273,7 @@ pub async fn publish_block NetworkGlobals { } /// Compute custody data columns the node is assigned to custody. 
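// `custody_columns` below becomes infallible: both of its inputs (node id and
// custody subnet count) are read from the local ENR, so the old error path was
// never taken. A sketch of the simplified call site, assuming the
// `ColumnIndex` element type from the `types` import used elsewhere in this
// diff:
//
//     let columns: Vec<ColumnIndex> = network_globals.custody_columns(epoch);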
- pub fn custody_columns(&self, _epoch: Epoch) -> Result, &'static str> { + pub fn custody_columns(&self, _epoch: Epoch) -> Vec { let enr = self.local_enr(); let node_id = enr.node_id().raw().into(); let custody_subnet_count = enr.custody_subnet_count::(); - Ok( - DataColumnSubnetId::compute_custody_columns::(node_id, custody_subnet_count) - .collect(), - ) + DataColumnSubnetId::compute_custody_columns::(node_id, custody_subnet_count).collect() } /// TESTING ONLY. Build a dummy NetworkGlobals instance. diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 0eb7b8634ff..915ba36a509 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -952,7 +952,7 @@ impl BackFillSync { Err(e) => { // NOTE: under normal conditions this shouldn't happen but we handle it anyway warn!(self.log, "Could not send batch request"; - "batch_id" => batch_id, "error" => e, &batch); + "batch_id" => batch_id, "error" => ?e, &batch); // register the failed download and check if the batch can be retried if let Err(e) = batch.start_downloading_from_peer(peer, 1) { return self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)); diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index c393f052bab..856d8999f92 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -86,7 +86,7 @@ impl RequestState for BlockRequestState { cx: &mut SyncNetworkContext, ) -> Result { cx.block_lookup_request(id, peer_id, self.requested_block_root) - .map_err(LookupRequestError::SendFailed) + .map_err(LookupRequestError::SendFailedNetwork) } fn send_for_processing( @@ -106,7 +106,7 @@ impl RequestState for BlockRequestState { RpcBlock::new_without_blobs(Some(block_root), value), seen_timestamp, ) - .map_err(LookupRequestError::SendFailed) + .map_err(LookupRequestError::SendFailedProcessor) } fn response_type() -> ResponseType { @@ -139,7 +139,7 @@ impl RequestState for BlobRequestState { self.block_root, downloaded_block_expected_blobs, ) - .map_err(LookupRequestError::SendFailed) + .map_err(LookupRequestError::SendFailedNetwork) } fn send_for_processing( @@ -154,7 +154,7 @@ impl RequestState for BlobRequestState { .. 
} = download_result; cx.send_blobs_for_processing(id, block_root, value, seen_timestamp) - .map_err(LookupRequestError::SendFailed) + .map_err(LookupRequestError::SendFailedProcessor) } fn response_type() -> ResponseType { @@ -183,7 +183,7 @@ impl RequestState for CustodyRequestState { cx: &mut SyncNetworkContext, ) -> Result { cx.custody_lookup_request(id, self.block_root, downloaded_block_expected_blobs) - .map_err(LookupRequestError::SendFailed) + .map_err(LookupRequestError::SendFailedNetwork) } fn send_for_processing( @@ -203,7 +203,7 @@ impl RequestState for CustodyRequestState { seen_timestamp, BlockProcessType::SingleCustodyColumn(id), ) - .map_err(LookupRequestError::SendFailed) + .map_err(LookupRequestError::SendFailedProcessor) } fn response_type() -> ResponseType { diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 115d9b24a66..692c382e14f 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -2,7 +2,7 @@ use self::parent_chain::{compute_parent_chains, NodeChain}; pub use self::single_block_lookup::DownloadResult; use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup}; use super::manager::{BlockProcessType, BlockProcessingResult}; -use super::network_context::{LookupFailure, PeerGroup, SyncNetworkContext}; +use super::network_context::{PeerGroup, RpcResponseError, SyncNetworkContext}; use crate::metrics; use crate::sync::block_lookups::common::{ResponseType, PARENT_DEPTH_TOLERANCE}; use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor; @@ -301,9 +301,12 @@ impl BlockLookups { }; let result = lookup.continue_requests(cx); - self.on_lookup_result(id, result, "new_current_lookup", cx); - self.update_metrics(); - true + if self.on_lookup_result(id, result, "new_current_lookup", cx) { + self.update_metrics(); + true + } else { + false + } } /* Lookup responses */ @@ -312,7 +315,7 @@ impl BlockLookups { pub fn on_download_response>( &mut self, id: SingleLookupReqId, - response: Result<(R::VerifiedResponseType, PeerGroup, Duration), LookupFailure>, + response: Result<(R::VerifiedResponseType, PeerGroup, Duration), RpcResponseError>, cx: &mut SyncNetworkContext, ) { let result = self.on_download_response_inner::(id, response, cx); @@ -323,7 +326,7 @@ impl BlockLookups { pub fn on_download_response_inner>( &mut self, id: SingleLookupReqId, - response: Result<(R::VerifiedResponseType, PeerGroup, Duration), LookupFailure>, + response: Result<(R::VerifiedResponseType, PeerGroup, Duration), RpcResponseError>, cx: &mut SyncNetworkContext, ) -> Result { // Note: no need to downscore peers here, already downscored on network context @@ -628,15 +631,16 @@ impl BlockLookups { } /// Common handler for a lookup request error, drop it and update metrics + /// Returns true if the lookup was created or already exists fn on_lookup_result( &mut self, id: SingleLookupId, result: Result, source: &str, cx: &mut SyncNetworkContext, - ) { + ) -> bool { match result { - Ok(LookupResult::Pending) => {} // no action + Ok(LookupResult::Pending) => true, // no action Ok(LookupResult::Completed) => { if let Some(lookup) = self.single_block_lookups.remove(&id) { debug!(self.log, "Dropping completed lookup"; "block" => ?lookup.block_root(), "id" => id); @@ -647,12 +651,14 @@ } else { debug!(self.log, "Attempting to drop non-existent lookup"; "id" => id); } + false } Err(error) => { debug!(self.log, "Dropping 
lookup on request error"; "id" => id, "source" => source, "error" => ?error); metrics::inc_counter_vec(&metrics::SYNC_LOOKUP_DROPPED, &[error.into()]); self.drop_lookup_and_children(id); self.update_metrics(); + false } } } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 2c194cd0467..3cac22167e4 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -2,7 +2,9 @@ use super::common::ResponseType; use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS}; use crate::sync::block_lookups::common::RequestState; use crate::sync::block_lookups::Id; -use crate::sync::network_context::{LookupRequestResult, PeerGroup, ReqId, SyncNetworkContext}; +use crate::sync::network_context::{ + LookupRequestResult, PeerGroup, ReqId, RpcRequestSendError, SendErrorProcessor, SyncNetworkContext, +}; use beacon_chain::data_column_verification::CustodyDataColumn; use beacon_chain::BeaconChainTypes; use derivative::Derivative; @@ -34,8 +36,10 @@ pub enum LookupRequestError { }, /// No peers left to serve this lookup NoPeers, - /// Error sending event to network or beacon processor - SendFailed(&'static str), + /// Error sending event to network + SendFailedNetwork(RpcRequestSendError), + /// Error sending event to processor + SendFailedProcessor(SendErrorProcessor), /// Inconsistent lookup request state BadState(String), /// Lookup failed for some other reason and should be dropped diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 17639a6bf6d..f5ffcf88c0e 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -30,7 +30,7 @@ use lighthouse_network::{ Client, Eth2Enr, NetworkGlobals, PeerAction, PeerId, ReportSource, Request, }; pub use requests::LookupVerifyError; -use slog::{debug, error, trace, warn}; +use slog::{debug, error, warn}; use slot_clock::SlotClock; use std::collections::hash_map::Entry; use std::sync::Arc; @@ -70,24 +70,49 @@ pub enum RpcEvent { RPCError(RPCError), } -pub type RpcProcessingResult = Result<(T, Duration), LookupFailure>; +pub type RpcResponseResult = Result<(T, Duration), RpcResponseError>; #[derive(Debug)] -pub enum LookupFailure { +pub enum RpcResponseError { RpcError(RPCError), LookupVerifyError(LookupVerifyError), CustodyRequestError(CustodyRequestError), } -impl From for LookupFailure { +#[derive(Debug, PartialEq, Eq)] +pub enum RpcRequestSendError { + /// Network channel send failed + NetworkSendError, + NoCustodyPeers, + CustodyRequestError(custody::Error), + SlotClockError, +} + +#[derive(Debug, PartialEq, Eq)] +pub enum SendErrorProcessor { + SendError, + ProcessorNotAvailable, +} + +impl std::fmt::Display for RpcResponseError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + RpcResponseError::RpcError(e) => write!(f, "RPC Error: {:?}", e), + RpcResponseError::LookupVerifyError(e) => write!(f, "Lookup Verify Error: {:?}", e), + RpcResponseError::CustodyRequestError(e) => write!(f, "Custody Request Error: {:?}", e), + } + } +} + +impl From for RpcResponseError { fn from(e: RPCError) -> Self { - LookupFailure::RpcError(e) + RpcResponseError::RpcError(e) } } -impl From for LookupFailure { +impl From for RpcResponseError { fn from(e: LookupVerifyError) -> Self { - LookupFailure::LookupVerifyError(e) + 
RpcResponseError::LookupVerifyError(e) } } @@ -273,7 +298,7 @@ impl SyncNetworkContext { batch_type: ByRangeRequestType, request: BlocksByRangeRequest, sender_id: RangeRequestId, - ) -> Result { + ) -> Result { let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); let id = self.next_id(); debug!( @@ -284,11 +309,13 @@ impl SyncNetworkContext { "epoch" => epoch, "peer" => %peer_id, ); - self.send_network_msg(NetworkMessage::SendRequest { - peer_id, - request: Request::BlocksByRange(request.clone()), - request_id: RequestId::Sync(SyncRequestId::RangeBlockComponents(id)), - })?; + self.network_send + .send(NetworkMessage::SendRequest { + peer_id, + request: Request::BlocksByRange(request.clone()), + request_id: RequestId::Sync(SyncRequestId::RangeBlockComponents(id)), + }) + .map_err(|_| RpcRequestSendError::NetworkSendError)?; let expected_blobs = if matches!(batch_type, ByRangeRequestType::BlocksAndBlobs) { debug!( @@ -301,14 +328,16 @@ impl SyncNetworkContext { ); // Create the blob request based on the blocks request. - self.send_network_msg(NetworkMessage::SendRequest { - peer_id, - request: Request::BlobsByRange(BlobsByRangeRequest { - start_slot: *request.start_slot(), - count: *request.count(), - }), - request_id: RequestId::Sync(SyncRequestId::RangeBlockComponents(id)), - })?; + self.network_send + .send(NetworkMessage::SendRequest { + peer_id, + request: Request::BlobsByRange(BlobsByRangeRequest { + start_slot: *request.start_slot(), + count: *request.count(), + }), + request_id: RequestId::Sync(SyncRequestId::RangeBlockComponents(id)), + }) + .map_err(|_| RpcRequestSendError::NetworkSendError)?; true } else { false @@ -316,7 +345,7 @@ impl SyncNetworkContext { let expects_custody_columns = if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) { - let custody_indexes = self.network_globals().custody_columns(epoch)?; + let custody_indexes = self.network_globals().custody_columns(epoch); for column_index in &custody_indexes { let custody_peer_ids = self.get_custodial_peers(epoch, *column_index); @@ -326,7 +355,7 @@ // - Handle the no peers case gracefully, maybe add some timeout and give a few // minutes / seconds to the peer manager to locate peers on this subnet before // abandoning progress on the chain completely. 
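// The string error just below becomes the typed
// `RpcRequestSendError::NoCustodyPeers`, so range sync can tell "no custody
// peers yet" apart from a closed network channel. A sketch of caller-side
// handling (method and handler names here are illustrative, not from this
// diff):
//
//     match cx.block_components_by_range_request(peer_id, batch_type, request, sender_id) {
//         Ok(id) => id,
//         Err(RpcRequestSendError::NoCustodyPeers) => return self.pause_until_custody_peers(),
//         Err(e) => return self.fail_chain(e),
//     }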
- return Err("no custody peer"); + return Err(RpcRequestSendError::NoCustodyPeers); }; debug!( @@ -348,7 +377,8 @@ impl SyncNetworkContext { columns: vec![*column_index], }), request_id: RequestId::Sync(SyncRequestId::RangeBlockComponents(id)), - })?; + }) + .map_err(|_| RpcRequestSendError::NetworkSendError)?; } Some(custody_indexes) @@ -423,7 +453,7 @@ impl SyncNetworkContext { lookup_id: SingleLookupId, peer_id: PeerId, block_root: Hash256, - ) -> Result { + ) -> Result { // da_checker includes block that are execution verified, but are missing components if self .chain @@ -460,11 +490,13 @@ impl SyncNetworkContext { let request = BlocksByRootSingleRequest(block_root); - self.send_network_msg(NetworkMessage::SendRequest { - peer_id, - request: Request::BlocksByRoot(request.into_request(&self.chain.spec)), - request_id: RequestId::Sync(SyncRequestId::SingleBlock { id }), - })?; + self.network_send + .send(NetworkMessage::SendRequest { + peer_id, + request: Request::BlocksByRoot(request.into_request(&self.chain.spec)), + request_id: RequestId::Sync(SyncRequestId::SingleBlock { id }), + }) + .map_err(|_| RpcRequestSendError::NetworkSendError)?; self.blocks_by_root_requests .insert(id, ActiveBlocksByRootRequest::new(request)); @@ -484,7 +516,7 @@ impl SyncNetworkContext { peer_id: PeerId, block_root: Hash256, downloaded_block_expected_blobs: Option, - ) -> Result { + ) -> Result { // Check if we are into deneb, and before peerdas if !self .chain @@ -494,13 +526,12 @@ impl SyncNetworkContext { self.chain .slot_clock .now_or_genesis() - .ok_or("clock not available")? + .ok_or(RpcRequestSendError::SlotClockError)? .epoch(T::EthSpec::slots_per_epoch()), ) { return Ok(LookupRequestResult::NoRequestNeeded); } - let Some(expected_blobs) = downloaded_block_expected_blobs.or_else(|| { self.chain .data_availability_checker @@ -547,11 +578,13 @@ impl SyncNetworkContext { indices, }; - self.send_network_msg(NetworkMessage::SendRequest { - peer_id, - request: Request::BlobsByRoot(request.clone().into_request(&self.chain.spec)), - request_id: RequestId::Sync(SyncRequestId::SingleBlob { id }), - })?; + self.network_send + .send(NetworkMessage::SendRequest { + peer_id, + request: Request::BlobsByRoot(request.clone().into_request(&self.chain.spec)), + request_id: RequestId::Sync(SyncRequestId::SingleBlob { id }), + }) + .map_err(|_| RpcRequestSendError::NetworkSendError)?; self.blobs_by_root_requests .insert(id, ActiveBlobsByRootRequest::new(request)); @@ -595,7 +628,7 @@ impl SyncNetworkContext { lookup_id: SingleLookupId, block_root: Hash256, downloaded_block_expected_data: Option, - ) -> Result { + ) -> Result { // Check if we are into peerdas if !self .chain @@ -605,7 +638,7 @@ impl SyncNetworkContext { self.chain .slot_clock .now_or_genesis() - .ok_or("clock not available")? + .ok_or(RpcRequestSendError::SlotClockError)? 
.epoch(T::EthSpec::slots_per_epoch()), ) { @@ -635,7 +668,7 @@ impl SyncNetworkContext { // TODO(das): figure out how to pass block.slot if we end up doing rotation let block_epoch = Epoch::new(0); - let custody_indexes_duty = self.network_globals().custody_columns(block_epoch)?; + let custody_indexes_duty = self.network_globals().custody_columns(block_epoch); // Include only the blob indexes not yet imported (received through gossip) let custody_indexes_to_fetch = custody_indexes_duty @@ -678,7 +711,7 @@ impl SyncNetworkContext { Ok(LookupRequestResult::RequestSent(req_id)) } // TODO(das): handle this error properly - Err(_) => Err("custody_send_error"), + Err(e) => Err(RpcRequestSendError::CustodyRequestError(e)), } } @@ -822,7 +855,7 @@ impl SyncNetworkContext { request_id: SingleLookupReqId, peer_id: PeerId, block: RpcEvent>>, - ) -> Option>>> { + ) -> Option>>> { let Entry::Occupied(mut request) = self.blocks_by_root_requests.entry(request_id) else { return None; }; @@ -848,7 +881,7 @@ impl SyncNetworkContext { } }; - if let Err(LookupFailure::LookupVerifyError(e)) = &resp { + if let Err(RpcResponseError::LookupVerifyError(e)) = &resp { self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); } Some(resp) @@ -859,7 +892,7 @@ impl SyncNetworkContext { request_id: SingleLookupReqId, peer_id: PeerId, blob: RpcEvent>>, - ) -> Option>> { + ) -> Option>> { let Entry::Occupied(mut request) = self.blobs_by_root_requests.entry(request_id) else { return None; }; @@ -891,7 +924,7 @@ impl SyncNetworkContext { // catch if a peer is returning more blobs than requested or if the excess blobs are // invalid. Err((e, resolved)) => { - if let LookupFailure::LookupVerifyError(e) = &e { + if let RpcResponseError::LookupVerifyError(e) = &e { self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); } if resolved { @@ -911,7 +944,7 @@ impl SyncNetworkContext { item: RpcEvent>>, ) -> Option<( DataColumnsByRootRequester, - RpcProcessingResult>>>, + RpcResponseResult>>>, )> { let Entry::Occupied(mut request) = self.data_columns_by_root_requests.entry(id) else { return None; @@ -960,10 +993,10 @@ impl SyncNetworkContext { &mut self, id: CustodyId, peer_id: PeerId, - resp: RpcProcessingResult>>>, + resp: RpcResponseResult>>>, ) -> Option<( CustodyRequester, - Result<(Vec>, PeerGroup), LookupFailure>, + Result<(Vec>, PeerGroup), RpcResponseError>, )> { // Note: need to remove the request to borrow self again below. Otherwise we can't // do nested requests @@ -975,7 +1008,7 @@ impl SyncNetworkContext { let result = request .on_data_column_downloaded(peer_id, id.column_index, resp, self) - .map_err(LookupFailure::CustodyRequestError) + .map_err(RpcResponseError::CustodyRequestError) .transpose(); // Convert a result from internal format of `ActiveCustodyRequest` (error first to use ?) to @@ -1003,31 +1036,27 @@ impl SyncNetworkContext { block_root: Hash256, block: RpcBlock, duration: Duration, - ) -> Result<(), &'static str> { - match self.beacon_processor_if_enabled() { - Some(beacon_processor) => { - debug!(self.log, "Sending block for processing"; "block" => ?block_root, "id" => id); - if let Err(e) = beacon_processor.send_rpc_beacon_block( - block_root, - block, - duration, - BlockProcessType::SingleBlock { id }, - ) { - error!( - self.log, - "Failed to send sync block to processor"; - "error" => ?e - ); - Err("beacon processor send failure") - } else { - Ok(()) - } - } - None => { - trace!(self.log, "Dropping block ready for processing. 
Beacon processor not available"; "block" => %block_root); - Err("beacon processor unavailable") - } - } + ) -> Result<(), SendErrorProcessor> { + let beacon_processor = self + .beacon_processor_if_enabled() + .ok_or(SendErrorProcessor::ProcessorNotAvailable)?; + + debug!(self.log, "Sending block for processing"; "block" => ?block_root, "id" => id); + beacon_processor + .send_rpc_beacon_block( + block_root, + block, + duration, + BlockProcessType::SingleBlock { id }, + ) + .map_err(|e| { + error!( + self.log, + "Failed to send sync block to processor"; + "error" => ?e + ); + SendErrorProcessor::SendError + }) } pub fn send_blobs_for_processing( @@ -1036,31 +1065,27 @@ impl SyncNetworkContext { block_root: Hash256, blobs: FixedBlobSidecarList, duration: Duration, - ) -> Result<(), &'static str> { - match self.beacon_processor_if_enabled() { - Some(beacon_processor) => { - debug!(self.log, "Sending blobs for processing"; "block" => ?block_root, "id" => id); - if let Err(e) = beacon_processor.send_rpc_blobs( - block_root, - blobs, - duration, - BlockProcessType::SingleBlob { id }, - ) { - error!( - self.log, - "Failed to send sync blobs to processor"; - "error" => ?e - ); - Err("beacon processor send failure") - } else { - Ok(()) - } - } - None => { - trace!(self.log, "Dropping blobs ready for processing. Beacon processor not available"; "block_root" => %block_root); - Err("beacon processor unavailable") - } - } + ) -> Result<(), SendErrorProcessor> { + let beacon_processor = self + .beacon_processor_if_enabled() + .ok_or(SendErrorProcessor::ProcessorNotAvailable)?; + + debug!(self.log, "Sending blobs for processing"; "block" => ?block_root, "id" => id); + beacon_processor + .send_rpc_blobs( + block_root, + blobs, + duration, + BlockProcessType::SingleBlob { id }, + ) + .map_err(|e| { + error!( + self.log, + "Failed to send sync blobs to processor"; + "error" => ?e + ); + SendErrorProcessor::SendError + }) } pub fn send_custody_columns_for_processing( @@ -1069,44 +1094,35 @@ impl SyncNetworkContext { custody_columns: Vec>, duration: Duration, process_type: BlockProcessType, - ) -> Result<(), &'static str> { - match self.beacon_processor_if_enabled() { - Some(beacon_processor) => { - debug!(self.log, "Sending custody columns for processing"; "block" => ?block_root, "process_type" => ?process_type); - if let Err(e) = beacon_processor.send_rpc_custody_columns( - block_root, - custody_columns, - duration, - process_type, - ) { - error!( - self.log, - "Failed to send sync custody columns to processor"; - "error" => ?e - ); - Err("beacon processor send failure") - } else { - Ok(()) - } - } - None => { - trace!(self.log, "Dropping custody columns ready for processing. 
Beacon processor not available"; "block_root" => %block_root); - Err("beacon processor unavailable") - } - } + ) -> Result<(), SendErrorProcessor> { + let beacon_processor = self + .beacon_processor_if_enabled() + .ok_or(SendErrorProcessor::ProcessorNotAvailable)?; + + debug!(self.log, "Sending custody columns for processing"; "block" => ?block_root, "process_type" => ?process_type); + beacon_processor + .send_rpc_custody_columns(block_root, custody_columns, duration, process_type) + .map_err(|e| { + error!( + self.log, + "Failed to send sync custody columns to processor"; + "error" => ?e + ); + SendErrorProcessor::SendError + }) } /// Downscore peers for lookup errors that originate from sync - pub fn on_lookup_failure(&self, peer_id: PeerId, err: &LookupFailure) { + pub fn on_lookup_failure(&self, peer_id: PeerId, err: &RpcResponseError) { match err { // RPCErrors are downscored in the network handler - LookupFailure::RpcError(_) => {} + RpcResponseError::RpcError(_) => {} // Only downscore lookup verify errors. RPC errors are downscored in the network handler. - LookupFailure::LookupVerifyError(e) => { + RpcResponseError::LookupVerifyError(e) => { self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); } // CustodyRequestErrors are downscored in each data_columns_by_root request - LookupFailure::CustodyRequestError(_) => {} + RpcResponseError::CustodyRequestError(_) => {} } } } diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs index f20a95415db..21acce8e6e5 100644 --- a/beacon_node/network/src/sync/network_context/custody.rs +++ b/beacon_node/network/src/sync/network_context/custody.rs @@ -9,7 +9,7 @@ use slog::{debug, warn}; use std::{marker::PhantomData, sync::Arc}; use types::{data_column_sidecar::ColumnIndex, DataColumnSidecar, Epoch, Hash256}; -use super::{PeerGroup, RpcProcessingResult, SyncNetworkContext}; +use super::{PeerGroup, RpcResponseResult, SyncNetworkContext}; #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub struct CustodyId { @@ -35,7 +35,7 @@ pub struct ActiveCustodyRequest { _phantom: PhantomData, } -#[derive(Debug)] +#[derive(Debug, Eq, PartialEq)] pub enum Error { SendFailed(&'static str), TooManyFailures, @@ -79,7 +79,7 @@ impl ActiveCustodyRequest { &mut self, _peer_id: PeerId, column_index: ColumnIndex, - resp: RpcProcessingResult>, + resp: RpcResponseResult>, cx: &mut SyncNetworkContext, ) -> CustodyRequestResult { // TODO(das): Should downscore peers for verify errors here diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 809d2d7bb16..b6390f1a07f 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -923,7 +923,7 @@ impl SyncingChain { Err(e) => { // NOTE: under normal conditions this shouldn't happen but we handle it anyway warn!(self.log, "Could not send batch request"; - "batch_id" => batch_id, "error" => e, &batch); + "batch_id" => batch_id, "error" => ?e, &batch); // register the failed download and check if the batch can be retried batch.start_downloading_from_peer(peer, 1)?; // fake request_id is not relevant self.peers diff --git a/beacon_node/network/src/sync/sampling.rs b/beacon_node/network/src/sync/sampling.rs index 84ebe69928a..0cb06980618 100644 --- a/beacon_node/network/src/sync/sampling.rs +++ b/beacon_node/network/src/sync/sampling.rs @@ -1,5 +1,5 @@ use self::request::ActiveColumnSampleRequest; -use 
super::network_context::{LookupFailure, SyncNetworkContext}; +use super::network_context::{RpcResponseError, SyncNetworkContext}; use crate::metrics; use beacon_chain::BeaconChainTypes; use fnv::FnvHashMap; @@ -101,7 +101,7 @@ impl Sampling { &mut self, id: SamplingId, peer_id: PeerId, - resp: Result<(DataColumnSidecarList, Duration), LookupFailure>, + resp: Result<(DataColumnSidecarList, Duration), RpcResponseError>, cx: &mut SyncNetworkContext, ) -> Option<(SamplingRequester, SamplingResult)> { let Some(request) = self.requests.get_mut(&id.id) else { @@ -235,7 +235,7 @@ impl ActiveSamplingRequest { &mut self, _peer_id: PeerId, column_index: ColumnIndex, - resp: Result<(DataColumnSidecarList, Duration), LookupFailure>, + resp: Result<(DataColumnSidecarList, Duration), RpcResponseError>, cx: &mut SyncNetworkContext, ) -> Result, SamplingError> { // Select columns to sample
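// Taken together, the renames and new enums in this diff split sync failures
// into three surfaces: `RpcResponseError` (a response arrived but was bad),
// `RpcRequestSendError` (the request never left the node), and
// `SendErrorProcessor` (hand-off to the beacon processor failed). A sketch of
// matching the lookup-side variants (the logging is illustrative; variant
// names are from this diff):
//
//     match err {
//         LookupRequestError::SendFailedNetwork(e) => {
//             warn!(log, "Failed to send lookup request"; "error" => ?e)
//         }
//         LookupRequestError::SendFailedProcessor(e) => {
//             warn!(log, "Failed to hand lookup to processor"; "error" => ?e)
//         }
//         other => warn!(log, "Lookup dropped"; "error" => ?other),
//     }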