Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce SharedBuf trait for Bytes VTable #596

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Prev Previous commit
Next Next commit
tweaks to make the latest merge work.. also tracking down a miri issue
  • Loading branch information
rrichardson committed Feb 7, 2023
commit a4c65af53740ceb3b20b7dbc44b6e09df92db05b
20 changes: 4 additions & 16 deletions src/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::buf::IntoIter;
use crate::impls::*;
#[allow(unused)]
use crate::loom::sync::atomic::AtomicMut;
use crate::loom::sync::atomic::AtomicPtr;
use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};
use crate::shared_buf::{BufferParts, SharedBuf};
use crate::Buf;

Expand Down Expand Up @@ -861,26 +861,14 @@ impl From<Vec<u8>> for Bytes {
return Bytes::from(vec.into_boxed_slice());
}

let shared = Box::new(Shared {
let shared = Box::new(crate::impls::shared::Shared {
buf: ptr,
cap,
ref_cnt: AtomicUsize::new(1),
});
mem::forget(vec);

let shared = Box::into_raw(shared);
// The pointer should be aligned, so this assert should
// always succeed.
debug_assert!(
0 == (shared as usize & KIND_MASK),
"internal: Box<Shared> should have an aligned pointer",
);
Bytes {
ptr,
len,
data: AtomicPtr::new(shared as _),
vtable: &SHARED_VTABLE,
}
let imp = crate::impls::shared::SharedImpl::new(Box::into_raw(shared), ptr, len);
Bytes::from_shared_buf(imp)
}
}

Expand Down
10 changes: 10 additions & 0 deletions src/impls/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ pub(crate) struct SharedImpl {
len: usize,
}

impl SharedImpl {
pub(crate) fn new(shared: *mut Shared, ptr: *const u8, len: usize) -> Self {
SharedImpl {
shared,
offset: ptr,
len
}
}
}

unsafe impl SharedBuf for SharedImpl {
fn into_parts(this: Self) -> (AtomicPtr<()>, *const u8, usize) {
(AtomicPtr::new(this.shared.cast()), this.offset, this.len)
Expand Down
76 changes: 42 additions & 34 deletions tests/extern_buf_bytes.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
#![warn(rust_2018_idioms)]

use bytes::{Buf, BufMut, BufferParts, Bytes, BytesMut, SharedBuf};
use bytes::{BufferParts, Bytes, BytesMut, SharedBuf};

use std::alloc::{alloc, dealloc, Layout};
use std::ptr::{self, NonNull};
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use std::usize;

const LONG: &[u8] = b"mary had a little lamb, little lamb, little lamb";
const SHORT: &[u8] = b"hello world";

struct ExternBuf {
ptr: NonNull<u8>,
cap: usize,
Expand All @@ -21,6 +18,8 @@ impl ExternBuf {
pub fn from_size(sz: usize) -> Self {
let layout = Layout::from_size_align(sz, 4).unwrap();
let ptr = NonNull::new(unsafe { alloc(layout) }).unwrap();
let num = ptr.as_ptr() as usize;
println!("Alloc'd {}", num);
ExternBuf {
ptr,
cap: sz,
Expand All @@ -47,7 +46,11 @@ impl From<&[u8]> for ExternBuf {
impl Drop for ExternBuf {
fn drop(&mut self) {
let layout = Layout::from_size_align(self.cap, 4).unwrap();
unsafe { dealloc(self.ptr.as_mut(), layout) };
unsafe {
let num = self.ptr.as_ptr() as usize;
println!("dealloc'ing {}", num);
dealloc(self.ptr.as_mut(), layout);
}
}
}

Expand Down Expand Up @@ -90,7 +93,6 @@ unsafe impl SharedBuf for ExternBufWrapper {
let buf = (*inner).ptr;
let cap = (*inner).cap;

// Deallocate Shared
drop(Box::from_raw(
inner as *mut std::mem::ManuallyDrop<ExternBuf>,
));
Expand All @@ -112,13 +114,18 @@ unsafe impl SharedBuf for ExternBufWrapper {
return;
}
(*inner).ref_count.load(Ordering::Acquire);
println!(
"invoking drop over box::from_raw on {}",
(*inner).ptr.as_ptr() as usize
);
drop(Box::from_raw(inner));
}
}

fn is_sync<T: Sync>() {}
fn is_send<T: Send>() {}

#[ignore]
#[test]
fn test_bounds() {
is_sync::<Bytes>();
Expand All @@ -127,6 +134,7 @@ fn test_bounds() {
is_send::<BytesMut>();
}

#[ignore]
#[test]
fn test_layout() {
use std::mem;
Expand Down Expand Up @@ -155,6 +163,7 @@ fn test_layout() {
);
}

#[ignore]
#[test]
fn roundtrip() {
let eb = ExternBuf::from(&b"abcdefgh"[..]);
Expand All @@ -168,36 +177,26 @@ fn roundtrip() {

#[test]
fn from_slice() {
let eb = ExternBuf::from(&b"abcdefgh"[..]);
let a = Bytes::from_shared_buf(eb.into_shared());
assert_eq!(a, b"abcdefgh"[..]);
assert_eq!(a, &b"abcdefgh"[..]);
assert_eq!(a, Vec::from(&b"abcdefgh"[..]));
assert_eq!(b"abcdefgh"[..], a);
assert_eq!(&b"abcdefgh"[..], a);
assert_eq!(Vec::from(&b"abcdefgh"[..]), a);

let eb = ExternBuf::from(&b"abcdefgh"[..]);
let a = Bytes::from_shared_buf(eb.into_shared());
assert_eq!(a, b"abcdefgh"[..]);
assert_eq!(a, &b"abcdefgh"[..]);
assert_eq!(a, Vec::from(&b"abcdefgh"[..]));
assert_eq!(b"abcdefgh"[..], a);
assert_eq!(&b"abcdefgh"[..], a);
assert_eq!(Vec::from(&b"abcdefgh"[..]), a);
}

#[test]
fn fmt() {
let a = format!("{:?}", Bytes::from(&b"abcdefg"[..]));
let b = "b\"abcdefg\"";

assert_eq!(a, b);

let a = format!("{:?}", BytesMut::from(&b"abcdefg"[..]));
assert_eq!(a, b);
let eb1 = ExternBuf::from(&b"abcdefgh"[..]);
let a1 = Bytes::from_shared_buf(eb1.into_shared());
assert_eq!(a1, b"abcdefgh"[..]);
assert_eq!(a1, &b"abcdefgh"[..]);
assert_eq!(a1, Vec::from(&b"abcdefgh"[..]));
assert_eq!(b"abcdefgh"[..], a1);
assert_eq!(&b"abcdefgh"[..], a1);
assert_eq!(Vec::from(&b"abcdefgh"[..]), a1);

let eb2 = ExternBuf::from(&b"abcdefgh"[..]);
let a2 = Bytes::from_shared_buf(eb2.into_shared());
assert_eq!(a2, b"abcdefgh"[..]);
assert_eq!(a2, &b"abcdefgh"[..]);
assert_eq!(a2, Vec::from(&b"abcdefgh"[..]));
assert_eq!(b"abcdefgh"[..], a2);
assert_eq!(&b"abcdefgh"[..], a2);
assert_eq!(Vec::from(&b"abcdefgh"[..]), a2);
}

#[ignore]
#[test]
fn len() {
let eb = ExternBuf::from(&b"abcdefg"[..]);
Expand All @@ -209,13 +208,15 @@ fn len() {
assert!(a.is_empty());
}

#[ignore]
#[test]
fn index() {
let eb = ExternBuf::from(&b"hello world"[..]);
let a = Bytes::from_shared_buf(eb.into_shared());
assert_eq!(a[0..5], *b"hello");
}

#[ignore]
#[test]
fn slice() {
let eb = ExternBuf::from(&b"hello world"[..]);
Expand All @@ -240,6 +241,7 @@ fn slice() {
assert_eq!(b, b"lo world"[..]);
}

#[ignore]
#[test]
#[should_panic]
fn slice_oob_1() {
Expand All @@ -248,6 +250,7 @@ fn slice_oob_1() {
a.slice(5..44);
}

#[ignore]
#[test]
#[should_panic]
fn slice_oob_2() {
Expand All @@ -256,6 +259,7 @@ fn slice_oob_2() {
a.slice(44..49);
}

#[ignore]
#[test]
fn split_off() {
let eb = ExternBuf::from(&b"helloworld"[..]);
Expand All @@ -266,6 +270,7 @@ fn split_off() {
assert_eq!(world, &b"world"[..]);
}

#[ignore]
#[test]
#[should_panic]
fn split_off_oob() {
Expand All @@ -274,6 +279,7 @@ fn split_off_oob() {
let _ = hello.split_off(44);
}

#[ignore]
#[test]
fn split_off_to_loop() {
let s = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
Expand Down Expand Up @@ -302,6 +308,7 @@ fn split_off_to_loop() {
}
}

#[ignore]
#[test]
fn truncate() {
let s = &b"helloworld"[..];
Expand All @@ -315,6 +322,7 @@ fn truncate() {
assert_eq!(hello, "hello");
}

#[ignore]
#[test]
// Only run these tests on little endian systems. CI uses qemu for testing
// big endian... and qemu doesn't really support threading all that well.
Expand Down