Skip to content

Commit

Permalink
Merge pull request #2873 from subspace/farmer-optimizations
Browse files Browse the repository at this point in the history
Farmer optimizations
  • Loading branch information
nazar-pc authored Jun 25, 2024
2 parents ea685dd + 3f9bc0b commit c7e1711
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ pub(super) async fn controller(

// TODO: Metrics

let node_client = CachingProxyNodeClient::new(node_client)
.await
.map_err(|error| anyhow!("Failed to create caching proxy node client: {error}"))?;

let (node, mut node_runner) = {
if network_args.bootstrap_nodes.is_empty() {
network_args
Expand All @@ -160,10 +164,6 @@ pub(super) async fn controller(
.map_err(|error| anyhow!("Failed to configure networking: {error}"))?
};

let node_client = CachingProxyNodeClient::new(node_client)
.await
.map_err(|error| anyhow!("Failed to create caching proxy node client: {error}"))?;

let kzg = Kzg::new(embedded_kzg_settings());
let validator = Some(SegmentCommitmentPieceValidator::new(
node.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use parking_lot::Mutex;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::future::{pending, ready, Future};
use std::mem;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::time::Instant;
Expand All @@ -24,6 +25,7 @@ use subspace_farmer::cluster::farmer::{ClusterFarm, ClusterFarmerIdentifyFarmBro
use subspace_farmer::cluster::nats_client::NatsClient;
use subspace_farmer::farm::plotted_pieces::PlottedPieces;
use subspace_farmer::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate};
use tokio::task;
use tokio::time::MissedTickBehavior;
use tracing::{error, info, trace, warn};

Expand Down Expand Up @@ -126,7 +128,7 @@ impl KnownFarms {
pub(super) async fn maintain_farms(
instance: &str,
nats_client: &NatsClient,
plotted_pieces: &AsyncRwLock<PlottedPieces<FarmIndex>>,
plotted_pieces: &Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
) -> anyhow::Result<()> {
let mut known_farms = KnownFarms::default();

Expand Down Expand Up @@ -173,7 +175,18 @@ pub(super) async fn maintain_farms(
(farm_index, result) = farms.select_next_some() => {
known_farms.remove(farm_index);
farms_to_add_remove.push_back(Box::pin(async move {
plotted_pieces.write().await.delete_farm(farm_index);
let plotted_pieces = Arc::clone(plotted_pieces);

let delete_farm_fut = task::spawn_blocking(move || {
plotted_pieces.write_blocking().delete_farm(farm_index);
});
if let Err(error) = delete_farm_fut.await {
error!(
%farm_index,
%error,
"Failed to delete farm that exited"
);
}

None
}));
Expand Down Expand Up @@ -202,20 +215,39 @@ pub(super) async fn maintain_farms(
}
_ = farm_pruning_interval.tick().fuse() => {
for (farm_index, removed_farm) in known_farms.remove_expired() {
let farm_id = removed_farm.farm_id;

if removed_farm.expired_sender.send(()).is_ok() {
warn!(
%farm_index,
farm_id = %removed_farm.farm_id,
%farm_id,
"Farm expired and removed"
);
} else {
warn!(
%farm_index,
farm_id = %removed_farm.farm_id,
%farm_id,
"Farm exited before expiration notification"
);
}
plotted_pieces.write().await.delete_farm(farm_index);

farms_to_add_remove.push_back(Box::pin(async move {
let plotted_pieces = Arc::clone(plotted_pieces);

let delete_farm_fut = task::spawn_blocking(move || {
plotted_pieces.write_blocking().delete_farm(farm_index);
});
if let Err(error) = delete_farm_fut.await {
error!(
%farm_index,
%farm_id,
%error,
"Failed to delete farm that expired"
);
}

None
}));
}
}
result = farm_add_remove_in_progress => {
Expand All @@ -242,7 +274,7 @@ fn process_farm_identify_message<'a>(
nats_client: &'a NatsClient,
known_farms: &mut KnownFarms,
farms_to_add_remove: &mut VecDeque<AddRemoveFuture<'a>>,
plotted_pieces: &'a AsyncRwLock<PlottedPieces<FarmIndex>>,
plotted_pieces: &'a Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
) {
let ClusterFarmerIdentifyFarmBroadcast {
farm_id,
Expand Down Expand Up @@ -287,7 +319,19 @@ fn process_farm_identify_message<'a>(

if remove {
farms_to_add_remove.push_back(Box::pin(async move {
plotted_pieces.write().await.delete_farm(farm_index);
let plotted_pieces = Arc::clone(plotted_pieces);

let delete_farm_fut = task::spawn_blocking(move || {
plotted_pieces.write_blocking().delete_farm(farm_index);
});
if let Err(error) = delete_farm_fut.await {
error!(
%farm_index,
%farm_id,
%error,
"Failed to delete farm that was replaced"
);
}

None
}));
Expand All @@ -299,7 +343,7 @@ fn process_farm_identify_message<'a>(
farm_index,
farm_id,
total_sectors_count,
plotted_pieces,
Arc::clone(plotted_pieces),
nats_client,
)
.await
Expand Down Expand Up @@ -337,16 +381,13 @@ async fn initialize_farm(
farm_index: FarmIndex,
farm_id: FarmId,
total_sectors_count: SectorIndex,
plotted_pieces: &AsyncRwLock<PlottedPieces<FarmIndex>>,
plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
nats_client: &NatsClient,
) -> anyhow::Result<ClusterFarm> {
let farm = ClusterFarm::new(farm_id, total_sectors_count, nats_client.clone())
.await
.map_err(|error| anyhow!("Failed instantiate cluster farm {farm_id}: {error}"))?;

let mut plotted_pieces = plotted_pieces.write().await;
plotted_pieces.add_farm(farm_index, farm.piece_reader());

// Buffer sectors that are plotted while already plotted sectors are being iterated over
let plotted_sectors_buffer = Arc::new(Mutex::new(Vec::new()));
let sector_update_handler = farm.on_sector_update(Arc::new({
Expand All @@ -372,22 +413,42 @@ async fn initialize_farm(
.get()
.await
.map_err(|error| anyhow!("Failed to get plotted sectors for farm {farm_id}: {error}"))?;
while let Some(plotted_sector_result) = plotted_sectors.next().await {
let plotted_sector = plotted_sector_result
.map_err(|error| anyhow!("Failed to get plotted sector for farm {farm_id}: {error}"))?;

plotted_pieces.add_sector(farm_index, &plotted_sector);
{
let mut plotted_pieces = plotted_pieces.write().await;
plotted_pieces.add_farm(farm_index, farm.piece_reader());

while let Some(plotted_sector_result) = plotted_sectors.next().await {
let plotted_sector = plotted_sector_result.map_err(|error| {
anyhow!("Failed to get plotted sector for farm {farm_id}: {error}")
})?;

plotted_pieces.add_sector(farm_index, &plotted_sector);

task::yield_now().await;
}
}

// Sectors that were plotted while the iteration above was in progress were buffered; add them
// to plotted pieces as well
drop(sector_update_handler);
for (plotted_sector, old_plotted_sector) in plotted_sectors_buffer.lock().drain(..) {
if let Some(old_plotted_sector) = old_plotted_sector {
plotted_pieces.delete_sector(farm_index, &old_plotted_sector);
let plotted_sectors_buffer = mem::take(&mut *plotted_sectors_buffer.lock());
let add_buffered_sectors_fut = task::spawn_blocking(move || {
let mut plotted_pieces = plotted_pieces.write_blocking();

for (plotted_sector, old_plotted_sector) in plotted_sectors_buffer {
if let Some(old_plotted_sector) = old_plotted_sector {
plotted_pieces.delete_sector(farm_index, &old_plotted_sector);
}
// Call delete first to avoid adding duplicates
plotted_pieces.delete_sector(farm_index, &plotted_sector);
plotted_pieces.add_sector(farm_index, &plotted_sector);
}
plotted_pieces.add_sector(farm_index, &plotted_sector);
}
});

add_buffered_sectors_fut
.await
.map_err(|error| anyhow!("Failed to add buffered sectors for farm {farm_id}: {error}"))?;

Ok(farm)
}
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,10 @@ where
let farmer_metrics = FarmerMetrics::new(&mut prometheus_metrics_registry);
let should_start_prometheus_server = !prometheus_listen_on.is_empty();

let node_client = CachingProxyNodeClient::new(node_client)
.await
.map_err(|error| anyhow!("Failed to create caching proxy node client: {error}"))?;

let (node, mut node_runner) = {
if network_args.bootstrap_nodes.is_empty() {
network_args
Expand All @@ -369,10 +373,6 @@ where
.map_err(|error| anyhow!("Failed to configure networking: {error}"))?
};

let node_client = CachingProxyNodeClient::new(node_client)
.await
.map_err(|error| anyhow!("Failed to create caching proxy node client: {error}"))?;

let _prometheus_worker = if should_start_prometheus_server {
let prometheus_task = start_prometheus_metrics_server(
prometheus_listen_on,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ use std::path::Path;
use std::sync::{Arc, Weak};
use subspace_farmer::farm::plotted_pieces::PlottedPieces;
use subspace_farmer::farmer_cache::FarmerCache;
use subspace_farmer::node_client::rpc_node_client::RpcNodeClient;
use subspace_farmer::node_client::{NodeClient, NodeClientExt};
use subspace_farmer::node_client::NodeClientExt;
use subspace_farmer::KNOWN_PEERS_CACHE_SIZE;
use subspace_networking::libp2p::identity::Keypair;
use subspace_networking::libp2p::kad::RecordKey;
Expand Down Expand Up @@ -71,7 +70,7 @@ pub(in super::super) struct NetworkArgs {
}

#[allow(clippy::too_many_arguments)]
pub(in super::super) fn configure_network<FarmIndex>(
pub(in super::super) fn configure_network<FarmIndex, NC>(
protocol_prefix: String,
base_path: &Path,
keypair: Keypair,
Expand All @@ -87,13 +86,14 @@ pub(in super::super) fn configure_network<FarmIndex>(
external_addresses,
}: NetworkArgs,
weak_plotted_pieces: Weak<AsyncRwLock<PlottedPieces<FarmIndex>>>,
node_client: RpcNodeClient,
node_client: NC,
farmer_cache: FarmerCache,
prometheus_metrics_registry: Option<&mut Registry>,
) -> Result<(Node, NodeRunner<FarmerCache>), anyhow::Error>
where
FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
usize: From<FarmIndex>,
NC: NodeClientExt + Clone,
{
let known_peers_registry = KnownPeersManager::new(KnownPeersManagerConfig {
path: Some(base_path.join("known_addresses.bin").into_boxed_path()),
Expand Down
Loading

0 comments on commit c7e1711

Please sign in to comment.