From 4914476b27fadc16efc28073479093874ec6e5ba Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Mon, 24 Jun 2024 13:09:05 +0300 Subject: [PATCH] Encapsulate logic of interacting with segment headers into `SegmentHeaders` data structure --- .../node_client/caching_proxy_node_client.rs | 83 +++++++++---------- 1 file changed, 41 insertions(+), 42 deletions(-) diff --git a/crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs b/crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs index 6f2b7c0596..8a5d2ce140 100644 --- a/crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs +++ b/crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs @@ -26,7 +26,36 @@ struct SegmentHeaders { } impl SegmentHeaders { - async fn sync_segment_headers(&mut self, client: &NC) -> Result<(), Error> + fn push(&mut self, archived_segment_header: SegmentHeader) { + if self.segment_headers.len() == u64::from(archived_segment_header.segment_index()) as usize + { + self.segment_headers.push(archived_segment_header); + } + } + + fn get_segment_headers(&self, segment_indices: &[SegmentIndex]) -> Vec> { + segment_indices + .iter() + .map(|segment_index| { + self.segment_headers + .get(u64::from(*segment_index) as usize) + .copied() + }) + .collect::>() + } + + fn last_segment_headers(&self, limit: u64) -> Vec> { + self.segment_headers + .iter() + .copied() + .rev() + .take(limit as usize) + .rev() + .map(Some) + .collect() + } + + async fn sync(&mut self, client: &NC) -> Result<(), Error> where NC: NodeClient, { @@ -92,7 +121,7 @@ where client.subscribe_archived_segment_headers().await?; info!("Downloading all segment headers from node..."); - segment_headers.sync_segment_headers(&client).await?; + segment_headers.sync(&client).await?; info!("Downloaded all segment headers from node successfully"); let segment_headers = Arc::new(AsyncRwLock::new(segment_headers)); @@ -126,14 +155,7 @@ where "New archived archived segment header notification" ); - let mut segment_headers = segment_headers.write().await; - if segment_headers.segment_headers.len() - == u64::from(archived_segment_header.segment_index()) as usize - { - segment_headers - .segment_headers - .push(archived_segment_header); - } + segment_headers.write().await.push(archived_segment_header); while let Err(error) = client .acknowledge_archived_segment_header( @@ -265,36 +287,20 @@ where &self, segment_indices: Vec, ) -> Result>, RpcError> { - let retrieved_segment_headers = { - let segment_headers = self.segment_headers.read().await; - - segment_indices - .iter() - .map(|segment_index| { - segment_headers - .segment_headers - .get(u64::from(*segment_index) as usize) - .copied() - }) - .collect::>() - }; + let retrieved_segment_headers = self + .segment_headers + .read() + .await + .get_segment_headers(&segment_indices); if retrieved_segment_headers.iter().all(Option::is_some) { Ok(retrieved_segment_headers) } else { // Re-sync segment headers let mut segment_headers = self.segment_headers.write().await; - segment_headers.sync_segment_headers(&self.inner).await?; - - Ok(segment_indices - .iter() - .map(|segment_index| { - segment_headers - .segment_headers - .get(u64::from(*segment_index) as usize) - .copied() - }) - .collect::>()) + segment_headers.sync(&self.inner).await?; + + Ok(segment_headers.get_segment_headers(&segment_indices)) } } @@ -321,13 +327,6 @@ where .segment_headers .read() .await - .segment_headers - .iter() - .copied() - .rev() - .take(limit as usize) - .rev() - .map(Some) - .collect()) + .last_segment_headers(limit)) } }