Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

inc quietly on small batches of headers #3564

Merged
merged 1 commit into from
Feb 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 33 additions & 5 deletions p2p/src/codec.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,33 @@
use crate::core::core::block::{BlockHeader, UntrustedBlockHeader};
// Copyright 2020 The Grin Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Provides a connection wrapper that handles the lower level tasks in sending
//! or receiving data from the TCP socket, as well as dealing with timeouts.
//!
//! Because of a few idiosyncracies in the Rust `TcpStream`, this has to use
//! async I/O to be able to both read *and* write on the connection. Which
//! forces us to go through some additional gymnastic to loop over the async
//! stream and make sure we get the right number of bytes out.

use crate::core::global::header_size_bytes;
use crate::core::ser::{BufReader, ProtocolVersion, Readable};
use crate::msg::{Message, MsgHeader, MsgHeaderWrapper, Type};
use crate::types::{AttachmentMeta, AttachmentUpdate, Error};
use crate::{
core::core::block::{BlockHeader, UntrustedBlockHeader},
msg::HeadersData,
};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use core::ser::Reader;
use std::cmp::min;
Expand Down Expand Up @@ -169,18 +194,21 @@ impl Codec {
headers.push(header.into());
*bytes_left = bytes_left.saturating_sub(bytes_read);
*items_left -= 1;

if headers.len() == HEADER_BATCH_SIZE || *items_left == 0 {
let remaining = *items_left as u64;
if headers.len() == HEADER_BATCH_SIZE || remaining == 0 {
let mut h = Vec::with_capacity(min(HEADER_BATCH_SIZE, *items_left));
mem::swap(headers, &mut h);
if *items_left == 0 {
if remaining == 0 {
let bytes_left = *bytes_left;
self.state = None;
if bytes_left > 0 {
return Err(Error::BadMessage);
}
}
return Ok(Message::Headers(h));
return Ok(Message::Headers(HeadersData {
headers: h,
remaining,
}));
}
}
Attachment(left, meta, now) => {
Expand Down
9 changes: 9 additions & 0 deletions p2p/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,15 @@ where
// increase the appropriate counter
match &next {
Ok(Message::Attachment(_, _)) => reader_tracker.inc_quiet_received(bytes_read),
Ok(Message::Headers(data)) => {
// We process a full 512 headers locally in smaller 32 header batches.
// We only want to increment the msg count once for the full 512 headers.
if data.remaining == 0 {
reader_tracker.inc_received(bytes_read);
} else {
reader_tracker.inc_quiet_received(bytes_read);
}
}
_ => reader_tracker.inc_received(bytes_read),
}

Expand Down
13 changes: 12 additions & 1 deletion p2p/src/msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,7 @@ pub enum Message {
CompactBlock(UntrustedCompactBlock),
GetHeaders(Locator),
Header(UntrustedBlockHeader),
Headers(Vec<BlockHeader>),
Headers(HeadersData),
GetPeerAddrs(GetPeerAddrs),
PeerAddrs(PeerAddrs),
TxHashSetRequest(TxHashSetRequest),
Expand All @@ -881,6 +881,17 @@ pub enum Message {
KernelSegment(SegmentResponse<TxKernel>),
}

/// We receive 512 headers from a peer.
/// But we process them in smaller batches of 32 headers.
/// HeadersData wraps the current batch and a count of the headers remaining after this batch.
pub struct HeadersData {
/// Batch of headers currently being processed.
pub headers: Vec<BlockHeader>,
/// Number of headers stil to be processed after this current batch.
/// 0 indicates this is the final batch from the larger set of headers received from the peer.
pub remaining: u64,
}

impl fmt::Display for Message {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Expand Down
4 changes: 2 additions & 2 deletions p2p/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,8 @@ impl MessageHandler for Protocol {
Consumed::None
}

Message::Headers(headers) => {
adapter.headers_received(&headers, &self.peer_info)?;
Message::Headers(data) => {
adapter.headers_received(&data.headers, &self.peer_info)?;
Consumed::None
}

Expand Down