Skip to content

Commit

Permalink
Allow customizing the Kademlia maximum packet size (#1502)
Browse files Browse the repository at this point in the history
* Allow customizing the Kademlia maximum packet size

* Address concern
  • Loading branch information
tomaka authored Mar 19, 2020
1 parent 439dc82 commit 92ce5d6
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 20 deletions.
31 changes: 16 additions & 15 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ pub struct Kademlia<TStore> {
/// The Kademlia routing table.
kbuckets: KBucketsTable<kbucket::Key<PeerId>, Addresses>,

/// An optional protocol name override to segregate DHTs in the network.
protocol_name_override: Option<Cow<'static, [u8]>>,
/// Configuration of the wire protocol.
protocol_config: KademliaProtocolConfig,

/// The currently active (i.e. in-progress) queries.
queries: QueryPool<QueryInner>,
Expand Down Expand Up @@ -94,7 +94,7 @@ pub struct Kademlia<TStore> {
pub struct KademliaConfig {
kbucket_pending_timeout: Duration,
query_config: QueryConfig,
protocol_name_override: Option<Cow<'static, [u8]>>,
protocol_config: KademliaProtocolConfig,
record_ttl: Option<Duration>,
record_replication_interval: Option<Duration>,
record_publication_interval: Option<Duration>,
Expand All @@ -108,7 +108,7 @@ impl Default for KademliaConfig {
KademliaConfig {
kbucket_pending_timeout: Duration::from_secs(60),
query_config: QueryConfig::default(),
protocol_name_override: None,
protocol_config: Default::default(),
record_ttl: Some(Duration::from_secs(36 * 60 * 60)),
record_replication_interval: Some(Duration::from_secs(60 * 60)),
record_publication_interval: Some(Duration::from_secs(24 * 60 * 60)),
Expand All @@ -125,7 +125,7 @@ impl KademliaConfig {
/// Kademlia nodes only communicate with other nodes using the same protocol name. Using a
/// custom name therefore allows to segregate the DHT from others, if that is desired.
pub fn set_protocol_name(&mut self, name: impl Into<Cow<'static, [u8]>>) -> &mut Self {
self.protocol_name_override = Some(name.into());
self.protocol_config.set_protocol_name(name);
self
}

Expand Down Expand Up @@ -228,6 +228,14 @@ impl KademliaConfig {
self.connection_idle_timeout = duration;
self
}

/// Modifies the maximum allowed size of individual Kademlia packets.
///
/// It might be necessary to increase this value if trying to put large records.
pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self {
self.protocol_config.set_max_packet_size(size);
self
}
}

impl<TStore> Kademlia<TStore>
Expand All @@ -241,9 +249,7 @@ where

/// Get the protocol name of this kademlia instance.
pub fn protocol_name(&self) -> &[u8] {
self.protocol_name_override
.as_ref()
.map_or(crate::protocol::DEFAULT_PROTO_NAME.as_ref(), AsRef::as_ref)
self.protocol_config.protocol_name()
}

/// Creates a new `Kademlia` network behaviour with the given configuration.
Expand All @@ -267,7 +273,7 @@ where
Kademlia {
store,
kbuckets: KBucketsTable::new(local_key, config.kbucket_pending_timeout),
protocol_name_override: config.protocol_name_override,
protocol_config: config.protocol_config,
queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
queries: QueryPool::new(config.query_config),
connected_peers: Default::default(),
Expand Down Expand Up @@ -1064,13 +1070,8 @@ where
type OutEvent = KademliaEvent;

fn new_handler(&mut self) -> Self::ProtocolsHandler {
let mut protocol_config = KademliaProtocolConfig::default();
if let Some(name) = self.protocol_name_override.as_ref() {
protocol_config = protocol_config.with_protocol_name(name.clone());
}

KademliaHandler::new(KademliaHandlerConfig {
protocol_config,
protocol_config: self.protocol_config.clone(),
allow_listening: true,
idle_timeout: self.connection_idle_timeout,
})
Expand Down
22 changes: 17 additions & 5 deletions protocols/kad/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,21 +141,33 @@ impl Into<proto::message::Peer> for KadPeer {
#[derive(Debug, Clone)]
pub struct KademliaProtocolConfig {
protocol_name: Cow<'static, [u8]>,
/// Maximum allowed size of a packet.
max_packet_size: usize,
}

impl KademliaProtocolConfig {
/// Returns the configured protocol name.
pub fn protocol_name(&self) -> &[u8] {
&self.protocol_name
}

/// Modifies the protocol name used on the wire. Can be used to create incompatibilities
/// between networks on purpose.
pub fn with_protocol_name(mut self, name: impl Into<Cow<'static, [u8]>>) -> Self {
pub fn set_protocol_name(&mut self, name: impl Into<Cow<'static, [u8]>>) {
self.protocol_name = name.into();
self
}

/// Modifies the maximum allowed size of a single Kademlia packet.
pub fn set_max_packet_size(&mut self, size: usize) {
self.max_packet_size = size;
}
}

impl Default for KademliaProtocolConfig {
fn default() -> Self {
KademliaProtocolConfig {
protocol_name: Cow::Borrowed(DEFAULT_PROTO_NAME)
protocol_name: Cow::Borrowed(DEFAULT_PROTO_NAME),
max_packet_size: 4096,
}
}
}
Expand All @@ -179,7 +191,7 @@ where

fn upgrade_inbound(self, incoming: C, _: Self::Info) -> Self::Future {
let mut codec = UviBytes::default();
codec.set_max_len(4096);
codec.set_max_len(self.max_packet_size);

future::ok(
Framed::new(incoming, codec)
Expand Down Expand Up @@ -211,7 +223,7 @@ where

fn upgrade_outbound(self, incoming: C, _: Self::Info) -> Self::Future {
let mut codec = UviBytes::default();
codec.set_max_len(4096);
codec.set_max_len(self.max_packet_size);

future::ok(
Framed::new(incoming, codec)
Expand Down

0 comments on commit 92ce5d6

Please sign in to comment.