Skip to content

Commit

Permalink
Encapsulate logic of interacting with segment headers into `SegmentHe…
Browse files Browse the repository at this point in the history
…aders` data structure
  • Loading branch information
nazar-pc committed Jun 24, 2024
1 parent 24ba5a3 commit 4914476
Showing 1 changed file with 41 additions and 42 deletions.
83 changes: 41 additions & 42 deletions crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,36 @@ struct SegmentHeaders {
}

impl SegmentHeaders {
async fn sync_segment_headers<NC>(&mut self, client: &NC) -> Result<(), Error>
fn push(&mut self, archived_segment_header: SegmentHeader) {
if self.segment_headers.len() == u64::from(archived_segment_header.segment_index()) as usize
{
self.segment_headers.push(archived_segment_header);
}
}

fn get_segment_headers(&self, segment_indices: &[SegmentIndex]) -> Vec<Option<SegmentHeader>> {
segment_indices
.iter()
.map(|segment_index| {
self.segment_headers
.get(u64::from(*segment_index) as usize)
.copied()
})
.collect::<Vec<_>>()
}

fn last_segment_headers(&self, limit: u64) -> Vec<Option<SegmentHeader>> {
self.segment_headers
.iter()
.copied()
.rev()
.take(limit as usize)
.rev()
.map(Some)
.collect()
}

async fn sync<NC>(&mut self, client: &NC) -> Result<(), Error>
where
NC: NodeClient,
{
Expand Down Expand Up @@ -92,7 +121,7 @@ where
client.subscribe_archived_segment_headers().await?;

info!("Downloading all segment headers from node...");
segment_headers.sync_segment_headers(&client).await?;
segment_headers.sync(&client).await?;
info!("Downloaded all segment headers from node successfully");

let segment_headers = Arc::new(AsyncRwLock::new(segment_headers));
Expand Down Expand Up @@ -126,14 +155,7 @@ where
"New archived archived segment header notification"
);

let mut segment_headers = segment_headers.write().await;
if segment_headers.segment_headers.len()
== u64::from(archived_segment_header.segment_index()) as usize
{
segment_headers
.segment_headers
.push(archived_segment_header);
}
segment_headers.write().await.push(archived_segment_header);

while let Err(error) = client
.acknowledge_archived_segment_header(
Expand Down Expand Up @@ -265,36 +287,20 @@ where
&self,
segment_indices: Vec<SegmentIndex>,
) -> Result<Vec<Option<SegmentHeader>>, RpcError> {
let retrieved_segment_headers = {
let segment_headers = self.segment_headers.read().await;

segment_indices
.iter()
.map(|segment_index| {
segment_headers
.segment_headers
.get(u64::from(*segment_index) as usize)
.copied()
})
.collect::<Vec<_>>()
};
let retrieved_segment_headers = self
.segment_headers
.read()
.await
.get_segment_headers(&segment_indices);

if retrieved_segment_headers.iter().all(Option::is_some) {
Ok(retrieved_segment_headers)
} else {
// Re-sync segment headers
let mut segment_headers = self.segment_headers.write().await;
segment_headers.sync_segment_headers(&self.inner).await?;

Ok(segment_indices
.iter()
.map(|segment_index| {
segment_headers
.segment_headers
.get(u64::from(*segment_index) as usize)
.copied()
})
.collect::<Vec<_>>())
segment_headers.sync(&self.inner).await?;

Ok(segment_headers.get_segment_headers(&segment_indices))
}
}

Expand All @@ -321,13 +327,6 @@ where
.segment_headers
.read()
.await
.segment_headers
.iter()
.copied()
.rev()
.take(limit as usize)
.rev()
.map(Some)
.collect())
.last_segment_headers(limit))
}
}

0 comments on commit 4914476

Please sign in to comment.