From 458fa2aea843babfa51cae828e3a30abc3324be4 Mon Sep 17 00:00:00 2001
From: Matthias Einwag
Date: Sat, 17 Oct 2020 21:22:19 -0700
Subject: [PATCH 01/18] WIP: Changes BytesMut not to rely on Vec + Arc

This changes the internal BytesMut representation to allocate memory on its own, and not to use Arc + Vec under the hood.

Doing this allows having a single representation which is shareable (by being able to modify an embedded refcount) with zero cost right from the start. No representation changes have to be performed over the lifetime of `BytesMut`. Therefore `BytesMut::freeze()` is now also free.

The change also opens the door for adding support for buffer pools or custom allocators, since an allocator is directly called. Adding this support would mostly mean having to pass an allocator to BytesMut which is called instead of a fixed function. This would however be a different change.

The change is WIP and is still missing the following parts:

- The buffer growth strategy is not exactly the same as in the old version. The best strategy still needs to be reviewed.
- Some situations where an empty buffer (which has no backing storage) is manipulated might have bugs. This needs another round of review, and some additional tests.
---
 src/bytes_mut.rs | 502 ++++++++++++++++------------------------------
 1 file changed, 166 insertions(+), 336 deletions(-)

diff --git a/src/bytes_mut.rs b/src/bytes_mut.rs
index 16cb72c2b..ae1e73720 100644
--- a/src/bytes_mut.rs
+++ b/src/bytes_mut.rs
@@ -1,12 +1,12 @@
+use core::alloc::Layout;
 use core::iter::{FromIterator, Iterator};
-use core::mem::{self, ManuallyDrop};
+use core::mem::{self};
 use core::ops::{Deref, DerefMut};
 use core::ptr::{self, NonNull};
 use core::{cmp, fmt, hash, isize, slice, usize};
 use alloc::{
     borrow::{Borrow, BorrowMut},
-    boxed::Box,
     string::String,
     vec::Vec,
 };
@@ -63,50 +63,14 @@ pub struct BytesMut {
     cap: usize,
     data: *mut Shared,
 }
-
-// Thread-safe reference-counted container for the shared storage. This mostly
-// the same as `core::sync::Arc` but without the weak counter. The ref counting
-// fns are based on the ones found in `std`.
-//
-// The main reason to use `Shared` instead of `core::sync::Arc` is that it ends
-// up making the overall code simpler and easier to reason about. This is due to
-// some of the logic around setting `Inner::arc` and other ways the `arc` field
-// is used. Using `Arc` ended up requiring a number of funky transmutes and
-// other shenanigans to make it work.
 struct Shared {
-    vec: Vec<u8>,
-    original_capacity_repr: usize,
-    ref_count: AtomicUsize,
+    capacity: usize,
+    refcount: AtomicUsize,
 }
-// Buffer storage strategy flags.
-const KIND_ARC: usize = 0b0;
-const KIND_VEC: usize = 0b1;
-const KIND_MASK: usize = 0b1;
-
-// The max original capacity value. Any `Bytes` allocated with a greater initial
-// capacity will default to this.
-const MAX_ORIGINAL_CAPACITY_WIDTH: usize = 17;
-// The original capacity algorithm will not take effect unless the originally
-// allocated capacity was at least 1kb in size.
-const MIN_ORIGINAL_CAPACITY_WIDTH: usize = 10;
-// The original capacity is stored in powers of 2 starting at 1kb to a max of
-// 64kb. Representing it as such requires only 3 bits of storage.
-const ORIGINAL_CAPACITY_MASK: usize = 0b11100;
-const ORIGINAL_CAPACITY_OFFSET: usize = 2;
-
-// When the storage is in the `Vec` representation, the pointer can be advanced
-// at most this value.
This is due to the amount of storage available to track -// the offset is usize - number of KIND bits and number of ORIGINAL_CAPACITY -// bits. -const VEC_POS_OFFSET: usize = 5; -const MAX_VEC_POS: usize = usize::MAX >> VEC_POS_OFFSET; -const NOT_VEC_POS_MASK: usize = 0b11111; - -#[cfg(target_pointer_width = "64")] -const PTR_WIDTH: usize = 64; -#[cfg(target_pointer_width = "32")] -const PTR_WIDTH: usize = 32; +/// An element in static memory which we use as guard pointer for +/// empty byte arrays +const GUARD: u8 = 0; /* * @@ -139,7 +103,25 @@ impl BytesMut { /// ``` #[inline] pub fn with_capacity(capacity: usize) -> BytesMut { - BytesMut::from_vec(Vec::with_capacity(capacity)) + unsafe { + if capacity == 0 { + BytesMut { + data: ptr::null_mut(), + ptr: NonNull::new_unchecked(&GUARD as *const u8 as *mut u8), + len: 0, + cap: 0, + } + } else { + let shared = Shared::allocate_for_size(capacity) + .expect("Allocation failures are currently not handled"); + BytesMut { + data: shared, + len: 0, + ptr: (*shared).data_ptr_mut(), + cap: (*shared).capacity, + } + } + } } /// Creates a new `BytesMut` with default capacity. @@ -236,26 +218,16 @@ impl BytesMut { /// th.join().unwrap(); /// ``` #[inline] - pub fn freeze(mut self) -> Bytes { - if self.kind() == KIND_VEC { - // Just re-use `Bytes` internal Vec vtable - unsafe { - let (off, _) = self.get_vec_pos(); - let vec = rebuild_vec(self.ptr.as_ptr(), self.len, self.cap, off); - mem::forget(self); - let mut b: Bytes = vec.into(); - b.advance(off); - b - } - } else { - debug_assert_eq!(self.kind(), KIND_ARC); - - let ptr = self.ptr.as_ptr(); - let len = self.len; - let data = AtomicPtr::new(self.data as _); - mem::forget(self); - unsafe { Bytes::with_vtable(ptr, len, data, &SHARED_VTABLE) } + pub fn freeze(self) -> Bytes { + if self.data.is_null() { + return Bytes::new(); } + + let ptr = self.ptr.as_ptr(); + let len = self.len; + let data = AtomicPtr::new(self.data as _); + mem::forget(self); + unsafe { Bytes::with_vtable(ptr, len, data, &SHARED_VTABLE) } } /// Splits the bytes into two at the given index. @@ -552,48 +524,7 @@ impl BytesMut { // be inline-able. Significant helps performance. fn reserve_inner(&mut self, additional: usize) { let len = self.len(); - let kind = self.kind(); - if kind == KIND_VEC { - // If there's enough free space before the start of the buffer, then - // just copy the data backwards and reuse the already-allocated - // space. - // - // Otherwise, since backed by a vector, use `Vec::reserve` - unsafe { - let (off, prev) = self.get_vec_pos(); - - // Only reuse space if we can satisfy the requested additional space. - if self.capacity() - self.len() + off >= additional { - // There's space - reuse it - // - // Just move the pointer back to the start after copying - // data back. - let base_ptr = self.ptr.as_ptr().offset(-(off as isize)); - ptr::copy(self.ptr.as_ptr(), base_ptr, self.len); - self.ptr = vptr(base_ptr); - self.set_vec_pos(0, prev); - - // Length stays constant, but since we moved backwards we - // can gain capacity back. 
- self.cap += off; - } else { - // No space - allocate more - let mut v = - ManuallyDrop::new(rebuild_vec(self.ptr.as_ptr(), self.len, self.cap, off)); - v.reserve(additional); - - // Update the info - self.ptr = vptr(v.as_mut_ptr().offset(off as isize)); - self.len = v.len() - off; - self.cap = v.capacity() - off; - } - - return; - } - } - - debug_assert_eq!(kind, KIND_ARC); let shared: *mut Shared = self.data as _; // Reserving involves abandoning the currently shared buffer and @@ -602,65 +533,60 @@ impl BytesMut { // Compute the new capacity let mut new_cap = len.checked_add(additional).expect("overflow"); - let original_capacity; - let original_capacity_repr; - unsafe { - original_capacity_repr = (*shared).original_capacity_repr; - original_capacity = original_capacity_from_repr(original_capacity_repr); + let mut original_capacity = 0; + + if !shared.is_null() { + original_capacity = (*shared).capacity; - // First, try to reclaim the buffer. This is possible if the current - // handle is the only outstanding handle pointing to the buffer. - if (*shared).is_unique() { - // This is the only handle to the buffer. It can be reclaimed. + // First, try to reclaim the buffer. This is possible if the current + // handle is the only outstanding handle pointing to the buffer. // However, before doing the work of copying data, check to make // sure that the vector has enough capacity. - let v = &mut (*shared).vec; - - if v.capacity() >= new_cap { + if (*shared).is_unique() && (*shared).capacity >= new_cap { // The capacity is sufficient, reclaim the buffer - let ptr = v.as_mut_ptr(); + let ptr = (*shared).data_ptr_mut(); - ptr::copy(self.ptr.as_ptr(), ptr, len); + ptr::copy(self.ptr.as_ptr(), ptr.as_ptr(), len); - self.ptr = vptr(ptr); - self.cap = v.capacity(); + self.ptr = ptr; + self.cap = (*shared).capacity; return; } - - // The vector capacity is not sufficient. The reserve request is - // asking for more than the initial buffer capacity. Allocate more - // than requested if `new_cap` is not much bigger than the current - // capacity. - // - // There are some situations, using `reserve_exact` that the - // buffer capacity could be below `original_capacity`, so do a - // check. - let double = v.capacity().checked_shl(1).unwrap_or(new_cap); - - new_cap = cmp::max(cmp::max(double, new_cap), original_capacity); - } else { - new_cap = cmp::max(new_cap, original_capacity); } - } - - // Create a new vector to store the data - let mut v = ManuallyDrop::new(Vec::with_capacity(new_cap)); - // Copy the bytes - v.extend_from_slice(self.as_ref()); + // The backing storage capacity is not sufficient. The reserve request is + // asking for more than the initial buffer capacity. Allocate more + // than requested if `new_cap` is not much bigger than the current + // capacity. + // + // There are some situations, using `reserve_exact` that the + // buffer capacity could be below `original_capacity`, so do a + // check. + let double = self.cap.checked_shl(1).unwrap_or(new_cap); + new_cap = cmp::max(cmp::max(double, new_cap), original_capacity); + + // Create a new backing storage + let shared = Shared::allocate_for_size(new_cap).expect("Allocations can not fail"); + + if self.len != 0 { + ptr::copy_nonoverlapping( + self.ptr.as_ptr(), + (*shared).data_ptr_mut().as_ptr(), + self.len, + ); + } - // Release the shared handle. This must be done *after* the bytes are - // copied. - unsafe { release_shared(shared) }; + // Release the shared handle. + // This must be done *after* the bytes are copied. 
+ Shared::release_refcount(self.data); - // Update self - let data = (original_capacity_repr << ORIGINAL_CAPACITY_OFFSET) | KIND_VEC; - self.data = data as _; - self.ptr = vptr(v.as_mut_ptr()); - self.len = v.len(); - self.cap = v.capacity(); + // Update self + self.data = shared; + self.ptr = (*shared).data_ptr_mut(); + self.cap = (*shared).capacity; + } } /// Appends given bytes to this `BytesMut`. @@ -732,30 +658,6 @@ impl BytesMut { // private - // For now, use a `Vec` to manage the memory for us, but we may want to - // change that in the future to some alternate allocator strategy. - // - // Thus, we don't expose an easy way to construct from a `Vec` since an - // internal change could make a simple pattern (`BytesMut::from(vec)`) - // suddenly a lot more expensive. - #[inline] - pub(crate) fn from_vec(mut vec: Vec) -> BytesMut { - let ptr = vptr(vec.as_mut_ptr()); - let len = vec.len(); - let cap = vec.capacity(); - mem::forget(vec); - - let original_capacity_repr = original_capacity_to_repr(cap); - let data = (original_capacity_repr << ORIGINAL_CAPACITY_OFFSET) | KIND_VEC; - - BytesMut { - ptr, - len, - cap, - data: data as *mut _, - } - } - #[inline] fn as_slice(&self) -> &[u8] { unsafe { slice::from_raw_parts(self.ptr.as_ptr(), self.len) } @@ -763,7 +665,16 @@ impl BytesMut { #[inline] fn as_slice_mut(&mut self) -> &mut [u8] { - unsafe { slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len) } + // TODO: The guard pointer is a constant pointer, which we misuse + // here as a mutable pointer. Since the length of the slice is empty it + // it might not be an issue to directly return it. However since Rusts + // undefined behavior rules are not super clear, we go safe and return + // an empty slice if the guard is detected. + if self.len == 0 { + &mut [] + } else { + unsafe { slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len) } + } } unsafe fn set_start(&mut self, start: usize) { @@ -775,27 +686,6 @@ impl BytesMut { debug_assert!(start <= self.cap, "internal: set_start out of bounds"); - let kind = self.kind(); - - if kind == KIND_VEC { - // Setting the start when in vec representation is a little more - // complicated. First, we have to track how far ahead the - // "start" of the byte buffer from the beginning of the vec. We - // also have to ensure that we don't exceed the maximum shift. - let (mut pos, prev) = self.get_vec_pos(); - pos += start; - - if pos <= MAX_VEC_POS { - self.set_vec_pos(pos, prev); - } else { - // The repr must be upgraded to ARC. This will never happen - // on 64 bit systems and will only happen on 32 bit systems - // when shifting past 134,217,727 bytes. As such, we don't - // worry too much about performance here. - self.promote_to_shared(/*ref_count = */ 1); - } - } - // Updating the start of the view is setting `ptr` to point to the // new start and updating the `len` field to reflect the new length // of the view. 
@@ -811,7 +701,6 @@ impl BytesMut { } unsafe fn set_end(&mut self, end: usize) { - debug_assert_eq!(self.kind(), KIND_ARC); assert!(end <= self.cap, "set_end out of bounds"); self.cap = end; @@ -824,11 +713,7 @@ impl BytesMut { } let ptr = unsafe { self.ptr.as_ptr().offset(self.len as isize) }; - if ptr == other.ptr.as_ptr() - && self.kind() == KIND_ARC - && other.kind() == KIND_ARC - && self.data == other.data - { + if ptr == other.ptr.as_ptr() && self.data == other.data { // Contiguous blocks, just combine directly self.len += other.len; self.cap += other.cap; @@ -838,44 +723,6 @@ impl BytesMut { } } - #[inline] - fn kind(&self) -> usize { - self.data as usize & KIND_MASK - } - - unsafe fn promote_to_shared(&mut self, ref_cnt: usize) { - debug_assert_eq!(self.kind(), KIND_VEC); - debug_assert!(ref_cnt == 1 || ref_cnt == 2); - - let original_capacity_repr = - (self.data as usize & ORIGINAL_CAPACITY_MASK) >> ORIGINAL_CAPACITY_OFFSET; - - // The vec offset cannot be concurrently mutated, so there - // should be no danger reading it. - let off = (self.data as usize) >> VEC_POS_OFFSET; - - // First, allocate a new `Shared` instance containing the - // `Vec` fields. It's important to note that `ptr`, `len`, - // and `cap` cannot be mutated without having `&mut self`. - // This means that these fields will not be concurrently - // updated and since the buffer hasn't been promoted to an - // `Arc`, those three fields still are the components of the - // vector. - let shared = Box::new(Shared { - vec: rebuild_vec(self.ptr.as_ptr(), self.len, self.cap, off), - original_capacity_repr, - ref_count: AtomicUsize::new(ref_cnt), - }); - - let shared = Box::into_raw(shared); - - // The pointer should be aligned, so this assert should - // always succeed. - debug_assert_eq!(shared as usize & KIND_MASK, KIND_ARC); - - self.data = shared as _; - } - /// Makes an exact shallow clone of `self`. /// /// The kind of `self` doesn't matter, but this is unsafe @@ -884,29 +731,9 @@ impl BytesMut { /// two views into the same range. 
#[inline] unsafe fn shallow_clone(&mut self) -> BytesMut { - if self.kind() == KIND_ARC { - increment_shared(self.data); - ptr::read(self) - } else { - self.promote_to_shared(/*ref_count = */ 2); - ptr::read(self) - } - } - - #[inline] - unsafe fn get_vec_pos(&mut self) -> (usize, usize) { - debug_assert_eq!(self.kind(), KIND_VEC); - - let prev = self.data as usize; - (prev >> VEC_POS_OFFSET, prev) - } - - #[inline] - unsafe fn set_vec_pos(&mut self, pos: usize, prev: usize) { - debug_assert_eq!(self.kind(), KIND_VEC); - debug_assert!(pos <= MAX_VEC_POS); - - self.data = ((pos << VEC_POS_OFFSET) | (prev & NOT_VEC_POS_MASK)) as *mut _; + debug_assert!(!self.data.is_null()); + Shared::increment_refcount(self.data); + ptr::read(self) } #[inline] @@ -922,17 +749,8 @@ impl BytesMut { impl Drop for BytesMut { fn drop(&mut self) { - let kind = self.kind(); - - if kind == KIND_VEC { - unsafe { - let (off, _) = self.get_vec_pos(); - - // Vector storage, free the vector - let _ = rebuild_vec(self.ptr.as_ptr(), self.len, self.cap, off); - } - } else if kind == KIND_ARC { - unsafe { release_shared(self.data as _) }; + unsafe { + Shared::release_refcount(self.data); } } } @@ -1044,7 +862,9 @@ impl DerefMut for BytesMut { impl<'a> From<&'a [u8]> for BytesMut { fn from(src: &'a [u8]) -> BytesMut { - BytesMut::from_vec(src.to_vec()) + let mut bytes = BytesMut::with_capacity(src.len()); + bytes.extend_from_slice(src); + bytes } } @@ -1160,9 +980,6 @@ impl Extend for BytesMut { let (lower, _) = iter.size_hint(); self.reserve(lower); - // TODO: optimize - // 1. If self.kind() == KIND_VEC, use Vec::extend - // 2. Make `reserve` inline-able for b in iter { self.reserve(1); self.put_u8(b); @@ -1181,7 +998,9 @@ impl<'a> Extend<&'a u8> for BytesMut { impl FromIterator for BytesMut { fn from_iter>(into_iter: T) -> Self { - BytesMut::from_vec(Vec::from_iter(into_iter)) + let mut bytes = BytesMut::new(); + bytes.extend(into_iter); + bytes } } @@ -1197,44 +1016,21 @@ impl<'a> FromIterator<&'a u8> for BytesMut { * */ -unsafe fn increment_shared(ptr: *mut Shared) { - let old_size = (*ptr).ref_count.fetch_add(1, Ordering::Relaxed); +impl Shared { + unsafe fn allocate_for_size(size: usize) -> Result<*mut Shared, ()> { + let combined_layout = Shared::layout_for_size(size)?; + let alloc_res = alloc::alloc::alloc(combined_layout) as *mut Shared; + if alloc_res.is_null() { + return Err(()); + } - if old_size > isize::MAX as usize { - crate::abort(); - } -} + let result = &mut *alloc_res; + result.capacity = size; + result.refcount.store(1, Ordering::Relaxed); -unsafe fn release_shared(ptr: *mut Shared) { - // `Shared` storage... follow the drop steps from Arc. - if (*ptr).ref_count.fetch_sub(1, Ordering::Release) != 1 { - return; - } - - // This fence is needed to prevent reordering of use of the data and - // deletion of the data. Because it is marked `Release`, the decreasing - // of the reference count synchronizes with this `Acquire` fence. This - // means that use of the data happens before decreasing the reference - // count, which happens before this fence, which happens before the - // deletion of the data. - // - // As explained in the [Boost documentation][1], - // - // > It is important to enforce any possible access to the object in one - // > thread (through an existing reference) to *happen before* deleting - // > the object in a different thread. 
This is achieved by a "release" - // > operation after dropping a reference (any access to the object - // > through this reference must obviously happened before), and an - // > "acquire" operation before deleting the object. - // - // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) - atomic::fence(Ordering::Acquire); - - // Drop the data - Box::from_raw(ptr); -} + Ok(alloc_res) + } -impl Shared { fn is_unique(&self) -> bool { // The goal is to check if the current handle is the only handle // that currently has access to the buffer. This is done by @@ -1246,24 +1042,66 @@ impl Shared { // are ordered before the `ref_count` is decremented. As such, // this `Acquire` will guarantee that those mutations are // visible to the current thread. - self.ref_count.load(Ordering::Acquire) == 1 + self.refcount.load(Ordering::Acquire) == 1 } -} -fn original_capacity_to_repr(cap: usize) -> usize { - let width = PTR_WIDTH - ((cap >> MIN_ORIGINAL_CAPACITY_WIDTH).leading_zeros() as usize); - cmp::min( - width, - MAX_ORIGINAL_CAPACITY_WIDTH - MIN_ORIGINAL_CAPACITY_WIDTH, - ) -} + fn layout_for_size(size: usize) -> Result { + let layout = Layout::new::(); + let total_size = layout.size().checked_add(size).ok_or(())?; + let combined_layout = + Layout::from_size_align(total_size, layout.align()).expect("Error calculating layout"); + Ok(combined_layout) + } + + unsafe fn increment_refcount(self_ptr: *mut Self) { + let old_size = (*self_ptr).refcount.fetch_add(1, Ordering::Relaxed); -fn original_capacity_from_repr(repr: usize) -> usize { - if repr == 0 { - return 0; + if old_size > isize::MAX as usize { + crate::abort(); + } } - 1 << (repr + (MIN_ORIGINAL_CAPACITY_WIDTH - 1)) + unsafe fn release_refcount(self_ptr: *mut Self) { + if self_ptr.is_null() { + return; + } + + if (*self_ptr).refcount.fetch_sub(1, Ordering::Release) != 1 { + return; + } + + // The shared state should be released + + // This fence is needed to prevent reordering of use of the data and + // deletion of the data. Because it is marked `Release`, the decreasing + // of the reference count synchronizes with this `Acquire` fence. This + // means that use of the data happens before decreasing the reference + // count, which happens before this fence, which happens before the + // deletion of the data. + // + // As explained in the [Boost documentation][1], + // + // > It is important to enforce any possible access to the object in one + // > thread (through an existing reference) to *happen before* deleting + // > the object in a different thread. This is achieved by a "release" + // > operation after dropping a reference (any access to the object + // > through this reference must obviously happened before), and an + // > "acquire" operation before deleting the object. 
+ // + // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) + atomic::fence(Ordering::Acquire); + + alloc::alloc::dealloc( + self_ptr as *mut u8, + Self::layout_for_size((*self_ptr).capacity).unwrap(), + ) + } + + unsafe fn data_ptr_mut(&mut self) -> NonNull { + let mut end_addr = self as *const Shared as usize; + end_addr += std::mem::size_of::(); + NonNull::new_unchecked(end_addr as *mut u8) + } } /* @@ -1484,14 +1322,6 @@ fn vptr(ptr: *mut u8) -> NonNull { } } -unsafe fn rebuild_vec(ptr: *mut u8, mut len: usize, mut cap: usize, off: usize) -> Vec { - let ptr = ptr.offset(-(off as isize)); - len += off; - cap += off; - - Vec::from_raw_parts(ptr, len, cap) -} - // ===== impl SharedVtable ===== static SHARED_VTABLE: Vtable = Vtable { @@ -1501,7 +1331,7 @@ static SHARED_VTABLE: Vtable = Vtable { unsafe fn shared_v_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes { let shared = data.load(Ordering::Relaxed) as *mut Shared; - increment_shared(shared); + Shared::increment_refcount(shared); let data = AtomicPtr::new(shared as _); Bytes::with_vtable(ptr, len, data, &SHARED_VTABLE) @@ -1509,7 +1339,7 @@ unsafe fn shared_v_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> By unsafe fn shared_v_drop(data: &mut AtomicPtr<()>, _ptr: *const u8, _len: usize) { data.with_mut(|shared| { - release_shared(*shared as *mut Shared); + Shared::release_refcount(*shared as *mut Shared); }); } From 3d12491a1bf980126529aa3a928d5d9329b8d4df Mon Sep 17 00:00:00 2001 From: Matthias Einwag Date: Sun, 18 Oct 2020 10:04:21 -0700 Subject: [PATCH 02/18] Fix splitting empty BytesMut, and use NonNull::dangling --- src/bytes_mut.rs | 28 +++++++++++++--------------- tests/test_bytes.rs | 18 ++++++++++++++++++ 2 files changed, 31 insertions(+), 15 deletions(-) diff --git a/src/bytes_mut.rs b/src/bytes_mut.rs index ae1e73720..ad85304b2 100644 --- a/src/bytes_mut.rs +++ b/src/bytes_mut.rs @@ -68,10 +68,6 @@ struct Shared { refcount: AtomicUsize, } -/// An element in static memory which we use as guard pointer for -/// empty byte arrays -const GUARD: u8 = 0; - /* * * ===== BytesMut ===== @@ -107,7 +103,7 @@ impl BytesMut { if capacity == 0 { BytesMut { data: ptr::null_mut(), - ptr: NonNull::new_unchecked(&GUARD as *const u8 as *mut u8), + ptr: NonNull::dangling(), len: 0, cap: 0, } @@ -338,6 +334,11 @@ impl BytesMut { self.len(), ); + if self.data.is_null() { + debug_assert_eq!(at, 0, "Empty Bytes is only splittable at the front"); + return BytesMut::new(); + } + unsafe { let mut other = self.shallow_clone(); other.set_end(at); @@ -665,16 +666,13 @@ impl BytesMut { #[inline] fn as_slice_mut(&mut self) -> &mut [u8] { - // TODO: The guard pointer is a constant pointer, which we misuse - // here as a mutable pointer. Since the length of the slice is empty it - // it might not be an issue to directly return it. However since Rusts - // undefined behavior rules are not super clear, we go safe and return - // an empty slice if the guard is detected. - if self.len == 0 { - &mut [] - } else { - unsafe { slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len) } - } + // TODO: + // The pointer is always valid, but might not backed by real storage + // in case of an empty BytesMut. + // This shouldn't matter due to the array length being 0 and the pointer + // never being accessed. However it is still not fully clear if this is + // valid within Rusts's UB rules. 
+ unsafe { slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len) } } unsafe fn set_start(&mut self, start: usize) { diff --git a/tests/test_bytes.rs b/tests/test_bytes.rs index 6b106a6bc..0a0ee41bf 100644 --- a/tests/test_bytes.rs +++ b/tests/test_bytes.rs @@ -526,6 +526,24 @@ fn reserve_in_arc_nonunique_does_not_overallocate() { assert_eq!(2001, bytes.capacity()); } +#[test] +fn access_empty_bytes() { + let mut bytes = BytesMut::new(); + assert_eq!(0, bytes.as_ref().len()); + assert_eq!(0, bytes.as_mut().len()); +} + +#[test] +fn split_empty_bytes() { + let mut bytes = BytesMut::new(); + let bytes1 = bytes.split(); + for b in &mut [bytes, bytes1] { + assert_eq!(0, b.as_ref().len()); + assert_eq!(0, b.as_mut().len()); + assert_eq!(0, b.capacity()); + } +} + #[test] fn extend_mut() { let mut bytes = BytesMut::with_capacity(0); From 32c2ae0afc3a37142a968c340ef656d3718a222a Mon Sep 17 00:00:00 2001 From: Matthias Einwag Date: Sun, 18 Oct 2020 10:09:31 -0700 Subject: [PATCH 03/18] Add a remark about unsplitting empty Bytes --- src/bytes_mut.rs | 3 +++ tests/test_bytes.rs | 11 +++++++++++ 2 files changed, 14 insertions(+) diff --git a/src/bytes_mut.rs b/src/bytes_mut.rs index ad85304b2..cfef6ccc0 100644 --- a/src/bytes_mut.rs +++ b/src/bytes_mut.rs @@ -710,6 +710,9 @@ impl BytesMut { return Ok(()); } + // Note that the following comparison will also yield true for 2 empty + // BytesMut. This will however be fine - it will simply combine them + // into a single empty BytesMut. let ptr = unsafe { self.ptr.as_ptr().offset(self.len as isize) }; if ptr == other.ptr.as_ptr() && self.data == other.data { // Contiguous blocks, just combine directly diff --git a/tests/test_bytes.rs b/tests/test_bytes.rs index 0a0ee41bf..5aeeb70a7 100644 --- a/tests/test_bytes.rs +++ b/tests/test_bytes.rs @@ -544,6 +544,17 @@ fn split_empty_bytes() { } } +#[test] +fn unsplit_empty_bytes() { + let mut bytes = BytesMut::new(); + let bytes1 = bytes.split(); + bytes.unsplit(bytes1); + + assert_eq!(0, bytes.as_ref().len()); + assert_eq!(0, bytes.as_mut().len()); + assert_eq!(0, bytes.capacity()); +} + #[test] fn extend_mut() { let mut bytes = BytesMut::with_capacity(0); From d4d5bb29ad4d93ca631f5d1667d48d38672364a0 Mon Sep 17 00:00:00 2001 From: Matthias Einwag Date: Sun, 18 Oct 2020 10:26:17 -0700 Subject: [PATCH 04/18] Add tests around frozen empty BytesMut --- tests/test_bytes.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/test_bytes.rs b/tests/test_bytes.rs index 5aeeb70a7..942c4e5a1 100644 --- a/tests/test_bytes.rs +++ b/tests/test_bytes.rs @@ -531,6 +531,11 @@ fn access_empty_bytes() { let mut bytes = BytesMut::new(); assert_eq!(0, bytes.as_ref().len()); assert_eq!(0, bytes.as_mut().len()); + + let bytes = bytes.freeze(); + assert_eq!(0, bytes[..].len()); + let bytes2 = bytes.clone(); + assert_eq!(0, bytes2[..].len()); } #[test] From 3bd5735fe4e6eb20020075a111d1b824de309296 Mon Sep 17 00:00:00 2001 From: Matthias Einwag Date: Sun, 18 Oct 2020 10:31:21 -0700 Subject: [PATCH 05/18] Do not propagate Layout errors --- src/bytes_mut.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/bytes_mut.rs b/src/bytes_mut.rs index cfef6ccc0..3da3d96f8 100644 --- a/src/bytes_mut.rs +++ b/src/bytes_mut.rs @@ -109,7 +109,7 @@ impl BytesMut { } } else { let shared = Shared::allocate_for_size(capacity) - .expect("Allocation failures are currently not handled"); + .expect("Fallible allocations are not yet supported"); BytesMut { data: shared, len: 0, @@ -569,7 +569,8 @@ impl 
BytesMut { new_cap = cmp::max(cmp::max(double, new_cap), original_capacity); // Create a new backing storage - let shared = Shared::allocate_for_size(new_cap).expect("Allocations can not fail"); + let shared = Shared::allocate_for_size(new_cap) + .expect("Fallible allocations are not yet supported"); if self.len != 0 { ptr::copy_nonoverlapping( @@ -1019,7 +1020,7 @@ impl<'a> FromIterator<&'a u8> for BytesMut { impl Shared { unsafe fn allocate_for_size(size: usize) -> Result<*mut Shared, ()> { - let combined_layout = Shared::layout_for_size(size)?; + let combined_layout = Shared::layout_for_size(size); let alloc_res = alloc::alloc::alloc(combined_layout) as *mut Shared; if alloc_res.is_null() { return Err(()); @@ -1046,12 +1047,15 @@ impl Shared { self.refcount.load(Ordering::Acquire) == 1 } - fn layout_for_size(size: usize) -> Result { + fn layout_for_size(size: usize) -> Layout { let layout = Layout::new::(); - let total_size = layout.size().checked_add(size).ok_or(())?; + let total_size = layout + .size() + .checked_add(size) + .expect("Overflow during layout calculation"); let combined_layout = Layout::from_size_align(total_size, layout.align()).expect("Error calculating layout"); - Ok(combined_layout) + combined_layout } unsafe fn increment_refcount(self_ptr: *mut Self) { @@ -1094,7 +1098,7 @@ impl Shared { alloc::alloc::dealloc( self_ptr as *mut u8, - Self::layout_for_size((*self_ptr).capacity).unwrap(), + Self::layout_for_size((*self_ptr).capacity), ) } From 542f111282d76777f2e8135396157aef1c03ecb6 Mon Sep 17 00:00:00 2001 From: Matthias Einwag Date: Sun, 18 Oct 2020 10:40:45 -0700 Subject: [PATCH 06/18] Improve recycling test --- tests/test_bytes.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_bytes.rs b/tests/test_bytes.rs index 942c4e5a1..a21c7709b 100644 --- a/tests/test_bytes.rs +++ b/tests/test_bytes.rs @@ -481,12 +481,14 @@ fn reserve_vec_recycling() { let mut bytes = BytesMut::with_capacity(16); assert_eq!(bytes.capacity(), 16); let addr = bytes.as_ptr() as usize; - bytes.put("0123456789012345".as_bytes()); + bytes.put("0123456789abcdef".as_bytes()); assert_eq!(bytes.as_ptr() as usize, addr); bytes.advance(10); + assert_eq!("abcdef".as_bytes(), bytes); assert_eq!(bytes.capacity(), 6); bytes.reserve(8); assert_eq!(bytes.capacity(), 16); + assert_eq!("abcdef".as_bytes(), bytes); assert_eq!(bytes.as_ptr() as usize, addr); } From 626edb5006d699841deb287c25dd9752ffa9ef6f Mon Sep 17 00:00:00 2001 From: Matthias Einwag Date: Sun, 18 Oct 2020 10:47:25 -0700 Subject: [PATCH 07/18] Fix split_off for empty BytesMut --- src/bytes_mut.rs | 8 +++++++- tests/test_bytes.rs | 5 ++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/bytes_mut.rs b/src/bytes_mut.rs index 3da3d96f8..025f8975f 100644 --- a/src/bytes_mut.rs +++ b/src/bytes_mut.rs @@ -260,6 +260,12 @@ impl BytesMut { at, self.capacity(), ); + + if self.data.is_null() { + debug_assert_eq!(at, 0, "Empty Bytes is only splittable at the front"); + return BytesMut::new(); + } + unsafe { let mut other = self.shallow_clone(); other.set_start(at); @@ -733,7 +739,6 @@ impl BytesMut { /// two views into the same range. 
#[inline] unsafe fn shallow_clone(&mut self) -> BytesMut { - debug_assert!(!self.data.is_null()); Shared::increment_refcount(self.data); ptr::read(self) } @@ -1059,6 +1064,7 @@ impl Shared { } unsafe fn increment_refcount(self_ptr: *mut Self) { + debug_assert!(!self_ptr.is_null()); let old_size = (*self_ptr).refcount.fetch_add(1, Ordering::Relaxed); if old_size > isize::MAX as usize { diff --git a/tests/test_bytes.rs b/tests/test_bytes.rs index a21c7709b..eb7e01ad2 100644 --- a/tests/test_bytes.rs +++ b/tests/test_bytes.rs @@ -544,7 +544,10 @@ fn access_empty_bytes() { fn split_empty_bytes() { let mut bytes = BytesMut::new(); let bytes1 = bytes.split(); - for b in &mut [bytes, bytes1] { + let bytes2 = bytes.split_off(0); + let bytes3 = bytes.split_to(0); + + for b in &mut [bytes, bytes1, bytes2, bytes3] { assert_eq!(0, b.as_ref().len()); assert_eq!(0, b.as_mut().len()); assert_eq!(0, b.capacity()); From ef92196134b6d358b7530136fa246aac6e957d60 Mon Sep 17 00:00:00 2001 From: Matthias Einwag Date: Sun, 18 Oct 2020 11:34:39 -0700 Subject: [PATCH 08/18] Let the memory allocation strategy be closer to what std and previous versions did Also add a lot more comments around it --- src/bytes_mut.rs | 38 +++++++++++++++++++++++++++++++------- tests/test_bytes.rs | 12 +++++++++++- 2 files changed, 42 insertions(+), 8 deletions(-) diff --git a/src/bytes_mut.rs b/src/bytes_mut.rs index 025f8975f..e2dfd9294 100644 --- a/src/bytes_mut.rs +++ b/src/bytes_mut.rs @@ -68,6 +68,12 @@ struct Shared { refcount: AtomicUsize, } +/// The max original capacity value which is reused when a `BytesMut` which +/// had been previously split into a smaller size is resized. +/// The `BytesMut` will try to regain the original capacity that was available +/// before the `split` - but the capacity will be bounded by this value. +const MAX_ORIGINAL_CAPACITY: usize = 64 * 1024; + /* * * ===== BytesMut ===== @@ -535,7 +541,7 @@ impl BytesMut { let shared: *mut Shared = self.data as _; // Reserving involves abandoning the currently shared buffer and - // allocating a new vector with the requested capacity. + // allocating a new shared state with the requested capacity. // // Compute the new capacity let mut new_cap = len.checked_add(additional).expect("overflow"); @@ -549,7 +555,7 @@ impl BytesMut { // First, try to reclaim the buffer. This is possible if the current // handle is the only outstanding handle pointing to the buffer. // However, before doing the work of copying data, check to make - // sure that the vector has enough capacity. + // sure that the old shared state has enough capacity. if (*shared).is_unique() && (*shared).capacity >= new_cap { // The capacity is sufficient, reclaim the buffer let ptr = (*shared).data_ptr_mut(); @@ -568,16 +574,34 @@ impl BytesMut { // than requested if `new_cap` is not much bigger than the current // capacity. // - // There are some situations, using `reserve_exact` that the - // buffer capacity could be below `original_capacity`, so do a - // check. + // The reservation strategy follows what `std` does: + // https://github.com/rust-lang/rust/blob/834821e3b666f77bb7caf1ed88ed662c395fbc11/library/alloc/src/raw_vec.rs#L401-L417 + // + // - The old capacity is at least doubled let double = self.cap.checked_shl(1).unwrap_or(new_cap); - new_cap = cmp::max(cmp::max(double, new_cap), original_capacity); + // - If this is not sufficient for what the user asked for, the higher + // value is taken. 
+ new_cap = cmp::max(double, new_cap); + // - We want to allocate at least 8 bytes, because tiny allocations + // are not efficient + new_cap = cmp::max(8, new_cap); + // - If we had an existing backing storage, we want to obtain at + // least the same capacity again that this one had. This is specific + // to BytesMut and not found in Vecs. The use-case is that BytesMut + // which are used over time to split off chunks and then gain new + // input data do not oscillate too much in size. + // This functionality is however bounded by `MAX_ORIGINAL_CAPACITY`. + // A `BytesMut` will not automatically reserve more capacity than this. + // TODO: Do we really want to keep this? This seems like a footgun + // where people splitting `BytesMut` and reusing them for different + // purposes might accidentally keep around big memory allocations. + new_cap = cmp::max(cmp::min(original_capacity, MAX_ORIGINAL_CAPACITY), new_cap); // Create a new backing storage let shared = Shared::allocate_for_size(new_cap) .expect("Fallible allocations are not yet supported"); + // Copy data over into the new storage if self.len != 0 { ptr::copy_nonoverlapping( self.ptr.as_ptr(), @@ -1327,7 +1351,7 @@ impl PartialEq for BytesMut { fn vptr(ptr: *mut u8) -> NonNull { if cfg!(debug_assertions) { - NonNull::new(ptr).expect("Vec pointer should be non-null") + NonNull::new(ptr).expect("data pointer should be non-null") } else { unsafe { NonNull::new_unchecked(ptr) } } diff --git a/tests/test_bytes.rs b/tests/test_bytes.rs index eb7e01ad2..0623b2b90 100644 --- a/tests/test_bytes.rs +++ b/tests/test_bytes.rs @@ -442,8 +442,18 @@ fn reserve_growth() { bytes.put("hello world".as_bytes()); let _ = bytes.split(); + // There are 53 bytes left in `bytes` after this + // We expect a doubling of capacity to 106 bytes.reserve(65); - assert_eq!(bytes.capacity(), 128); + assert_eq!(bytes.capacity(), 106); +} + + +#[test] +fn reserve_at_least_8_bytes() { + let mut bytes = BytesMut::new(); + bytes.put("hello".as_bytes()); + assert_eq!(bytes.capacity(), 8); } #[test] From ab0a0d02a9ac903f43dd240f35c0769ab1b544d4 Mon Sep 17 00:00:00 2001 From: Matthias Einwag Date: Sun, 18 Oct 2020 11:37:41 -0700 Subject: [PATCH 09/18] Fix rustfmt --- tests/test_bytes.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/test_bytes.rs b/tests/test_bytes.rs index 0623b2b90..895f25aec 100644 --- a/tests/test_bytes.rs +++ b/tests/test_bytes.rs @@ -443,12 +443,11 @@ fn reserve_growth() { let _ = bytes.split(); // There are 53 bytes left in `bytes` after this - // We expect a doubling of capacity to 106 + // We expect a doubling of capacity to 106 bytes.reserve(65); assert_eq!(bytes.capacity(), 106); } - #[test] fn reserve_at_least_8_bytes() { let mut bytes = BytesMut::new(); From e6d803893cd9a651e248fbff938f47af5a1f73a3 Mon Sep 17 00:00:00 2001 From: Matthias Einwag Date: Sun, 18 Oct 2020 11:42:03 -0700 Subject: [PATCH 10/18] Remove std reference --- src/bytes_mut.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bytes_mut.rs b/src/bytes_mut.rs index e2dfd9294..9158fe6ae 100644 --- a/src/bytes_mut.rs +++ b/src/bytes_mut.rs @@ -1134,7 +1134,7 @@ impl Shared { unsafe fn data_ptr_mut(&mut self) -> NonNull { let mut end_addr = self as *const Shared as usize; - end_addr += std::mem::size_of::(); + end_addr += mem::size_of::(); NonNull::new_unchecked(end_addr as *mut u8) } } From 0caf28b73fdd449ba20305f89541088618f07bfa Mon Sep 17 00:00:00 2001 From: Matthias Einwag Date: Sun, 18 Oct 2020 12:02:07 -0700 Subject: [PATCH 
11/18] Fix serde feature --- src/bytes_mut.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/bytes_mut.rs b/src/bytes_mut.rs index 9158fe6ae..c2a77b851 100644 --- a/src/bytes_mut.rs +++ b/src/bytes_mut.rs @@ -690,6 +690,17 @@ impl BytesMut { // private + // This method is only used in combination with the serde feature at the moment. + // BytesMut doesn't expose a public `From>` implementation in order + // not provide the wrong impression that the conversion is free. + // Even if one implementation `BytesMut` would have a free conversion, + // a future change of the allocation strategy might remove this guarantee. + #[inline] + #[cfg(feature = "serde")] + pub(crate) fn from_vec(vec: Vec) -> BytesMut { + BytesMut::from(&vec[..]) + } + #[inline] fn as_slice(&self) -> &[u8] { unsafe { slice::from_raw_parts(self.ptr.as_ptr(), self.len) } From 45953576ab4afa0093508dac4166715290702673 Mon Sep 17 00:00:00 2001 From: Matthias Einwag Date: Sun, 18 Oct 2020 15:44:10 -0700 Subject: [PATCH 12/18] Document how `BytesMut::freeze()` handles excess capacity --- src/bytes_mut.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/bytes_mut.rs b/src/bytes_mut.rs index c2a77b851..baae87bb7 100644 --- a/src/bytes_mut.rs +++ b/src/bytes_mut.rs @@ -201,6 +201,15 @@ impl BytesMut { /// referenced by the handle will no longer be mutated. Once the conversion /// is done, the handle can be cloned and shared across threads. /// + /// Excess capacity which was available in the `BytesMut` object will not + /// get freed in this conversion. It will only get freed if all references + /// to the returned `Bytes` will get dropped. + /// + /// If freeing excess capacity is important, users can create an + /// exact-sized `Bytes` instance by using `BytesMut::with_capacity()` and + /// copying all data over before calling `freeze()`. 
+ /// + /// /// # Examples /// /// ``` From f45a0bf5b8b189bc34ec7eaea6f7897e074568e0 Mon Sep 17 00:00:00 2001 From: Matthias Einwag Date: Sun, 18 Oct 2020 15:50:30 -0700 Subject: [PATCH 13/18] Try fix loom --- src/bytes_mut.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bytes_mut.rs b/src/bytes_mut.rs index baae87bb7..f594a7b7f 100644 --- a/src/bytes_mut.rs +++ b/src/bytes_mut.rs @@ -1077,7 +1077,7 @@ impl Shared { let result = &mut *alloc_res; result.capacity = size; - result.refcount.store(1, Ordering::Relaxed); + result.refcount = AtomicUsize::new(1); Ok(alloc_res) } From cffa76816cd82c7cded34390473862f1bce719b8 Mon Sep 17 00:00:00 2001 From: Matthias Einwag Date: Sun, 18 Oct 2020 15:55:03 -0700 Subject: [PATCH 14/18] Improve correctness of shared state initialization --- src/bytes_mut.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/bytes_mut.rs b/src/bytes_mut.rs index f594a7b7f..80d45a02f 100644 --- a/src/bytes_mut.rs +++ b/src/bytes_mut.rs @@ -1075,9 +1075,8 @@ impl Shared { return Err(()); } - let result = &mut *alloc_res; - result.capacity = size; - result.refcount = AtomicUsize::new(1); + ptr::write(&mut (*alloc_res).capacity, size); + ptr::write(&mut (*alloc_res).refcount, AtomicUsize::new(1)); Ok(alloc_res) } From 43d9ddcb40cab4da4b922c2abb9d74ae2dd02d7f Mon Sep 17 00:00:00 2001 From: Matthias Einwag Date: Mon, 19 Oct 2020 12:49:28 -0700 Subject: [PATCH 15/18] Apply suggestions from code review Co-authored-by: Cameron Bytheway --- src/bytes_mut.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/bytes_mut.rs b/src/bytes_mut.rs index 80d45a02f..a84084118 100644 --- a/src/bytes_mut.rs +++ b/src/bytes_mut.rs @@ -1,6 +1,6 @@ use core::alloc::Layout; use core::iter::{FromIterator, Iterator}; -use core::mem::{self}; +use core::mem; use core::ops::{Deref, DerefMut}; use core::ptr::{self, NonNull}; use core::{cmp, fmt, hash, isize, slice, usize}; @@ -702,7 +702,7 @@ impl BytesMut { // This method is only used in combination with the serde feature at the moment. // BytesMut doesn't expose a public `From>` implementation in order // not provide the wrong impression that the conversion is free. - // Even if one implementation `BytesMut` would have a free conversion, + // Even if one implementation of `BytesMut` would have a free conversion, // a future change of the allocation strategy might remove this guarantee. #[inline] #[cfg(feature = "serde")] From 800c527e27f7ccaf7699a9c6c21c5db72bfc64f2 Mon Sep 17 00:00:00 2001 From: Matthias Einwag Date: Tue, 20 Oct 2020 15:13:34 -0700 Subject: [PATCH 16/18] Add more docs. 
Minimize unsafe blocks --- src/bytes_mut.rs | 176 +++++++++++++++++++++++++---------------------- 1 file changed, 92 insertions(+), 84 deletions(-) diff --git a/src/bytes_mut.rs b/src/bytes_mut.rs index a84084118..73d0509c8 100644 --- a/src/bytes_mut.rs +++ b/src/bytes_mut.rs @@ -105,23 +105,24 @@ impl BytesMut { /// ``` #[inline] pub fn with_capacity(capacity: usize) -> BytesMut { - unsafe { - if capacity == 0 { - BytesMut { - data: ptr::null_mut(), - ptr: NonNull::dangling(), - len: 0, - cap: 0, - } - } else { - let shared = Shared::allocate_for_size(capacity) - .expect("Fallible allocations are not yet supported"); - BytesMut { - data: shared, - len: 0, - ptr: (*shared).data_ptr_mut(), - cap: (*shared).capacity, - } + if capacity == 0 { + BytesMut { + data: ptr::null_mut(), + ptr: NonNull::dangling(), + len: 0, + cap: 0, + } + } else { + let shared = Shared::allocate_for_size(capacity) + .expect("Fallible allocations are not yet supported"); + // Safety: The object from the allocation is always a valid `Shared` + // object. If the allocation would fail, the method would panic. + let (ptr, cap) = unsafe { ((*shared).data_ptr_mut(), (*shared).capacity) }; + BytesMut { + data: shared, + len: 0, + ptr, + cap, } } } @@ -605,6 +606,14 @@ impl BytesMut { // where people splitting `BytesMut` and reusing them for different // purposes might accidentally keep around big memory allocations. new_cap = cmp::max(cmp::min(original_capacity, MAX_ORIGINAL_CAPACITY), new_cap); + // The buffer length may not exceed `std::isize::MAX`. + // This is checked by std containers like `Vec`, but not here. + // See also https://doc.rust-lang.org/std/primitive.pointer.html#safety-2 + // The computed offset, in bytes, cannot overflow an isize. + assert!( + new_cap <= std::isize::MAX as usize, + "Maximum allocation size" + ); // Create a new backing storage let shared = Shared::allocate_for_size(new_cap) @@ -712,17 +721,19 @@ impl BytesMut { #[inline] fn as_slice(&self) -> &[u8] { + // Safety: This always stores a valid pointer. + // If `BytesMut` is not allocated, this points to NonNull::dangling, + // which is ok to use for this purpose: + // https://doc.rust-lang.org/std/slice/fn.from_raw_parts.html#safety unsafe { slice::from_raw_parts(self.ptr.as_ptr(), self.len) } } #[inline] fn as_slice_mut(&mut self) -> &mut [u8] { - // TODO: - // The pointer is always valid, but might not backed by real storage - // in case of an empty BytesMut. - // This shouldn't matter due to the array length being 0 and the pointer - // never being accessed. However it is still not fully clear if this is - // valid within Rusts's UB rules. + // Safety: This always stores a valid pointer. + // If `BytesMut` is not allocated, this points to NonNull::dangling, + // which is ok to use for this purpose: + // https://doc.rust-lang.org/std/slice/fn.from_raw_parts.html#safety unsafe { slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len) } } @@ -1068,19 +1079,31 @@ impl<'a> FromIterator<&'a u8> for BytesMut { */ impl Shared { - unsafe fn allocate_for_size(size: usize) -> Result<*mut Shared, ()> { + /// Allocates a `Shared` state for a buffer of the given capacity + /// + /// This method will allocate the reference count of `Shared` to 1, + /// and set the provided capacity on the `Shared` structure. 
+ fn allocate_for_size(size: usize) -> Result<*mut Shared, ()> { let combined_layout = Shared::layout_for_size(size); - let alloc_res = alloc::alloc::alloc(combined_layout) as *mut Shared; - if alloc_res.is_null() { - return Err(()); - } + // Safety: + // This allocates and initializes the `Shared` structure + let alloc_res = unsafe { + let alloc_res = alloc::alloc::alloc(combined_layout) as *mut Shared; + if alloc_res.is_null() { + return Err(()); + } + + ptr::write(&mut (*alloc_res).capacity, size); + ptr::write(&mut (*alloc_res).refcount, AtomicUsize::new(1)); - ptr::write(&mut (*alloc_res).capacity, size); - ptr::write(&mut (*alloc_res).refcount, AtomicUsize::new(1)); + alloc_res + }; Ok(alloc_res) } + /// Returns true if this `Shared` structure is only referenced by a single + /// `BytesMut` instance. fn is_unique(&self) -> bool { // The goal is to check if the current handle is the only handle // that currently has access to the buffer. This is done by @@ -1106,8 +1129,29 @@ impl Shared { combined_layout } + /// Increments the reference count for `Shared` objects + /// + /// # Safety + /// + /// This method is only safe to be called on null pointers and pointers + /// returned from `Shared::allocate_for_size` which which have not been + /// freed using `release_refcount`. unsafe fn increment_refcount(self_ptr: *mut Self) { debug_assert!(!self_ptr.is_null()); + // This uses `Relaxed` ordering, just like the `Arc::clone` implementation. + // The following documentation is directly taken from `Arc::clone`: + // + // Using a relaxed ordering is alright here, as knowledge of the + // original reference prevents other threads from erroneously deleting + // the object. + // + // As explained in the [Boost documentation][1], Increasing the + // reference counter can always be done with memory_order_relaxed: New + // references to an object can only be formed from an existing + // reference, and passing an existing reference from one thread to + // another must already provide any required synchronization. + // + // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) let old_size = (*self_ptr).refcount.fetch_add(1, Ordering::Relaxed); if old_size > isize::MAX as usize { @@ -1115,6 +1159,13 @@ impl Shared { } } + /// Decrements the reference count for `Shared` objects + /// + /// # Safety + /// + /// This method is only safe to be called on null pointers and pointers + /// returned from `Shared::allocate_for_size` which which have not been + /// freed using `release_refcount`. unsafe fn release_refcount(self_ptr: *mut Self) { if self_ptr.is_null() { return; @@ -1151,64 +1202,21 @@ impl Shared { ) } + /// Returns the start of the data section of this buffer which starts behind + /// the `Shared` header. + /// + /// # Safety + /// + /// This method is only safe to be called on null pointers and pointers + /// returned from `Shared::allocate_for_size` which which have not been + /// freed using `release_refcount`. 
unsafe fn data_ptr_mut(&mut self) -> NonNull { - let mut end_addr = self as *const Shared as usize; - end_addr += mem::size_of::(); - NonNull::new_unchecked(end_addr as *mut u8) + let shared_addr = self as *mut Self; + let data_addr = shared_addr.offset(1) as *mut u8; + NonNull::new_unchecked(data_addr) } } -/* -#[test] -fn test_original_capacity_to_repr() { - assert_eq!(original_capacity_to_repr(0), 0); - - let max_width = 32; - - for width in 1..(max_width + 1) { - let cap = 1 << width - 1; - - let expected = if width < MIN_ORIGINAL_CAPACITY_WIDTH { - 0 - } else if width < MAX_ORIGINAL_CAPACITY_WIDTH { - width - MIN_ORIGINAL_CAPACITY_WIDTH - } else { - MAX_ORIGINAL_CAPACITY_WIDTH - MIN_ORIGINAL_CAPACITY_WIDTH - }; - - assert_eq!(original_capacity_to_repr(cap), expected); - - if width > 1 { - assert_eq!(original_capacity_to_repr(cap + 1), expected); - } - - // MIN_ORIGINAL_CAPACITY_WIDTH must be bigger than 7 to pass tests below - if width == MIN_ORIGINAL_CAPACITY_WIDTH + 1 { - assert_eq!(original_capacity_to_repr(cap - 24), expected - 1); - assert_eq!(original_capacity_to_repr(cap + 76), expected); - } else if width == MIN_ORIGINAL_CAPACITY_WIDTH + 2 { - assert_eq!(original_capacity_to_repr(cap - 1), expected - 1); - assert_eq!(original_capacity_to_repr(cap - 48), expected - 1); - } - } -} - -#[test] -fn test_original_capacity_from_repr() { - assert_eq!(0, original_capacity_from_repr(0)); - - let min_cap = 1 << MIN_ORIGINAL_CAPACITY_WIDTH; - - assert_eq!(min_cap, original_capacity_from_repr(1)); - assert_eq!(min_cap * 2, original_capacity_from_repr(2)); - assert_eq!(min_cap * 4, original_capacity_from_repr(3)); - assert_eq!(min_cap * 8, original_capacity_from_repr(4)); - assert_eq!(min_cap * 16, original_capacity_from_repr(5)); - assert_eq!(min_cap * 32, original_capacity_from_repr(6)); - assert_eq!(min_cap * 64, original_capacity_from_repr(7)); -} -*/ - unsafe impl Send for BytesMut {} unsafe impl Sync for BytesMut {} From e42961375bfce717ca37943da091119295c68003 Mon Sep 17 00:00:00 2001 From: Matthias Einwag Date: Tue, 20 Oct 2020 15:20:26 -0700 Subject: [PATCH 17/18] fix no_std --- src/bytes_mut.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bytes_mut.rs b/src/bytes_mut.rs index 73d0509c8..3c8b91f0b 100644 --- a/src/bytes_mut.rs +++ b/src/bytes_mut.rs @@ -611,7 +611,7 @@ impl BytesMut { // See also https://doc.rust-lang.org/std/primitive.pointer.html#safety-2 // The computed offset, in bytes, cannot overflow an isize. assert!( - new_cap <= std::isize::MAX as usize, + new_cap <= core::isize::MAX as usize, "Maximum allocation size" ); From a9ca97df1dea13dad4645bbb079df8193fd7b6bb Mon Sep 17 00:00:00 2001 From: Matthias Einwag Date: Tue, 20 Oct 2020 15:31:00 -0700 Subject: [PATCH 18/18] Remove the considerations of original capacity If users want that behavior, the can still reserve for the desired capacity manually. --- src/bytes_mut.rs | 21 --------------------- tests/test_bytes.rs | 30 ------------------------------ 2 files changed, 51 deletions(-) diff --git a/src/bytes_mut.rs b/src/bytes_mut.rs index 3c8b91f0b..d102e4a32 100644 --- a/src/bytes_mut.rs +++ b/src/bytes_mut.rs @@ -68,12 +68,6 @@ struct Shared { refcount: AtomicUsize, } -/// The max original capacity value which is reused when a `BytesMut` which -/// had been previously split into a smaller size is resized. -/// The `BytesMut` will try to regain the original capacity that was available -/// before the `split` - but the capacity will be bounded by this value. 
-const MAX_ORIGINAL_CAPACITY: usize = 64 * 1024; - /* * * ===== BytesMut ===== @@ -557,11 +551,7 @@ impl BytesMut { let mut new_cap = len.checked_add(additional).expect("overflow"); unsafe { - let mut original_capacity = 0; - if !shared.is_null() { - original_capacity = (*shared).capacity; - // First, try to reclaim the buffer. This is possible if the current // handle is the only outstanding handle pointing to the buffer. // However, before doing the work of copying data, check to make @@ -595,17 +585,6 @@ impl BytesMut { // - We want to allocate at least 8 bytes, because tiny allocations // are not efficient new_cap = cmp::max(8, new_cap); - // - If we had an existing backing storage, we want to obtain at - // least the same capacity again that this one had. This is specific - // to BytesMut and not found in Vecs. The use-case is that BytesMut - // which are used over time to split off chunks and then gain new - // input data do not oscillate too much in size. - // This functionality is however bounded by `MAX_ORIGINAL_CAPACITY`. - // A `BytesMut` will not automatically reserve more capacity than this. - // TODO: Do we really want to keep this? This seems like a footgun - // where people splitting `BytesMut` and reusing them for different - // purposes might accidentally keep around big memory allocations. - new_cap = cmp::max(cmp::min(original_capacity, MAX_ORIGINAL_CAPACITY), new_cap); // The buffer length may not exceed `std::isize::MAX`. // This is checked by std containers like `Vec`, but not here. // See also https://doc.rust-lang.org/std/primitive.pointer.html#safety-2 diff --git a/tests/test_bytes.rs b/tests/test_bytes.rs index 895f25aec..fa3988613 100644 --- a/tests/test_bytes.rs +++ b/tests/test_bytes.rs @@ -455,36 +455,6 @@ fn reserve_at_least_8_bytes() { assert_eq!(bytes.capacity(), 8); } -#[test] -fn reserve_allocates_at_least_original_capacity() { - let mut bytes = BytesMut::with_capacity(1024); - - for i in 0..1020 { - bytes.put_u8(i as u8); - } - - let _other = bytes.split(); - - bytes.reserve(16); - assert_eq!(bytes.capacity(), 1024); -} - -#[test] -fn reserve_max_original_capacity_value() { - const SIZE: usize = 128 * 1024; - - let mut bytes = BytesMut::with_capacity(SIZE); - - for _ in 0..SIZE { - bytes.put_u8(0u8); - } - - let _other = bytes.split(); - - bytes.reserve(16); - assert_eq!(bytes.capacity(), 64 * 1024); -} - #[test] fn reserve_vec_recycling() { let mut bytes = BytesMut::with_capacity(16);
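
The core mechanism of the series above is a single heap allocation that holds a small reference-counted header (`Shared { capacity, refcount }`) immediately followed by the byte storage, which is what makes `freeze()` free. The following condensed sketch illustrates that pattern in isolation. It is a hedged illustration, not the crate's code: the `Header`, `allocate`, and `release` names are invented for the example, `std::alloc` stands in for the crate's allocation calls, and failure handling is reduced to asserts.

    use std::alloc::{alloc, dealloc, Layout};
    use std::ptr;
    use std::sync::atomic::{fence, AtomicUsize, Ordering};

    // Illustrative stand-in for the `Shared` header in the patch.
    struct Header {
        capacity: usize,
        refcount: AtomicUsize,
    }

    // Header-plus-payload layout, mirroring `Shared::layout_for_size`:
    // the block is `size_of::<Header>() + size` bytes, aligned like the header.
    fn layout_for(size: usize) -> Layout {
        let header = Layout::new::<Header>();
        let total = header.size().checked_add(size).expect("layout overflow");
        Layout::from_size_align(total, header.align()).expect("invalid layout")
    }

    // One allocation stores the header and the byte storage; the data pointer
    // starts right behind the header, as `Shared::data_ptr_mut` does above.
    unsafe fn allocate(size: usize) -> (*mut Header, *mut u8) {
        let raw = alloc(layout_for(size)) as *mut Header;
        assert!(!raw.is_null(), "allocation failed");
        ptr::write(ptr::addr_of_mut!((*raw).capacity), size);
        ptr::write(ptr::addr_of_mut!((*raw).refcount), AtomicUsize::new(1));
        (raw, raw.offset(1) as *mut u8)
    }

    // Drop one handle; the combined block is freed when the last handle goes away.
    unsafe fn release(header: *mut Header) {
        if (*header).refcount.fetch_sub(1, Ordering::Release) != 1 {
            return;
        }
        // Synchronizes with the Release decrements performed by the other handles.
        fence(Ordering::Acquire);
        let capacity = (*header).capacity;
        dealloc(header as *mut u8, layout_for(capacity));
    }

    fn main() {
        unsafe {
            let (header, data) = allocate(32);
            ptr::write_bytes(data, 0u8, 32); // the payload is plain byte storage
            (*header).refcount.fetch_add(1, Ordering::Relaxed); // "clone" a handle
            release(header); // first handle dropped, block stays alive
            release(header); // last handle dropped, block is deallocated
        }
    }

Because the reference count lives in the same allocation as the data, turning a unique `BytesMut` into a shared `Bytes` requires neither a copy nor a new allocation; the handle only switches over to the shared vtable, which is why the series can make `freeze()` essentially free.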