quinn-proto: change defragmentation strategy
geieredgar committed Jan 29, 2021
1 parent 176b454 commit 4a46217
Showing 1 changed file with 51 additions and 15 deletions.
quinn-proto/src/connection/assembler.rs
@@ -133,7 +133,19 @@ impl Assembler {
     // counter to the new number of chunks left in the heap so that we can decide
     // when to defragment the queue again if necessary.
     fn defragment(&mut self) {
-        let buffered = self.data.iter().map(|c| c.bytes.len()).sum::<usize>();
+        let high_utilization_over_allocation = self
+            .data
+            .iter()
+            .filter(|b| b.high_utilization())
+            .map(|b| b.over_allocation)
+            .sum::<usize>();
+        let include_highly_utilized = high_utilization_over_allocation > DEFRAGMENTATION_THRESHOLD;
+        let buffered = self
+            .data
+            .iter()
+            .filter(|b| b.should_defragment(include_highly_utilized))
+            .map(|b| b.bytes.len())
+            .sum::<usize>();
         let mut buffer = BytesMut::with_capacity(buffered);
         let mut offset = self
             .data
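The new selection pass partitions chunks by utilization before any copying happens. Below is a minimal standalone model of that heuristic, using a simplified Buf stand-in rather than the crate's Buffer type; it demonstrates both modes of the include_highly_utilized flag.

// Minimal standalone model of the selection heuristic (simplified `Buf`
// stand-in; not the crate's `Buffer` type).
const DEFRAGMENTATION_THRESHOLD: usize = 32 * 1024;

struct Buf {
    len: usize, // stands in for bytes.len()
    over_allocation: usize,
}

impl Buf {
    fn high_utilization(&self) -> bool {
        self.len >= self.over_allocation
    }

    fn should_defragment(&self, include_highly_utilized: bool) -> bool {
        if include_highly_utilized {
            self.over_allocation > 0
        } else {
            !self.high_utilization()
        }
    }
}

fn main() {
    // One sparse chunk (1 useful byte pinning ~1 KiB) and one dense chunk.
    let data = vec![
        Buf { len: 1, over_allocation: 999 },
        Buf { len: 1200, over_allocation: 100 },
    ];
    // Excess pinned by well-utilized chunks is 100 bytes, far below 32 KiB,
    // so this pass leaves dense chunks alone.
    let high_utilization_over_allocation: usize = data
        .iter()
        .filter(|b| b.high_utilization())
        .map(|b| b.over_allocation)
        .sum();
    let include_highly_utilized = high_utilization_over_allocation > DEFRAGMENTATION_THRESHOLD;
    assert!(!include_highly_utilized);
    assert!(data[0].should_defragment(include_highly_utilized)); // sparse: compact
    assert!(!data[1].should_defragment(include_highly_utilized)); // dense: keep

    // Once dense chunks collectively pin more than the threshold, the mode
    // flips and every over-allocated chunk is compacted.
    let big = Buf { len: 64 * 1024, over_allocation: 33 * 1024 };
    assert!(big.high_utilization() && big.should_defragment(true));
}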
@@ -145,20 +157,24 @@ impl Assembler {
         let new = BinaryHeap::with_capacity(self.data.len());
         let old = mem::replace(&mut self.data, new);
         for chunk in old.into_sorted_vec().into_iter().rev() {
-            let end = offset + (buffer.len() as u64);
-            if let Some(overlap) = end.checked_sub(chunk.offset) {
-                if let Some(bytes) = chunk.bytes.get(overlap as usize..) {
-                    buffer.extend_from_slice(bytes);
+            if chunk.should_defragment(include_highly_utilized) {
+                let end = offset + (buffer.len() as u64);
+                if let Some(overlap) = end.checked_sub(chunk.offset) {
+                    if let Some(bytes) = chunk.bytes.get(overlap as usize..) {
+                        buffer.extend_from_slice(bytes);
+                    }
+                } else {
+                    let bytes = buffer.split().freeze();
+                    self.data.push(Buffer {
+                        offset,
+                        bytes,
+                        over_allocation: 0,
+                    });
+                    offset = chunk.offset;
+                    buffer.extend_from_slice(&chunk.bytes);
                 }
             } else {
-                let bytes = buffer.split().freeze();
-                self.data.push(Buffer {
-                    offset,
-                    bytes,
-                    over_allocation: 0,
-                });
-                offset = chunk.offset;
-                buffer.extend_from_slice(&chunk.bytes);
+                self.data.push(chunk);
             }
         }

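The loop walks chunks in ascending offset order and grows one contiguous buffer, cutting a new Buffer whenever a gap appears. A sketch of just that contiguity logic, on plain byte slices rather than the crate's Buffer/BytesMut types:

// Chunks arrive in ascending offset order; a chunk is folded into the
// current run only when the run's end reaches (or overlaps) the chunk's
// start, mirroring the checked_sub test above.
fn merge_runs(chunks: &[(u64, &[u8])]) -> Vec<(u64, Vec<u8>)> {
    let mut runs: Vec<(u64, Vec<u8>)> = Vec::new();
    for &(offset, bytes) in chunks {
        if let Some((run_offset, run)) = runs.last_mut() {
            let end = *run_offset + run.len() as u64;
            if let Some(overlap) = end.checked_sub(offset) {
                // Contiguous or overlapping: append only the non-duplicated
                // tail (a fully contained chunk contributes nothing).
                if let Some(tail) = bytes.get(overlap as usize..) {
                    run.extend_from_slice(tail);
                }
                continue;
            }
        }
        // A gap precedes this chunk: cut the current run and start a new one.
        runs.push((offset, bytes.to_vec()));
    }
    runs
}

fn main() {
    let chunks: Vec<(u64, &[u8])> = vec![(0, &b"he"[..]), (2, &b"llo"[..]), (10, &b"world"[..])];
    let runs = merge_runs(&chunks);
    assert_eq!(runs, vec![(0, b"hello".to_vec()), (10, b"world".to_vec())]);
}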
@@ -168,7 +184,11 @@ impl Assembler {
             bytes,
             over_allocation: 0,
         });
-        self.over_allocation = 0;
+        self.over_allocation = if include_highly_utilized {
+            0
+        } else {
+            high_utilization_over_allocation
+        };
     }

     pub(crate) fn insert(&mut self, mut offset: u64, mut bytes: Bytes, mut allocation_size: usize) {
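When well-utilized chunks are skipped, their pinned excess is carried back into the counter rather than cleared, so a later pass can still cross the threshold and flip to the inclusive mode. A minimal model of that accounting, with illustrative numbers only:

// Model of the conditional counter reset above (hypothetical 20 KiB figure).
const DEFRAGMENTATION_THRESHOLD: usize = 32 * 1024;

fn main() {
    // Suppose dense chunks pin 20 KiB that this pass left in place.
    let high_utilization_over_allocation = 20 * 1024;
    let include_highly_utilized = high_utilization_over_allocation > DEFRAGMENTATION_THRESHOLD;
    let over_allocation = if include_highly_utilized {
        0
    } else {
        high_utilization_over_allocation
    };
    // The retained 20 KiB keeps counting toward the next 32 KiB trigger.
    assert_eq!(over_allocation, 20 * 1024);
}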
@@ -209,7 +229,7 @@ impl Assembler {
         // of memory allocated. In a worst case scenario like 32 1-byte chunks,
         // each one from a ~1000-byte datagram, this limits us to having a
         // maximum pathological over-allocation of about 32k bytes.
-        if self.over_allocation > 32 * 1024 {
+        if self.over_allocation > DEFRAGMENTATION_THRESHOLD {
             self.defragment()
         }
     }
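The worst case named in that comment works out as follows (illustrative arithmetic, not code from the crate):

fn main() {
    let chunks = 32u32;
    let allocation_size = 1_000u32; // approximate datagram-backed allocation
    let useful = 1u32; // one useful byte per chunk
    let over_allocation = chunks * (allocation_size - useful);
    assert_eq!(over_allocation, 31_968); // ~32 KB pinned for 32 bytes of data
    // Just under DEFRAGMENTATION_THRESHOLD (32 * 1024 = 32_768), so one more
    // such chunk tips the counter over and triggers defragment().
    assert!(over_allocation <= 32 * 1024);
}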
@@ -247,6 +267,8 @@ impl Assembler {
     }
 }

+const DEFRAGMENTATION_THRESHOLD: usize = 32 * 1024;
+
 /// A chunk of data from the receive stream
 #[non_exhaustive]
 #[derive(Debug, PartialEq)]
@@ -277,6 +299,20 @@ struct Buffer {
     over_allocation: usize,
 }

+impl Buffer {
+    fn high_utilization(&self) -> bool {
+        self.bytes.len() >= self.over_allocation
+    }
+
+    fn should_defragment(&self, include_highly_utilized: bool) -> bool {
+        if include_highly_utilized {
+            self.over_allocation > 0
+        } else {
+            !self.high_utilization()
+        }
+    }
+}
+
 impl Ord for Buffer {
     // Invert ordering based on offset (max-heap, min offset first),
     // prioritize longer chunks at the same offset.
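The Ord body itself is truncated in this view. Below is a hedged reconstruction of the comparator the comment describes, on a simplified stand-in type; treat it as an assumption about the shape of the code, not the crate's exact implementation.

use std::cmp::Ordering;
use std::collections::BinaryHeap;

#[derive(Eq, PartialEq)]
struct Buf {
    offset: u64,
    len: usize, // stands in for bytes.len()
}

impl Ord for Buf {
    fn cmp(&self, other: &Self) -> Ordering {
        // Reverse the offset comparison so BinaryHeap (a max-heap) pops the
        // lowest offset first; at equal offsets, longer chunks pop first.
        other
            .offset
            .cmp(&self.offset)
            .then(self.len.cmp(&other.len))
    }
}

impl PartialOrd for Buf {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let mut heap = BinaryHeap::new();
    heap.push(Buf { offset: 4, len: 2 });
    heap.push(Buf { offset: 0, len: 1 });
    heap.push(Buf { offset: 0, len: 8 });
    let first = heap.pop().unwrap();
    assert_eq!((first.offset, first.len), (0, 8)); // min offset, longer first
}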
