Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow customizing the Kademlia maximum packet size #1502

Merged
merged 2 commits into from
Mar 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ pub struct Kademlia<TStore> {
/// The Kademlia routing table.
kbuckets: KBucketsTable<kbucket::Key<PeerId>, Addresses>,

/// An optional protocol name override to segregate DHTs in the network.
protocol_name_override: Option<Cow<'static, [u8]>>,
/// Configuration of the wire protocol.
protocol_config: KademliaProtocolConfig,

/// The currently active (i.e. in-progress) queries.
queries: QueryPool<QueryInner>,
Expand Down Expand Up @@ -94,7 +94,7 @@ pub struct Kademlia<TStore> {
pub struct KademliaConfig {
kbucket_pending_timeout: Duration,
query_config: QueryConfig,
protocol_name_override: Option<Cow<'static, [u8]>>,
protocol_config: KademliaProtocolConfig,
record_ttl: Option<Duration>,
record_replication_interval: Option<Duration>,
record_publication_interval: Option<Duration>,
Expand All @@ -108,7 +108,7 @@ impl Default for KademliaConfig {
KademliaConfig {
kbucket_pending_timeout: Duration::from_secs(60),
query_config: QueryConfig::default(),
protocol_name_override: None,
protocol_config: Default::default(),
record_ttl: Some(Duration::from_secs(36 * 60 * 60)),
record_replication_interval: Some(Duration::from_secs(60 * 60)),
record_publication_interval: Some(Duration::from_secs(24 * 60 * 60)),
Expand All @@ -125,7 +125,7 @@ impl KademliaConfig {
/// Kademlia nodes only communicate with other nodes using the same protocol name. Using a
/// custom name therefore allows to segregate the DHT from others, if that is desired.
pub fn set_protocol_name(&mut self, name: impl Into<Cow<'static, [u8]>>) -> &mut Self {
self.protocol_name_override = Some(name.into());
self.protocol_config.set_protocol_name(name);
self
}

Expand Down Expand Up @@ -228,6 +228,14 @@ impl KademliaConfig {
self.connection_idle_timeout = duration;
self
}

/// Modifies the maximum allowed size of individual Kademlia packets.
///
/// It might be necessary to increase this value if trying to put large records.
pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self {
self.protocol_config.set_max_packet_size(size);
self
}
}

impl<TStore> Kademlia<TStore>
Expand All @@ -241,9 +249,7 @@ where

/// Get the protocol name of this kademlia instance.
pub fn protocol_name(&self) -> &[u8] {
self.protocol_name_override
.as_ref()
.map_or(crate::protocol::DEFAULT_PROTO_NAME.as_ref(), AsRef::as_ref)
self.protocol_config.protocol_name()
}

/// Creates a new `Kademlia` network behaviour with the given configuration.
Expand All @@ -267,7 +273,7 @@ where
Kademlia {
store,
kbuckets: KBucketsTable::new(local_key, config.kbucket_pending_timeout),
protocol_name_override: config.protocol_name_override,
protocol_config: config.protocol_config,
queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
queries: QueryPool::new(config.query_config),
connected_peers: Default::default(),
Expand Down Expand Up @@ -1064,13 +1070,8 @@ where
type OutEvent = KademliaEvent;

fn new_handler(&mut self) -> Self::ProtocolsHandler {
let mut protocol_config = KademliaProtocolConfig::default();
if let Some(name) = self.protocol_name_override.as_ref() {
protocol_config = protocol_config.with_protocol_name(name.clone());
}

KademliaHandler::new(KademliaHandlerConfig {
protocol_config,
protocol_config: self.protocol_config.clone(),
allow_listening: true,
idle_timeout: self.connection_idle_timeout,
})
Expand Down
22 changes: 17 additions & 5 deletions protocols/kad/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,21 +141,33 @@ impl Into<proto::message::Peer> for KadPeer {
#[derive(Debug, Clone)]
pub struct KademliaProtocolConfig {
protocol_name: Cow<'static, [u8]>,
/// Maximum allowed size of a packet.
max_packet_size: usize,
}

impl KademliaProtocolConfig {
/// Returns the configured protocol name.
pub fn protocol_name(&self) -> &[u8] {
&self.protocol_name
}

/// Modifies the protocol name used on the wire. Can be used to create incompatibilities
/// between networks on purpose.
pub fn with_protocol_name(mut self, name: impl Into<Cow<'static, [u8]>>) -> Self {
pub fn set_protocol_name(&mut self, name: impl Into<Cow<'static, [u8]>>) {
self.protocol_name = name.into();
self
}

/// Modifies the maximum allowed size of a single Kademlia packet.
pub fn set_max_packet_size(&mut self, size: usize) {
self.max_packet_size = size;
}
}

impl Default for KademliaProtocolConfig {
fn default() -> Self {
KademliaProtocolConfig {
protocol_name: Cow::Borrowed(DEFAULT_PROTO_NAME)
protocol_name: Cow::Borrowed(DEFAULT_PROTO_NAME),
max_packet_size: 4096,
}
}
}
Expand All @@ -179,7 +191,7 @@ where

fn upgrade_inbound(self, incoming: C, _: Self::Info) -> Self::Future {
let mut codec = UviBytes::default();
codec.set_max_len(4096);
codec.set_max_len(self.max_packet_size);

future::ok(
Framed::new(incoming, codec)
Expand Down Expand Up @@ -211,7 +223,7 @@ where

fn upgrade_outbound(self, incoming: C, _: Self::Info) -> Self::Future {
let mut codec = UviBytes::default();
codec.set_max_len(4096);
codec.set_max_len(self.max_packet_size);

future::ok(
Framed::new(incoming, codec)
Expand Down