Farmer optimizations #2873

Merged 5 commits on Jun 25, 2024
@@ -140,6 +140,10 @@ pub(super) async fn controller(
 
     // TODO: Metrics
 
+    let node_client = CachingProxyNodeClient::new(node_client)
+        .await
+        .map_err(|error| anyhow!("Failed to create caching proxy node client: {error}"))?;
+
     let (node, mut node_runner) = {
         if network_args.bootstrap_nodes.is_empty() {
             network_args
@@ -160,10 +164,6 @@ pub(super) async fn controller(
             .map_err(|error| anyhow!("Failed to configure networking: {error}"))?
     };
 
-    let node_client = CachingProxyNodeClient::new(node_client)
-        .await
-        .map_err(|error| anyhow!("Failed to create caching proxy node client: {error}"))?;
-
     let kzg = Kzg::new(embedded_kzg_settings());
     let validator = Some(SegmentCommitmentPieceValidator::new(
         node.clone(),
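This first change reorders client construction in the cluster controller: `CachingProxyNodeClient` is now created before the networking stack, so networking is wired to the cached client rather than the raw RPC client (the generic `configure_network` signature in the last file of this diff is what makes that possible). Below is a minimal sketch of the caching-proxy idea in isolation; `RpcClient`, `CachingProxyClient`, and `segment_header` are made-up stand-ins, not the real `CachingProxyNodeClient` API.

```rust
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

/// Stand-in for the raw RPC node client; imagine a network round-trip inside.
#[derive(Clone)]
struct RpcClient;

impl RpcClient {
    async fn segment_header(&self, segment_index: u64) -> anyhow::Result<String> {
        Ok(format!("header-{segment_index}"))
    }
}

/// Caching proxy: clones share one cache, so every consumer that receives a
/// clone of this client benefits from requests made by the others.
#[derive(Clone)]
struct CachingProxyClient {
    inner: RpcClient,
    cache: Arc<RwLock<HashMap<u64, String>>>,
}

impl CachingProxyClient {
    fn new(inner: RpcClient) -> Self {
        Self {
            inner,
            cache: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    async fn segment_header(&self, segment_index: u64) -> anyhow::Result<String> {
        if let Some(header) = self.cache.read().await.get(&segment_index) {
            return Ok(header.clone()); // cache hit, no node round-trip
        }
        let header = self.inner.segment_header(segment_index).await?;
        self.cache.write().await.insert(segment_index, header.clone());
        Ok(header)
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Wrap the RPC client *before* wiring anything else up, mirroring the diff.
    let node_client = CachingProxyClient::new(RpcClient);
    let first = node_client.segment_header(0).await?; // fetched from the node
    let second = node_client.segment_header(0).await?; // served from the cache
    println!("{first} / {second}");
    Ok(())
}
```

Since clones share the same `Arc`'d cache, wrapping once up front means every consumer created afterwards, networking included, is served from the cache.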
@@ -15,6 +15,7 @@ use parking_lot::Mutex;
 use std::collections::hash_map::Entry;
 use std::collections::{HashMap, VecDeque};
 use std::future::{pending, ready, Future};
+use std::mem;
 use std::pin::{pin, Pin};
 use std::sync::Arc;
 use std::time::Instant;
@@ -24,6 +25,7 @@ use subspace_farmer::cluster::farmer::{ClusterFarm, ClusterFarmerIdentifyFarmBroadcast};
 use subspace_farmer::cluster::nats_client::NatsClient;
 use subspace_farmer::farm::plotted_pieces::PlottedPieces;
 use subspace_farmer::farm::{Farm, FarmId, SectorPlottingDetails, SectorUpdate};
+use tokio::task;
 use tokio::time::MissedTickBehavior;
 use tracing::{error, info, trace, warn};
@@ -126,7 +128,7 @@ impl KnownFarms {
 pub(super) async fn maintain_farms(
     instance: &str,
     nats_client: &NatsClient,
-    plotted_pieces: &AsyncRwLock<PlottedPieces<FarmIndex>>,
+    plotted_pieces: &Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
 ) -> anyhow::Result<()> {
     let mut known_farms = KnownFarms::default();
 
@@ -173,7 +175,18 @@ pub(super) async fn maintain_farms(
             (farm_index, result) = farms.select_next_some() => {
                 known_farms.remove(farm_index);
                 farms_to_add_remove.push_back(Box::pin(async move {
-                    plotted_pieces.write().await.delete_farm(farm_index);
+                    let plotted_pieces = Arc::clone(plotted_pieces);
+
+                    let delete_farm_fut = task::spawn_blocking(move || {
+                        plotted_pieces.write_blocking().delete_farm(farm_index);
+                    });
+                    if let Err(error) = delete_farm_fut.await {
+                        error!(
+                            %farm_index,
+                            %error,
+                            "Failed to delete farm that exited"
+                        );
+                    }
 
                     None
                 }));
@@ -202,20 +215,39 @@
             }
             _ = farm_pruning_interval.tick().fuse() => {
                 for (farm_index, removed_farm) in known_farms.remove_expired() {
+                    let farm_id = removed_farm.farm_id;
+
                     if removed_farm.expired_sender.send(()).is_ok() {
                         warn!(
                             %farm_index,
-                            farm_id = %removed_farm.farm_id,
+                            %farm_id,
                             "Farm expired and removed"
                         );
                     } else {
                         warn!(
                             %farm_index,
-                            farm_id = %removed_farm.farm_id,
+                            %farm_id,
                             "Farm exited before expiration notification"
                         );
                     }
-                    plotted_pieces.write().await.delete_farm(farm_index);
+
+                    farms_to_add_remove.push_back(Box::pin(async move {
+                        let plotted_pieces = Arc::clone(plotted_pieces);
+
+                        let delete_farm_fut = task::spawn_blocking(move || {
+                            plotted_pieces.write_blocking().delete_farm(farm_index);
+                        });
+                        if let Err(error) = delete_farm_fut.await {
+                            error!(
+                                %farm_index,
+                                %farm_id,
+                                %error,
+                                "Failed to delete farm that expired"
+                            );
+                        }
+
+                        None
+                    }));
                 }
             }
             result = farm_add_remove_in_progress => {
@@ -242,7 +274,7 @@ fn process_farm_identify_message<'a>(
     nats_client: &'a NatsClient,
     known_farms: &mut KnownFarms,
     farms_to_add_remove: &mut VecDeque<AddRemoveFuture<'a>>,
-    plotted_pieces: &'a AsyncRwLock<PlottedPieces<FarmIndex>>,
+    plotted_pieces: &'a Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
 ) {
     let ClusterFarmerIdentifyFarmBroadcast {
         farm_id,
@@ -287,7 +319,19 @@
 
     if remove {
         farms_to_add_remove.push_back(Box::pin(async move {
-            plotted_pieces.write().await.delete_farm(farm_index);
+            let plotted_pieces = Arc::clone(plotted_pieces);
+
+            let delete_farm_fut = task::spawn_blocking(move || {
+                plotted_pieces.write_blocking().delete_farm(farm_index);
+            });
+            if let Err(error) = delete_farm_fut.await {
+                error!(
+                    %farm_index,
+                    %farm_id,
+                    %error,
+                    "Failed to delete farm that was replaced"
+                );
+            }
 
             None
         }));
@@ -299,7 +343,7 @@
                     farm_index,
                     farm_id,
                     total_sectors_count,
-                    plotted_pieces,
+                    Arc::clone(plotted_pieces),
                     nats_client,
                 )
                 .await
@@ -337,16 +381,13 @@ async fn initialize_farm(
     farm_index: FarmIndex,
     farm_id: FarmId,
     total_sectors_count: SectorIndex,
-    plotted_pieces: &AsyncRwLock<PlottedPieces<FarmIndex>>,
+    plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
     nats_client: &NatsClient,
 ) -> anyhow::Result<ClusterFarm> {
     let farm = ClusterFarm::new(farm_id, total_sectors_count, nats_client.clone())
         .await
         .map_err(|error| anyhow!("Failed instantiate cluster farm {farm_id}: {error}"))?;
 
-    let mut plotted_pieces = plotted_pieces.write().await;
-    plotted_pieces.add_farm(farm_index, farm.piece_reader());
-
     // Buffer sectors that are plotted while already plotted sectors are being iterated over
     let plotted_sectors_buffer = Arc::new(Mutex::new(Vec::new()));
     let sector_update_handler = farm.on_sector_update(Arc::new({
@@ -372,22 +413,42 @@
         .get()
         .await
         .map_err(|error| anyhow!("Failed to get plotted sectors for farm {farm_id}: {error}"))?;
-    while let Some(plotted_sector_result) = plotted_sectors.next().await {
-        let plotted_sector = plotted_sector_result
-            .map_err(|error| anyhow!("Failed to get plotted sector for farm {farm_id}: {error}"))?;
-
-        plotted_pieces.add_sector(farm_index, &plotted_sector);
+    {
+        let mut plotted_pieces = plotted_pieces.write().await;
+        plotted_pieces.add_farm(farm_index, farm.piece_reader());
+
+        while let Some(plotted_sector_result) = plotted_sectors.next().await {
+            let plotted_sector = plotted_sector_result.map_err(|error| {
+                anyhow!("Failed to get plotted sector for farm {farm_id}: {error}")
+            })?;
+
+            plotted_pieces.add_sector(farm_index, &plotted_sector);
+
+            task::yield_now().await;
+        }
     }
 
     // Add sectors that were plotted while above iteration was happening to plotted sectors
     // too
     drop(sector_update_handler);
-    for (plotted_sector, old_plotted_sector) in plotted_sectors_buffer.lock().drain(..) {
-        if let Some(old_plotted_sector) = old_plotted_sector {
-            plotted_pieces.delete_sector(farm_index, &old_plotted_sector);
-        }
-        plotted_pieces.add_sector(farm_index, &plotted_sector);
-    }
+    let plotted_sectors_buffer = mem::take(&mut *plotted_sectors_buffer.lock());
+    let add_buffered_sectors_fut = task::spawn_blocking(move || {
+        let mut plotted_pieces = plotted_pieces.write_blocking();
+
+        for (plotted_sector, old_plotted_sector) in plotted_sectors_buffer {
+            if let Some(old_plotted_sector) = old_plotted_sector {
+                plotted_pieces.delete_sector(farm_index, &old_plotted_sector);
+            }
+            // Call delete first to avoid adding duplicates
+            plotted_pieces.delete_sector(farm_index, &plotted_sector);
+            plotted_pieces.add_sector(farm_index, &plotted_sector);
+        }
+    });
+
+    add_buffered_sectors_fut
+        .await
+        .map_err(|error| anyhow!("Failed to add buffered sectors for farm {farm_id}: {error}"))?;
 
     Ok(farm)
 }
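This file carries the core optimization of the PR. `PlottedPieces` can index a very large number of sectors, so taking the async `RwLock` and mutating it directly on an executor thread can stall every other task scheduled there. The new code clones the `Arc`, acquires the lock with the blocking `write_blocking()` call on Tokio's blocking pool via `task::spawn_blocking`, and only awaits the join handle from async context; the initial sector iteration, which still holds the write lock in async context, now calls `task::yield_now()` between sectors to stay cooperative. Below is a compilable sketch of both patterns under two assumptions: the lock is `async_lock::RwLock` (its API matches the `write_blocking()` calls in the diff) and `PlottedState` is a stand-in for `PlottedPieces`.

```rust
use std::sync::Arc;
use tokio::task;

/// Stand-in for `PlottedPieces`; assume mutations may take a long time.
#[derive(Default)]
struct PlottedState {
    sectors: Vec<u64>,
}

impl PlottedState {
    fn delete_farm(&mut self) {
        // Potentially expensive, CPU-bound cleanup over a large collection.
        self.sectors.clear();
    }
}

#[tokio::main]
async fn main() {
    let state = Arc::new(async_lock::RwLock::new(PlottedState::default()));

    // Pattern from the diff: clone the Arc, take the write lock with the
    // blocking API on Tokio's blocking pool, and only await the join handle
    // from async context. Executor threads stay free while the mutation runs;
    // the join handle errors only if the blocking task panicked.
    let state_for_delete = Arc::clone(&state);
    let delete_farm_fut = task::spawn_blocking(move || {
        state_for_delete.write_blocking().delete_farm();
    });
    if let Err(error) = delete_farm_fut.await {
        eprintln!("Failed to delete farm: {error}");
    }

    // A long async loop that must hold the lock can still cooperate by
    // yielding between iterations, as the sector-iteration change does.
    let mut plotted = state.write().await;
    for sector in 0..1_000 {
        plotted.sectors.push(sector);
        task::yield_now().await;
    }
}
```

The `mem::take(&mut *plotted_sectors_buffer.lock())` line in the diff follows the same principle: move the buffered data out under a short-lived mutex, then hand the whole batch to the blocking pool.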
@@ -349,6 +349,10 @@ where
     let farmer_metrics = FarmerMetrics::new(&mut prometheus_metrics_registry);
     let should_start_prometheus_server = !prometheus_listen_on.is_empty();
 
+    let node_client = CachingProxyNodeClient::new(node_client)
+        .await
+        .map_err(|error| anyhow!("Failed to create caching proxy node client: {error}"))?;
+
     let (node, mut node_runner) = {
         if network_args.bootstrap_nodes.is_empty() {
             network_args
@@ -369,10 +373,6 @@
             .map_err(|error| anyhow!("Failed to configure networking: {error}"))?
     };
 
-    let node_client = CachingProxyNodeClient::new(node_client)
-        .await
-        .map_err(|error| anyhow!("Failed to create caching proxy node client: {error}"))?;
-
     let _prometheus_worker = if should_start_prometheus_server {
         let prometheus_task = start_prometheus_metrics_server(
             prometheus_listen_on,
@@ -9,8 +9,7 @@ use std::path::Path;
 use std::sync::{Arc, Weak};
 use subspace_farmer::farm::plotted_pieces::PlottedPieces;
 use subspace_farmer::farmer_cache::FarmerCache;
-use subspace_farmer::node_client::rpc_node_client::RpcNodeClient;
-use subspace_farmer::node_client::{NodeClient, NodeClientExt};
+use subspace_farmer::node_client::NodeClientExt;
 use subspace_farmer::KNOWN_PEERS_CACHE_SIZE;
 use subspace_networking::libp2p::identity::Keypair;
 use subspace_networking::libp2p::kad::RecordKey;
@@ -71,7 +70,7 @@ pub(in super::super) struct NetworkArgs {
 }
 
 #[allow(clippy::too_many_arguments)]
-pub(in super::super) fn configure_network<FarmIndex>(
+pub(in super::super) fn configure_network<FarmIndex, NC>(
     protocol_prefix: String,
     base_path: &Path,
     keypair: Keypair,
@@ -87,13 +86,14 @@ pub(in super::super) fn configure_network<FarmIndex, NC>(
         external_addresses,
     }: NetworkArgs,
     weak_plotted_pieces: Weak<AsyncRwLock<PlottedPieces<FarmIndex>>>,
-    node_client: RpcNodeClient,
+    node_client: NC,
     farmer_cache: FarmerCache,
     prometheus_metrics_registry: Option<&mut Registry>,
 ) -> Result<(Node, NodeRunner<FarmerCache>), anyhow::Error>
 where
     FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
     usize: From<FarmIndex>,
+    NC: NodeClientExt + Clone,
 {
     let known_peers_registry = KnownPeersManager::new(KnownPeersManagerConfig {
         path: Some(base_path.join("known_addresses.bin").into_boxed_path()),
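This final file is what permits the reordering in both binaries: `configure_network` becomes generic over the node client rather than requiring the concrete `RpcNodeClient`, so the caching proxy built earlier satisfies the same bound. A minimal sketch of the pattern with hypothetical traits and types (the real `NodeClientExt` is an async trait with more supertraits):

```rust
// Hypothetical, simplified stand-ins; the real trait lives in
// subspace_farmer::node_client and has async methods.
trait NodeClientExt: Clone + Send + Sync + 'static {
    fn name(&self) -> &'static str;
}

#[derive(Clone)]
struct RpcNodeClient;

impl NodeClientExt for RpcNodeClient {
    fn name(&self) -> &'static str {
        "rpc"
    }
}

// A wrapper over any client still satisfies the same bound...
#[derive(Clone)]
struct CachingProxyNodeClient<NC>(NC);

impl<NC: NodeClientExt> NodeClientExt for CachingProxyNodeClient<NC> {
    fn name(&self) -> &'static str {
        "caching-proxy"
    }
}

// ...so networking no longer cares which concrete client it receives.
fn configure_network<NC>(node_client: NC)
where
    NC: NodeClientExt,
{
    println!("networking wired to the {} client", node_client.name());
}

fn main() {
    configure_network(RpcNodeClient);
    configure_network(CachingProxyNodeClient(RpcNodeClient));
}
```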