Fix farming cluster forwarders and identification intervals #2858

Merged · 1 commit · Jun 18, 2024
@@ -188,12 +188,8 @@ pub(super) async fn cache(
                     nats_client,
                     &caches,
                     &cache_group,
-                    // Only one of the tasks needs to send periodic broadcast
-                    if index == 0 {
-                        CACHE_IDENTIFICATION_BROADCAST_INTERVAL
-                    } else {
-                        Duration::MAX
-                    },
+                    CACHE_IDENTIFICATION_BROADCAST_INTERVAL,
+                    index == 0,
                 )
                 .await
             }),
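The change above replaces the `Duration::MAX` sentinel (previously used to silence the periodic broadcast on all but the first task) with an explicit `primary_instance` flag, so every instance receives the real interval and the service itself decides what the flag controls. A minimal sketch of the call-site pattern, with a hypothetical `service` function standing in for `cache_service`:

    use std::time::Duration;

    const IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(30); // assumed value

    // Hypothetical stand-in for `cache_service`: every instance serves requests,
    // but only the primary instance sends the periodic identify broadcast.
    async fn service(interval: Duration, primary_instance: bool) {
        let _ = (interval, primary_instance); // real work elided
    }

    async fn spawn_all(instances: usize) {
        for index in 0..instances {
            // The real interval is always passed; `index == 0` marks the primary.
            tokio::spawn(service(IDENTIFICATION_INTERVAL, index == 0));
        }
    }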
@@ -196,7 +196,7 @@ pub(super) async fn controller(
     )?;

     let mut controller_services = (0..service_instances.get())
-        .map(|_| {
+        .map(|index| {
             let nats_client = nats_client.clone();
             let node_client = node_client.clone();
             let piece_getter = piece_getter.clone();
@@ -211,6 +211,7 @@ pub(super) async fn controller(
                     &piece_getter,
                     &farmer_cache,
                     &instance,
+                    index == 0,
                 )
                 .await
             }),
@@ -367,12 +367,8 @@ where
                 tokio::spawn(farmer_service(
                     nats_client.clone(),
                     farms.as_slice(),
-                    // Only one of the tasks needs to send periodic broadcast
-                    if index == 0 {
-                        FARMER_IDENTIFICATION_BROADCAST_INTERVAL
-                    } else {
-                        Duration::MAX
-                    },
+                    FARMER_IDENTIFICATION_BROADCAST_INTERVAL,
+                    index == 0,
                 )),
                 true,
             )
@@ -1,8 +1,6 @@
 use crate::commands::shared::PlottingThreadPriority;
 use anyhow::anyhow;
 use clap::Parser;
-use futures::stream::FuturesUnordered;
-use futures::StreamExt;
 use prometheus_client::registry::Registry;
 use std::future::Future;
 use std::num::NonZeroUsize;
@@ -17,7 +15,6 @@ use subspace_farmer::cluster::plotter::plotter_service;
 use subspace_farmer::plotter::cpu::CpuPlotter;
 use subspace_farmer::utils::{
     create_plotting_thread_pool_manager, parse_cpu_cores_sets, thread_pool_core_indices,
-    AsyncJoinOnDrop,
 };
 use subspace_proof_of_space::Table;
 use tokio::sync::Semaphore;
@@ -78,12 +75,6 @@ pub(super) struct PlotterArgs {
     /// "min", "max" or "default".
     #[arg(long, default_value_t = PlottingThreadPriority::Min)]
     plotting_thread_priority: PlottingThreadPriority,
-    /// Number of service instances.
-    ///
-    /// Increasing number of services allows to process more concurrent requests, but increasing
-    /// beyond number of CPU cores doesn't make sense and will likely hurt performance instead.
-    #[arg(long, default_value = "32")]
-    service_instances: NonZeroUsize,
     /// Additional cluster components
     #[clap(raw = true)]
     pub(super) additional_components: Vec<String>,
@@ -105,7 +96,6 @@ where
         plotting_thread_pool_size,
         plotting_cpu_cores,
         plotting_thread_priority,
-        service_instances,
         additional_components: _,
     } = plotter_args;

@@ -180,24 +170,10 @@ where
     ));

     // TODO: Metrics
-    let mut plotter_services = (0..service_instances.get())
-        .map(|_| {
-            let nats_client = nats_client.clone();
-            let cpu_plotter = Arc::clone(&cpu_plotter);
-
-            AsyncJoinOnDrop::new(
-                tokio::spawn(async move { plotter_service(&nats_client, &cpu_plotter).await }),
-                true,
-            )
-        })
-        .collect::<FuturesUnordered<_>>();
-
     Ok(Box::pin(async move {
-        plotter_services
-            .next()
+        plotter_service(&nats_client, &cpu_plotter)
             .await
-            .expect("Not empty; qed")
-            .map_err(|error| anyhow!("Plotter service failed: {error}"))?
+            .map_err(|error| anyhow!("Plotter service failed: {error}"))
     }))
 }
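With the `service_instances` flag gone, the plotter now returns a single `plotter_service` future instead of racing N spawned copies through `FuturesUnordered`; the CPU plotter already parallelizes across its thread pools internally, so extra NATS-facing instances presumably added no throughput. A sketch of the simplified shape, with a hypothetical stand-in for the real service:

    use std::future::Future;
    use std::pin::Pin;
    use anyhow::anyhow;

    // Hypothetical stand-in for `plotter_service`.
    async fn plotter_service_sketch() -> Result<(), String> {
        Ok(())
    }

    // One boxed future that runs the service and maps its error, replacing
    // the old collect-and-race-N-spawned-copies construction.
    fn run() -> Pin<Box<dyn Future<Output = anyhow::Result<()>>>> {
        Box::pin(async move {
            plotter_service_sketch()
                .await
                .map_err(|error| anyhow!("Plotter service failed: {error}"))
        })
    }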
1 change: 1 addition & 0 deletions crates/subspace-farmer/src/bin/subspace-farmer/main.rs
@@ -1,5 +1,6 @@
 #![feature(
     const_option,
+    duration_constructors,
     extract_if,
     hash_extract_if,
     let_chains,
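The newly enabled nightly `duration_constructors` feature provides `Duration::from_mins` and `Duration::from_hours`, presumably so interval constants elsewhere in the farmer can be written in natural units, e.g.:

    #![feature(duration_constructors)] // nightly-only

    use std::time::Duration;

    // Hypothetical constant; the real intervals live in the cluster modules.
    const IDENTIFICATION_INTERVAL: Duration = Duration::from_mins(5);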
68 changes: 50 additions & 18 deletions crates/subspace-farmer/src/cluster/cache.rs
@@ -20,11 +20,13 @@ use futures::{select, stream, FutureExt, Stream, StreamExt};
 use parity_scale_codec::{Decode, Encode};
 use std::future::{pending, Future};
 use std::pin::{pin, Pin};
-use std::time::Duration;
+use std::time::{Duration, Instant};
 use subspace_core_primitives::{Piece, PieceIndex};
 use tokio::time::MissedTickBehavior;
 use tracing::{debug, error, info, trace, warn};

+const MIN_CACHE_IDENTIFICATION_INTERVAL: Duration = Duration::from_secs(1);
+
 /// Broadcast with identification details by caches
 #[derive(Debug, Clone, Encode, Decode)]
 pub struct ClusterCacheIdentifyBroadcast {
@@ -199,6 +201,7 @@ pub async fn cache_service<C>(
     caches: &[C],
     cache_group: &str,
     identification_broadcast_interval: Duration,
+    primary_instance: bool,
 ) -> anyhow::Result<()>
 where
     C: PieceCache,
@@ -208,7 +211,9 @@ where
         .map(|cache| {
             let cache_id = *cache.id();

-            info!(%cache_id, max_num_elements = %cache.max_num_elements(), "Created cache");
+            if primary_instance {
+                info!(%cache_id, max_num_elements = %cache.max_num_elements(), "Created cache");
+            }

             CacheDetails {
                 cache_id,
@@ -218,22 +223,39 @@
         })
         .collect::<Vec<_>>();

-    select! {
-        result = identify_responder(&nats_client, &caches_details, cache_group, identification_broadcast_interval).fuse() => {
-            result
-        },
-        result = write_piece_responder(&nats_client, &caches_details).fuse() => {
-            result
-        },
-        result = read_piece_index_responder(&nats_client, &caches_details).fuse() => {
-            result
-        },
-        result = read_piece_responder(&nats_client, &caches_details).fuse() => {
-            result
-        },
-        result = contents_responder(&nats_client, &caches_details).fuse() => {
-            result
-        },
+    if primary_instance {
+        select! {
+            result = identify_responder(&nats_client, &caches_details, cache_group, identification_broadcast_interval).fuse() => {
+                result
+            },
+            result = write_piece_responder(&nats_client, &caches_details).fuse() => {
+                result
+            },
+            result = read_piece_index_responder(&nats_client, &caches_details).fuse() => {
+                result
+            },
+            result = read_piece_responder(&nats_client, &caches_details).fuse() => {
+                result
+            },
+            result = contents_responder(&nats_client, &caches_details).fuse() => {
+                result
+            },
+        }
+    } else {
+        select! {
+            result = write_piece_responder(&nats_client, &caches_details).fuse() => {
+                result
+            },
+            result = read_piece_index_responder(&nats_client, &caches_details).fuse() => {
+                result
+            },
+            result = read_piece_responder(&nats_client, &caches_details).fuse() => {
+                result
+            },
+            result = contents_responder(&nats_client, &caches_details).fuse() => {
+                result
+            },
+        }
     }
 }

@@ -261,10 +283,13 @@ where
             anyhow!("Failed to subscribe to cache identify broadcast requests: {error}")
         })?
         .fuse();
+
     // Also send periodic updates in addition to the subscription response
     let mut interval = tokio::time::interval(identification_broadcast_interval);
     interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

+    let mut last_identification = Instant::now();
+
     loop {
         select! {
             maybe_message = subscription.next() => {
@@ -275,10 +300,17 @@

                 trace!(?message, "Cache received identify broadcast message");

+                if last_identification.elapsed() < MIN_CACHE_IDENTIFICATION_INTERVAL {
+                    // Skip too frequent identification requests
+                    continue;
+                }
+
+                last_identification = Instant::now();
                 send_identify_broadcast(nats_client, caches_details).await;
                 interval.reset();
             }
             _ = interval.tick().fuse() => {
+                last_identification = Instant::now();
                 trace!("Cache self-identification");

                 send_identify_broadcast(nats_client, caches_details).await;
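Two behaviors combine in the responder above: on-request identify broadcasts are skipped when one was already sent within `MIN_CACHE_IDENTIFICATION_INTERVAL`, and any sent broadcast resets the periodic timer so a scheduled tick doesn't fire immediately afterwards. A self-contained sketch of the same debounce pattern, with a plain channel standing in for the NATS subscription:

    use std::time::{Duration, Instant};
    use tokio::sync::mpsc;
    use tokio::time::MissedTickBehavior;

    const MIN_INTERVAL: Duration = Duration::from_secs(1);
    const PERIODIC_INTERVAL: Duration = Duration::from_secs(30); // assumed value

    async fn identify_loop(mut requests: mpsc::Receiver<()>) {
        let mut interval = tokio::time::interval(PERIODIC_INTERVAL);
        interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
        let mut last_identification = Instant::now();

        loop {
            tokio::select! {
                Some(()) = requests.recv() => {
                    if last_identification.elapsed() < MIN_INTERVAL {
                        // Debounce: ignore identify requests arriving too frequently
                        continue;
                    }
                    last_identification = Instant::now();
                    send_identify_broadcast().await;
                    // Push the next periodic tick out, since we just identified
                    interval.reset();
                }
                _ = interval.tick() => {
                    last_identification = Instant::now();
                    send_identify_broadcast().await;
                }
            }
        }
    }

    async fn send_identify_broadcast() {
        // Stand-in for publishing the identify message over NATS
    }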
74 changes: 46 additions & 28 deletions crates/subspace-farmer/src/cluster/controller.rs
@@ -492,39 +492,57 @@ pub async fn controller_service<NC, PG>(
     piece_getter: &PG,
     farmer_cache: &FarmerCache,
     instance: &str,
+    primary_instance: bool,
 ) -> anyhow::Result<()>
 where
     NC: NodeClient,
     PG: PieceGetter + Sync,
 {
-    select! {
-        result = slot_info_broadcaster(nats_client, node_client, instance).fuse() => {
-            result
-        },
-        result = reward_signing_broadcaster(nats_client, node_client, instance).fuse() => {
-            result
-        },
-        result = archived_segment_headers_broadcaster(nats_client, node_client, instance).fuse() => {
-            result
-        },
-        result = solution_response_forwarder(nats_client, node_client, instance).fuse() => {
-            result
-        },
-        result = reward_signature_forwarder(nats_client, node_client, instance).fuse() => {
-            result
-        },
-        result = farmer_app_info_responder(nats_client, node_client).fuse() => {
-            result
-        },
-        result = segment_headers_responder(nats_client, node_client).fuse() => {
-            result
-        },
-        result = find_piece_responder(nats_client, farmer_cache).fuse() => {
-            result
-        },
-        result = piece_responder(nats_client, piece_getter).fuse() => {
-            result
-        },
+    if primary_instance {
+        select! {
+            result = slot_info_broadcaster(nats_client, node_client, instance).fuse() => {
+                result
+            },
+            result = reward_signing_broadcaster(nats_client, node_client, instance).fuse() => {
+                result
+            },
+            result = archived_segment_headers_broadcaster(nats_client, node_client, instance).fuse() => {
+                result
+            },
+            result = solution_response_forwarder(nats_client, node_client, instance).fuse() => {
+                result
+            },
+            result = reward_signature_forwarder(nats_client, node_client, instance).fuse() => {
+                result
+            },
+            result = farmer_app_info_responder(nats_client, node_client).fuse() => {
+                result
+            },
+            result = segment_headers_responder(nats_client, node_client).fuse() => {
+                result
+            },
+            result = find_piece_responder(nats_client, farmer_cache).fuse() => {
+                result
+            },
+            result = piece_responder(nats_client, piece_getter).fuse() => {
+                result
+            },
+        }
+    } else {
+        select! {
+            result = farmer_app_info_responder(nats_client, node_client).fuse() => {
+                result
+            },
+            result = segment_headers_responder(nats_client, node_client).fuse() => {
+                result
+            },
+            result = find_piece_responder(nats_client, farmer_cache).fuse() => {
+                result
+            },
+            result = piece_responder(nats_client, piece_getter).fuse() => {
+                result
+            },
+        }
     }
 }
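This mirrors the split in the cache service: broadcasters and forwarders must run exactly once per cluster (duplicate instances would re-send slot info, solutions, and reward signatures), while responders can run in every instance, since NATS can balance requests across subscribers. Distilled to its shape, with hypothetical task functions:

    // Hypothetical task functions standing in for the broadcasters/responders.
    async fn broadcaster() -> anyhow::Result<()> {
        Ok(())
    }
    async fn responder() -> anyhow::Result<()> {
        Ok(())
    }

    // Primary instance races the singleton broadcasters with the responders;
    // secondary instances only add responder capacity.
    async fn controller_service_sketch(primary_instance: bool) -> anyhow::Result<()> {
        if primary_instance {
            tokio::select! {
                result = broadcaster() => result,
                result = responder() => result,
            }
        } else {
            responder().await
        }
    }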