Merge pull request #3374 from autonomys/remove-legacy-record-store-support

Remove legacy `RecordStore` support
nazar-pc authored Feb 10, 2025
2 parents abd5b1a + f24c0a7 commit 52ca77f
Showing 13 changed files with 55 additions and 249 deletions.
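The substance of the change, as the hunks below show: `Config::new` drops its local-record-provider argument, `NodeRunner` is no longer generic, and the networking `Behavior`/`BehaviorConfig` lose their `RecordStore` type parameter in exchange for a fixed `DummyRecordStore`. A rough before/after sketch of a call site, pieced together from the farmer hunks (imports and the intermediate `Config { .. }` setup are elided, so this is illustrative rather than compilable as-is):

```rust
// Before: callers supplied a LocalRecordProvider (FarmerCaches here, `()` in the
// gateway and examples), and NodeRunner carried it as a type parameter.
let config = Config::new(
    protocol_prefix,
    keypair,
    farmer_caches.clone(),
    prometheus_metrics_registry,
);
let (node, node_runner): (Node, NodeRunner<FarmerCaches>) =
    subspace_networking::construct(config).expect("Networking stack creation failed.");

// After: the provider argument is gone and NodeRunner takes no type parameter.
let config = Config::new(protocol_prefix, keypair, prometheus_metrics_registry);
let (node, node_runner): (Node, NodeRunner) =
    subspace_networking::construct(config).expect("Networking stack creation failed.");
```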
@@ -98,7 +98,7 @@ pub(in super::super) fn configure_network<FarmIndex, NC>(
node_client: NC,
farmer_caches: FarmerCaches,
prometheus_metrics_registry: Option<&mut Registry>,
) -> Result<(Node, NodeRunner<FarmerCaches>), anyhow::Error>
) -> Result<(Node, NodeRunner), anyhow::Error>
where
FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
usize: From<FarmIndex>,
@@ -116,12 +116,7 @@ where
.map(Box::new)?;

let maybe_weak_node = Arc::new(Mutex::new(None::<WeakNode>));
let default_config = Config::new(
protocol_prefix,
keypair,
farmer_caches.clone(),
prometheus_metrics_registry,
);
let default_config = Config::new(protocol_prefix, keypair, prometheus_metrics_registry);
let config = Config {
reserved_peers,
listen_on,
75 changes: 3 additions & 72 deletions crates/subspace-farmer/src/farmer_cache.rs
@@ -34,13 +34,12 @@ use std::{fmt, mem};
use subspace_core_primitives::pieces::{Piece, PieceIndex};
use subspace_core_primitives::segments::{SegmentHeader, SegmentIndex};
use subspace_data_retrieval::piece_getter::PieceGetter;
use subspace_networking::libp2p::kad::{ProviderRecord, RecordKey};
use subspace_networking::libp2p::kad::RecordKey;
use subspace_networking::libp2p::PeerId;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::{KeyWithDistance, LocalRecordProvider};
use tokio::runtime::Handle;
use subspace_networking::KeyWithDistance;
use tokio::sync::Semaphore;
use tokio::task::{block_in_place, yield_now};
use tokio::task::yield_now;
use tracing::{debug, error, info, info_span, trace, warn, Instrument};

const WORKER_CHANNEL_CAPACITY: usize = 100;
@@ -50,9 +49,6 @@ const SYNC_CONCURRENT_BATCHES: usize = 4;
/// this number defines an interval in pieces after which cache is updated
const INTERMEDIATE_CACHE_UPDATE_INTERVAL: usize = 100;
const INITIAL_SYNC_FARM_INFO_CHECK_INTERVAL: Duration = Duration::from_secs(1);
/// How long to wait for `is_piece_maybe_stored` response from plot cache before timing out in order
/// to prevent blocking of executor for too long
const IS_PIECE_MAYBE_STORED_TIMEOUT: Duration = Duration::from_millis(100);

type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;
type Handler<A> = Bag<HandlerFn<A>, A>;
@@ -1603,65 +1599,6 @@ impl FarmerCache {
}
}

impl LocalRecordProvider for FarmerCache {
fn record(&self, key: &RecordKey) -> Option<ProviderRecord> {
let distance_key = KeyWithDistance::new(self.peer_id, key.clone());
if self
.piece_caches
.try_read()?
.contains_stored_piece(&distance_key)
{
// Note: We store our own provider records locally without local addresses
// to avoid redundant storage and outdated addresses. Instead, these are
// acquired on demand when returning a `ProviderRecord` for the local node.
return Some(ProviderRecord {
key: key.clone(),
provider: self.peer_id,
expires: None,
addresses: Vec::new(),
});
};

let found_fut = self
.plot_caches
.caches
.try_read()?
.iter()
.map(|plot_cache| {
let plot_cache = Arc::clone(plot_cache);

async move {
matches!(
plot_cache.is_piece_maybe_stored(key).await,
Ok(MaybePieceStoredResult::Yes)
)
}
})
.collect::<FuturesOrdered<_>>()
.any(|found| async move { found });

// TODO: Ideally libp2p would have an async API record store API,
let found = block_in_place(|| {
Handle::current()
.block_on(tokio::time::timeout(
IS_PIECE_MAYBE_STORED_TIMEOUT,
found_fut,
))
.unwrap_or_default()
});

// Note: We store our own provider records locally without local addresses
// to avoid redundant storage and outdated addresses. Instead, these are
// acquired on demand when returning a `ProviderRecord` for the local node.
found.then_some(ProviderRecord {
key: key.clone(),
provider: self.peer_id,
expires: None,
addresses: Vec::new(),
})
}
}

/// Collection of [`FarmerCache`] instances for load balancing
#[derive(Debug, Clone)]
pub struct FarmerCaches {
@@ -1733,12 +1670,6 @@ impl FarmerCaches {
}
}

impl LocalRecordProvider for FarmerCaches {
fn record(&self, key: &RecordKey) -> Option<ProviderRecord> {
self.caches.choose(&mut thread_rng())?.record(key)
}
}

/// Extracts the `PieceIndex` from a `RecordKey`.
fn decode_piece_index_from_record_key(key: &RecordKey) -> PieceIndex {
let len = key.as_ref().len();
2 changes: 1 addition & 1 deletion crates/subspace-gateway/src/commands.rs
@@ -128,7 +128,7 @@ pub async fn initialize_object_fetcher(
options: GatewayOptions,
) -> anyhow::Result<(
ObjectFetcher<DsnPieceGetter<SegmentCommitmentPieceValidator<RpcNodeClient>>>,
NodeRunner<()>,
NodeRunner,
)> {
let GatewayOptions {
dev,
6 changes: 3 additions & 3 deletions crates/subspace-gateway/src/commands/network.rs
@@ -64,7 +64,7 @@ pub async fn configure_network(
pending_out_connections,
listen_on,
}: NetworkArgs,
) -> anyhow::Result<(Node, NodeRunner<()>, RpcNodeClient)> {
) -> anyhow::Result<(Node, NodeRunner, RpcNodeClient)> {
info!(url = %node_rpc_url, "Connecting to node RPC");
let node_client = RpcNodeClient::new(&node_rpc_url)
.await
@@ -89,12 +89,12 @@
debug!(?dsn_protocol_version, "Setting DSN protocol version...");

// TODO:
// - use a fixed identity kepair
// - use a fixed identity keypair
// - cache known peers on disk
// - prometheus telemetry
let keypair = identity::ed25519::Keypair::generate();
let keypair = identity::Keypair::from(keypair);
let default_config = Config::new(dsn_protocol_version, keypair, (), None);
let default_config = Config::new(dsn_protocol_version, keypair, None);

let config = Config {
bootstrap_addresses: bootstrap_nodes,
2 changes: 1 addition & 1 deletion crates/subspace-networking/examples/benchmark.rs
@@ -345,7 +345,7 @@ pub async fn configure_dsn(
) -> Node {
let keypair = Keypair::generate_ed25519();

let default_config = Config::new(protocol_prefix, keypair, (), None);
let default_config = Config::new(protocol_prefix, keypair, None);

let config = Config {
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
2 changes: 1 addition & 1 deletion crates/subspace-networking/examples/random-walker.rs
@@ -366,7 +366,7 @@ async fn configure_dsn(
) -> Node {
let keypair = Keypair::generate_ed25519();

let default_config = Config::new(protocol_prefix, keypair, (), None);
let default_config = Config::new(protocol_prefix, keypair, None);

let config = Config {
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
22 changes: 7 additions & 15 deletions crates/subspace-networking/src/behavior.rs
@@ -2,6 +2,7 @@ pub(crate) mod persistent_parameters;
#[cfg(test)]
mod tests;

use crate::constructor::DummyRecordStore;
use crate::protocols::autonat_wrapper::{
Behaviour as AutonatWrapper, Config as AutonatWrapperConfig,
};
@@ -29,7 +30,7 @@ use void::Void as VoidEvent;

type BlockListBehaviour = AllowBlockListBehaviour<BlockedPeers>;

pub(crate) struct BehaviorConfig<RecordStore> {
pub(crate) struct BehaviorConfig {
/// Identity keypair of a node used for authenticated connections.
pub(crate) peer_id: PeerId,
/// The configuration for the [`Identify`] behaviour.
@@ -38,8 +39,6 @@ pub(crate) struct BehaviorConfig<RecordStore> {
pub(crate) kademlia: KademliaConfig,
/// The configuration for the [`Gossipsub`] behaviour.
pub(crate) gossipsub: Option<GossipsubConfig>,
/// Externally provided implementation of the custom record store for Kademlia DHT,
pub(crate) record_store: RecordStore,
/// The configuration for the [`RequestResponsesBehaviour`] protocol.
pub(crate) request_response_protocols: Vec<Box<dyn RequestHandler>>,
/// The upper bound for the number of concurrent inbound + outbound streams for request/response
Expand All @@ -55,12 +54,12 @@ pub(crate) struct BehaviorConfig<RecordStore> {

#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "Event")]
pub(crate) struct Behavior<RecordStore> {
pub(crate) struct Behavior {
// TODO: Connection limits must be the first protocol due to https://github.com/libp2p/rust-libp2p/issues/4773 as
// suggested in https://github.com/libp2p/rust-libp2p/issues/4898#issuecomment-1818013483
pub(crate) connection_limits: ConnectionLimitsBehaviour,
pub(crate) identify: Identify,
pub(crate) kademlia: Kademlia<RecordStore>,
pub(crate) kademlia: Kademlia<DummyRecordStore>,
pub(crate) gossipsub: Toggle<Gossipsub>,
pub(crate) ping: Ping,
pub(crate) request_response: RequestResponseFactoryBehaviour,
@@ -69,16 +68,9 @@ pub(crate) struct Behavior<RecordStore> {
pub(crate) autonat: AutonatWrapper,
}

impl<RecordStore> Behavior<RecordStore>
where
RecordStore: Send + Sync + libp2p::kad::store::RecordStore + 'static,
{
pub(crate) fn new(config: BehaviorConfig<RecordStore>) -> Self {
let kademlia = Kademlia::<RecordStore>::with_config(
config.peer_id,
config.record_store,
config.kademlia,
);
impl Behavior {
pub(crate) fn new(config: BehaviorConfig) -> Self {
let kademlia = Kademlia::with_config(config.peer_id, DummyRecordStore, config.kademlia);

let gossipsub = config
.gossipsub
Expand Down
@@ -162,12 +162,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
kademlia_mode: KademliaMode::Static(Mode::Server),
external_addresses,

..Config::new(
protocol_version.to_string(),
keypair,
(),
dsn_metrics_registry,
)
..Config::new(protocol_version.to_string(), keypair, dsn_metrics_registry)
};
let (node, mut node_runner) =
subspace_networking::construct(config).expect("Networking stack creation failed.");