From 53d972bdba0a57e2cb4c275510819b903a064ee1 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Mon, 24 Jun 2024 12:16:30 +0300 Subject: [PATCH 1/5] Move more expensive farm maintenance operations into dedicated blocking tasks --- .../commands/cluster/controller/farms.rs | 103 ++++++++++++++---- 1 file changed, 82 insertions(+), 21 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs index b6fc46af91..e8efec6895 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs @@ -15,6 +15,7 @@ use parking_lot::Mutex; use std::collections::hash_map::Entry; use std::collections::{HashMap, VecDeque}; use std::future::{pending, ready, Future}; +use std::mem; use std::pin::{pin, Pin}; use std::sync::Arc; use std::time::Instant; @@ -24,6 +25,7 @@ use subspace_farmer::cluster::farmer::{ClusterFarm, ClusterFarmerIdentifyFarmBro use subspace_farmer::cluster::nats_client::NatsClient; use subspace_farmer::farm::plotted_pieces::PlottedPieces; use subspace_farmer::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate}; +use tokio::task; use tokio::time::MissedTickBehavior; use tracing::{error, info, trace, warn}; @@ -126,7 +128,7 @@ impl KnownFarms { pub(super) async fn maintain_farms( instance: &str, nats_client: &NatsClient, - plotted_pieces: &AsyncRwLock>, + plotted_pieces: &Arc>>, ) -> anyhow::Result<()> { let mut known_farms = KnownFarms::default(); @@ -173,7 +175,18 @@ pub(super) async fn maintain_farms( (farm_index, result) = farms.select_next_some() => { known_farms.remove(farm_index); farms_to_add_remove.push_back(Box::pin(async move { - plotted_pieces.write().await.delete_farm(farm_index); + let plotted_pieces = Arc::clone(plotted_pieces); + + let delete_farm_fut = task::spawn_blocking(move || { + plotted_pieces.write_blocking().delete_farm(farm_index); + }); + if let Err(error) = delete_farm_fut.await { + error!( + %farm_index, + %error, + "Failed to delete farm that exited" + ); + } None })); @@ -202,20 +215,39 @@ pub(super) async fn maintain_farms( } _ = farm_pruning_interval.tick().fuse() => { for (farm_index, removed_farm) in known_farms.remove_expired() { + let farm_id = removed_farm.farm_id; + if removed_farm.expired_sender.send(()).is_ok() { warn!( %farm_index, - farm_id = %removed_farm.farm_id, + %farm_id, "Farm expired and removed" ); } else { warn!( %farm_index, - farm_id = %removed_farm.farm_id, + %farm_id, "Farm exited before expiration notification" ); } - plotted_pieces.write().await.delete_farm(farm_index); + + farms_to_add_remove.push_back(Box::pin(async move { + let plotted_pieces = Arc::clone(plotted_pieces); + + let delete_farm_fut = task::spawn_blocking(move || { + plotted_pieces.write_blocking().delete_farm(farm_index); + }); + if let Err(error) = delete_farm_fut.await { + error!( + %farm_index, + %farm_id, + %error, + "Failed to delete farm that expired" + ); + } + + None + })); } } result = farm_add_remove_in_progress => { @@ -242,7 +274,7 @@ fn process_farm_identify_message<'a>( nats_client: &'a NatsClient, known_farms: &mut KnownFarms, farms_to_add_remove: &mut VecDeque>, - plotted_pieces: &'a AsyncRwLock>, + plotted_pieces: &'a Arc>>, ) { let ClusterFarmerIdentifyFarmBroadcast { farm_id, @@ -287,7 +319,19 @@ fn process_farm_identify_message<'a>( if remove { farms_to_add_remove.push_back(Box::pin(async move { - plotted_pieces.write().await.delete_farm(farm_index); + let plotted_pieces = Arc::clone(plotted_pieces); + + let delete_farm_fut = task::spawn_blocking(move || { + plotted_pieces.write_blocking().delete_farm(farm_index); + }); + if let Err(error) = delete_farm_fut.await { + error!( + %farm_index, + %farm_id, + %error, + "Failed to delete farm that was replaced" + ); + } None })); @@ -299,7 +343,7 @@ fn process_farm_identify_message<'a>( farm_index, farm_id, total_sectors_count, - plotted_pieces, + Arc::clone(plotted_pieces), nats_client, ) .await @@ -337,16 +381,13 @@ async fn initialize_farm( farm_index: FarmIndex, farm_id: FarmId, total_sectors_count: SectorIndex, - plotted_pieces: &AsyncRwLock>, + plotted_pieces: Arc>>, nats_client: &NatsClient, ) -> anyhow::Result { let farm = ClusterFarm::new(farm_id, total_sectors_count, nats_client.clone()) .await .map_err(|error| anyhow!("Failed instantiate cluster farm {farm_id}: {error}"))?; - let mut plotted_pieces = plotted_pieces.write().await; - plotted_pieces.add_farm(farm_index, farm.piece_reader()); - // Buffer sectors that are plotted while already plotted sectors are being iterated over let plotted_sectors_buffer = Arc::new(Mutex::new(Vec::new())); let sector_update_handler = farm.on_sector_update(Arc::new({ @@ -372,22 +413,42 @@ async fn initialize_farm( .get() .await .map_err(|error| anyhow!("Failed to get plotted sectors for farm {farm_id}: {error}"))?; - while let Some(plotted_sector_result) = plotted_sectors.next().await { - let plotted_sector = plotted_sector_result - .map_err(|error| anyhow!("Failed to get plotted sector for farm {farm_id}: {error}"))?; - plotted_pieces.add_sector(farm_index, &plotted_sector); + { + let mut plotted_pieces = plotted_pieces.write().await; + plotted_pieces.add_farm(farm_index, farm.piece_reader()); + + while let Some(plotted_sector_result) = plotted_sectors.next().await { + let plotted_sector = plotted_sector_result.map_err(|error| { + anyhow!("Failed to get plotted sector for farm {farm_id}: {error}") + })?; + + plotted_pieces.add_sector(farm_index, &plotted_sector); + + task::yield_now().await; + } } // Add sectors that were plotted while above iteration was happening to plotted sectors // too drop(sector_update_handler); - for (plotted_sector, old_plotted_sector) in plotted_sectors_buffer.lock().drain(..) { - if let Some(old_plotted_sector) = old_plotted_sector { - plotted_pieces.delete_sector(farm_index, &old_plotted_sector); + let plotted_sectors_buffer = mem::take(&mut *plotted_sectors_buffer.lock()); + let add_buffered_sectors_fut = task::spawn_blocking(move || { + let mut plotted_pieces = plotted_pieces.write_blocking(); + + for (plotted_sector, old_plotted_sector) in plotted_sectors_buffer { + if let Some(old_plotted_sector) = old_plotted_sector { + plotted_pieces.delete_sector(farm_index, &old_plotted_sector); + } + // Call delete first to avoid adding duplicates + plotted_pieces.delete_sector(farm_index, &plotted_sector); + plotted_pieces.add_sector(farm_index, &plotted_sector); } - plotted_pieces.add_sector(farm_index, &plotted_sector); - } + }); + + add_buffered_sectors_fut + .await + .map_err(|error| anyhow!("Failed to add buffered sectors for farm {farm_id}: {error}"))?; Ok(farm) } From cb69d6dcf6ae2196151c4666abd8b3c152dff1af Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Mon, 24 Jun 2024 12:35:40 +0300 Subject: [PATCH 2/5] Use `CachingProxyNodeClient` for networking too --- .../commands/cluster/controller.rs | 8 +++---- .../src/bin/subspace-farmer/commands/farm.rs | 8 +++---- .../commands/shared/network.rs | 8 +++---- .../node_client/caching_proxy_node_client.rs | 22 ++++++++++++++++++- .../handlers/segment_header.rs | 1 + 5 files changed, 34 insertions(+), 13 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs index ee3ce11847..01fa23a169 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller.rs @@ -140,6 +140,10 @@ pub(super) async fn controller( // TODO: Metrics + let node_client = CachingProxyNodeClient::new(node_client) + .await + .map_err(|error| anyhow!("Failed to create caching proxy node client: {error}"))?; + let (node, mut node_runner) = { if network_args.bootstrap_nodes.is_empty() { network_args @@ -160,10 +164,6 @@ pub(super) async fn controller( .map_err(|error| anyhow!("Failed to configure networking: {error}"))? }; - let node_client = CachingProxyNodeClient::new(node_client) - .await - .map_err(|error| anyhow!("Failed to create caching proxy node client: {error}"))?; - let kzg = Kzg::new(embedded_kzg_settings()); let validator = Some(SegmentCommitmentPieceValidator::new( node.clone(), diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index ff6a1b9b1e..407a8fe772 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -349,6 +349,10 @@ where let farmer_metrics = FarmerMetrics::new(&mut prometheus_metrics_registry); let should_start_prometheus_server = !prometheus_listen_on.is_empty(); + let node_client = CachingProxyNodeClient::new(node_client) + .await + .map_err(|error| anyhow!("Failed to create caching proxy node client: {error}"))?; + let (node, mut node_runner) = { if network_args.bootstrap_nodes.is_empty() { network_args @@ -369,10 +373,6 @@ where .map_err(|error| anyhow!("Failed to configure networking: {error}"))? }; - let node_client = CachingProxyNodeClient::new(node_client) - .await - .map_err(|error| anyhow!("Failed to create caching proxy node client: {error}"))?; - let _prometheus_worker = if should_start_prometheus_server { let prometheus_task = start_prometheus_metrics_server( prometheus_listen_on, diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs index 9a8d5705a6..29b5129c4e 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs @@ -9,8 +9,7 @@ use std::path::Path; use std::sync::{Arc, Weak}; use subspace_farmer::farm::plotted_pieces::PlottedPieces; use subspace_farmer::farmer_cache::FarmerCache; -use subspace_farmer::node_client::rpc_node_client::RpcNodeClient; -use subspace_farmer::node_client::{NodeClient, NodeClientExt}; +use subspace_farmer::node_client::NodeClientExt; use subspace_farmer::KNOWN_PEERS_CACHE_SIZE; use subspace_networking::libp2p::identity::Keypair; use subspace_networking::libp2p::kad::RecordKey; @@ -71,7 +70,7 @@ pub(in super::super) struct NetworkArgs { } #[allow(clippy::too_many_arguments)] -pub(in super::super) fn configure_network( +pub(in super::super) fn configure_network( protocol_prefix: String, base_path: &Path, keypair: Keypair, @@ -87,13 +86,14 @@ pub(in super::super) fn configure_network( external_addresses, }: NetworkArgs, weak_plotted_pieces: Weak>>, - node_client: RpcNodeClient, + node_client: NC, farmer_cache: FarmerCache, prometheus_metrics_registry: Option<&mut Registry>, ) -> Result<(Node, NodeRunner), anyhow::Error> where FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static, usize: From, + NC: NodeClientExt + Clone, { let known_peers_registry = KnownPeersManager::new(KnownPeersManagerConfig { path: Some(base_path.join("known_addresses.bin").into_boxed_path()), diff --git a/crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs b/crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs index 448fd08d8b..fe0a25965b 100644 --- a/crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs +++ b/crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs @@ -1,7 +1,7 @@ //! Node client wrapper around another node client that caches some data for better performance and //! proxies other requests through -use crate::node_client::{Error as RpcError, Error, NodeClient}; +use crate::node_client::{Error as RpcError, Error, NodeClient, NodeClientExt}; use crate::utils::AsyncJoinOnDrop; use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; use async_trait::async_trait; @@ -301,3 +301,23 @@ where Ok(()) } } + +#[async_trait] +impl NodeClientExt for CachingProxyNodeClient +where + NC: NodeClientExt, +{ + async fn last_segment_headers(&self, limit: u64) -> Result>, Error> { + Ok(self + .segment_headers + .read() + .await + .iter() + .copied() + .rev() + .take(limit as usize) + .rev() + .map(Some) + .collect()) + } +} diff --git a/crates/subspace-networking/src/protocols/request_response/handlers/segment_header.rs b/crates/subspace-networking/src/protocols/request_response/handlers/segment_header.rs index 1e925ec65c..adf3425916 100644 --- a/crates/subspace-networking/src/protocols/request_response/handlers/segment_header.rs +++ b/crates/subspace-networking/src/protocols/request_response/handlers/segment_header.rs @@ -18,6 +18,7 @@ pub enum SegmentHeaderRequest { /// Defines how many segment headers to return. LastSegmentHeaders { /// Number of segment headers to return. + // TODO: Replace u64 with a smaller type when able to make breaking changes segment_header_number: u64, }, } From 24ba5a3082a49759e368004fc3073c0a03862516 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Mon, 24 Jun 2024 12:57:53 +0300 Subject: [PATCH 3/5] Create `SegmentHeaders` wrapper for `CachingProxyNodeClient` that will be extended later --- .../node_client/caching_proxy_node_client.rs | 84 +++++++++++-------- 1 file changed, 47 insertions(+), 37 deletions(-) diff --git a/crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs b/crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs index fe0a25965b..6f2b7c0596 100644 --- a/crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs +++ b/crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs @@ -20,42 +20,47 @@ use tracing::{info, trace, warn}; const FARMER_APP_INFO_DEDUPLICATION_WINDOW: Duration = Duration::from_secs(1); -async fn sync_segment_headers( - segment_headers: &mut Vec, - client: &NC, -) -> Result<(), Error> -where - NC: NodeClient, -{ - let mut segment_index_offset = SegmentIndex::from(segment_headers.len() as u64); - let segment_index_step = SegmentIndex::from(MAX_SEGMENT_HEADERS_PER_REQUEST as u64); - - 'outer: loop { - let from = segment_index_offset; - let to = segment_index_offset + segment_index_step; - trace!(%from, %to, "Requesting segment headers"); +#[derive(Debug, Default)] +struct SegmentHeaders { + segment_headers: Vec, +} - for maybe_segment_header in client - .segment_headers((from..to).collect::>()) - .await - .map_err(|error| { - format!("Failed to download segment headers {from}..{to} from node: {error}") - })? - { - let Some(segment_header) = maybe_segment_header else { - // Reached non-existent segment header - break 'outer; - }; - - if segment_headers.len() == u64::from(segment_header.segment_index()) as usize { - segment_headers.push(segment_header); +impl SegmentHeaders { + async fn sync_segment_headers(&mut self, client: &NC) -> Result<(), Error> + where + NC: NodeClient, + { + let mut segment_index_offset = SegmentIndex::from(self.segment_headers.len() as u64); + let segment_index_step = SegmentIndex::from(MAX_SEGMENT_HEADERS_PER_REQUEST as u64); + + 'outer: loop { + let from = segment_index_offset; + let to = segment_index_offset + segment_index_step; + trace!(%from, %to, "Requesting segment headers"); + + for maybe_segment_header in client + .segment_headers((from..to).collect::>()) + .await + .map_err(|error| { + format!("Failed to download segment headers {from}..{to} from node: {error}") + })? + { + let Some(segment_header) = maybe_segment_header else { + // Reached non-existent segment header + break 'outer; + }; + + if self.segment_headers.len() == u64::from(segment_header.segment_index()) as usize + { + self.segment_headers.push(segment_header); + } } + + segment_index_offset += segment_index_step; } - segment_index_offset += segment_index_step; + Ok(()) } - - Ok(()) } /// Node client wrapper around another node client that caches some data for better performance and @@ -71,7 +76,7 @@ pub struct CachingProxyNodeClient { slot_info_receiver: watch::Receiver>, archived_segment_headers_receiver: watch::Receiver>, reward_signing_receiver: watch::Receiver>, - segment_headers: Arc>>, + segment_headers: Arc>, last_farmer_app_info: Arc>, _background_task: Arc>, } @@ -82,12 +87,12 @@ where { /// Create a new instance pub async fn new(client: NC) -> Result { - let mut segment_headers = Vec::::new(); + let mut segment_headers = SegmentHeaders::default(); let mut archived_segments_notifications = client.subscribe_archived_segment_headers().await?; info!("Downloading all segment headers from node..."); - sync_segment_headers(&mut segment_headers, &client).await?; + segment_headers.sync_segment_headers(&client).await?; info!("Downloaded all segment headers from node successfully"); let segment_headers = Arc::new(AsyncRwLock::new(segment_headers)); @@ -122,10 +127,12 @@ where ); let mut segment_headers = segment_headers.write().await; - if segment_headers.len() + if segment_headers.segment_headers.len() == u64::from(archived_segment_header.segment_index()) as usize { - segment_headers.push(archived_segment_header); + segment_headers + .segment_headers + .push(archived_segment_header); } while let Err(error) = client @@ -265,6 +272,7 @@ where .iter() .map(|segment_index| { segment_headers + .segment_headers .get(u64::from(*segment_index) as usize) .copied() }) @@ -276,12 +284,13 @@ where } else { // Re-sync segment headers let mut segment_headers = self.segment_headers.write().await; - sync_segment_headers(&mut segment_headers, &self.inner).await?; + segment_headers.sync_segment_headers(&self.inner).await?; Ok(segment_indices .iter() .map(|segment_index| { segment_headers + .segment_headers .get(u64::from(*segment_index) as usize) .copied() }) @@ -312,6 +321,7 @@ where .segment_headers .read() .await + .segment_headers .iter() .copied() .rev() From 4914476b27fadc16efc28073479093874ec6e5ba Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Mon, 24 Jun 2024 13:09:05 +0300 Subject: [PATCH 4/5] Encapsulate logic of interacting with segment headers into `SegmentHeaders` data structure --- .../node_client/caching_proxy_node_client.rs | 83 +++++++++---------- 1 file changed, 41 insertions(+), 42 deletions(-) diff --git a/crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs b/crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs index 6f2b7c0596..8a5d2ce140 100644 --- a/crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs +++ b/crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs @@ -26,7 +26,36 @@ struct SegmentHeaders { } impl SegmentHeaders { - async fn sync_segment_headers(&mut self, client: &NC) -> Result<(), Error> + fn push(&mut self, archived_segment_header: SegmentHeader) { + if self.segment_headers.len() == u64::from(archived_segment_header.segment_index()) as usize + { + self.segment_headers.push(archived_segment_header); + } + } + + fn get_segment_headers(&self, segment_indices: &[SegmentIndex]) -> Vec> { + segment_indices + .iter() + .map(|segment_index| { + self.segment_headers + .get(u64::from(*segment_index) as usize) + .copied() + }) + .collect::>() + } + + fn last_segment_headers(&self, limit: u64) -> Vec> { + self.segment_headers + .iter() + .copied() + .rev() + .take(limit as usize) + .rev() + .map(Some) + .collect() + } + + async fn sync(&mut self, client: &NC) -> Result<(), Error> where NC: NodeClient, { @@ -92,7 +121,7 @@ where client.subscribe_archived_segment_headers().await?; info!("Downloading all segment headers from node..."); - segment_headers.sync_segment_headers(&client).await?; + segment_headers.sync(&client).await?; info!("Downloaded all segment headers from node successfully"); let segment_headers = Arc::new(AsyncRwLock::new(segment_headers)); @@ -126,14 +155,7 @@ where "New archived archived segment header notification" ); - let mut segment_headers = segment_headers.write().await; - if segment_headers.segment_headers.len() - == u64::from(archived_segment_header.segment_index()) as usize - { - segment_headers - .segment_headers - .push(archived_segment_header); - } + segment_headers.write().await.push(archived_segment_header); while let Err(error) = client .acknowledge_archived_segment_header( @@ -265,36 +287,20 @@ where &self, segment_indices: Vec, ) -> Result>, RpcError> { - let retrieved_segment_headers = { - let segment_headers = self.segment_headers.read().await; - - segment_indices - .iter() - .map(|segment_index| { - segment_headers - .segment_headers - .get(u64::from(*segment_index) as usize) - .copied() - }) - .collect::>() - }; + let retrieved_segment_headers = self + .segment_headers + .read() + .await + .get_segment_headers(&segment_indices); if retrieved_segment_headers.iter().all(Option::is_some) { Ok(retrieved_segment_headers) } else { // Re-sync segment headers let mut segment_headers = self.segment_headers.write().await; - segment_headers.sync_segment_headers(&self.inner).await?; - - Ok(segment_indices - .iter() - .map(|segment_index| { - segment_headers - .segment_headers - .get(u64::from(*segment_index) as usize) - .copied() - }) - .collect::>()) + segment_headers.sync(&self.inner).await?; + + Ok(segment_headers.get_segment_headers(&segment_indices)) } } @@ -321,13 +327,6 @@ where .segment_headers .read() .await - .segment_headers - .iter() - .copied() - .rev() - .take(limit as usize) - .rev() - .map(Some) - .collect()) + .last_segment_headers(limit)) } } From 3f9bc0b4b8b04fc896b98dcff62bf4a36a046fe4 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Mon, 24 Jun 2024 13:22:07 +0300 Subject: [PATCH 5/5] Skip too frequent segment headers sync requests --- .../src/node_client/caching_proxy_node_client.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs b/crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs index 8a5d2ce140..4b24879612 100644 --- a/crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs +++ b/crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs @@ -18,11 +18,13 @@ use tokio::sync::watch; use tokio_stream::wrappers::WatchStream; use tracing::{info, trace, warn}; +const SEGMENT_HEADERS_SYNC_INTERVAL: Duration = Duration::from_secs(1); const FARMER_APP_INFO_DEDUPLICATION_WINDOW: Duration = Duration::from_secs(1); #[derive(Debug, Default)] struct SegmentHeaders { segment_headers: Vec, + last_synced: Option, } impl SegmentHeaders { @@ -59,6 +61,13 @@ impl SegmentHeaders { where NC: NodeClient, { + if let Some(last_synced) = &self.last_synced { + if last_synced.elapsed() < SEGMENT_HEADERS_SYNC_INTERVAL { + return Ok(()); + } + } + self.last_synced.replace(Instant::now()); + let mut segment_index_offset = SegmentIndex::from(self.segment_headers.len() as u64); let segment_index_step = SegmentIndex::from(MAX_SEGMENT_HEADERS_PER_REQUEST as u64);