Skip to content

Commit

Permalink
add linux pipes kinda
Browse files Browse the repository at this point in the history
  • Loading branch information
cosmicexplorer committed Jul 7, 2024
1 parent 8d2b9b2 commit 234c9c9
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ libc = "0.2.155"
memchr = "2.7.2"
pbkdf2 = { version = "0.12.2", optional = true }
rand = { version = "0.8.5", optional = true }
ringbuf = "0.4.1"
sha1 = { version = "0.10.6", optional = true }
thiserror = "1.0.61"
time = { workspace = true, optional = true, features = [
Expand Down
149 changes: 148 additions & 1 deletion src/read/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ mod file {
}
}

mod file {
mod linux {
use super::{CopyRange, FixedFile, InputFile, OutputFile};

use std::fs;
Expand Down Expand Up @@ -508,4 +508,151 @@ mod pipe {
len: usize,
) -> io::Result<usize>;
}

/* mod in_memory { */
/* use super::{ReadEnd, WriteEnd}; */

/* use std::io; */
/* use std::sync::{Arc, Condvar, Mutex}; */

/* use ringbuf::{storage::Heap, traits::*, HeapRb, SharedRb}; */

/* enum QueueState { */
/* Live, */
/* WriteClosed, */
/* } */

/* struct LockedState { */
/* ring: VecDeque<u8>, */
/* status: QueueState, */
/* } */

/* pub struct ReadQueue { */
/* ring: Arc<Mutex<LockedState>>, */
/* cvar: Arc<Condvar>, */
/* } */

/* impl io::Read for ReadQueue { */
/* fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { */
/* let LockedState { */
/* ref mut ring, */
/* ref status, */
/* } = *self.ring.lock().unwrap(); */
/* match status { */
/* QueueState::WriteClosed => { */
/* let n = buf.len().min(ring.len()); */
/* ring.drain */
/* } */
/* } */
/* } */
/* } */

/* pub struct WriteQueue { */
/* ring: Arc<Mutex<LockedState>>, */
/* cvar: Arc<Condvar>, */
/* } */
/* } */

mod linux {
use super::{ReadEnd, ReadSplicer, Tee, WriteEnd, WriteSplicer};

Check failure on line 557 in src/read/split.rs

View workflow job for this annotation

GitHub Actions / Build and test --no-default-features: ubuntu-latest, stable

unused imports: `ReadSplicer`, `Tee`, `WriteSplicer`

Check failure on line 557 in src/read/split.rs

View workflow job for this annotation

GitHub Actions / style_and_docs (--no-default-features)

unused imports: `ReadSplicer`, `Tee`, and `WriteSplicer`

Check failure on line 557 in src/read/split.rs

View workflow job for this annotation

GitHub Actions / style_and_docs

unused imports: `ReadSplicer`, `Tee`, and `WriteSplicer`

use std::io;
use std::ops;

Check failure on line 560 in src/read/split.rs

View workflow job for this annotation

GitHub Actions / Build and test --no-default-features: ubuntu-latest, stable

unused import: `std::ops`

Check failure on line 560 in src/read/split.rs

View workflow job for this annotation

GitHub Actions / style_and_docs (--no-default-features)

unused import: `std::ops`

Check failure on line 560 in src/read/split.rs

View workflow job for this annotation

GitHub Actions / style_and_docs

unused import: `std::ops`
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd};

use libc;

pub struct WritePipe {
handle: OwnedFd,
}

impl WritePipe {
pub(crate) unsafe fn from_fd(fd: RawFd) -> Self {
Self {
handle: OwnedFd::from_raw_fd(fd),
}
}

pub(crate) fn fd(&self) -> RawFd {
self.handle.as_raw_fd()
}
}

impl io::Write for WritePipe {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let fd: libc::c_int = self.fd();

let iov_base: *const libc::c_void = buf.as_ptr().cast();
let iovec = libc::iovec {
iov_base: iov_base.cast_mut(),
iov_len: buf.len(),
};
let nr_segs: usize = 1;
let flags: libc::c_uint = 0;

let rc: libc::ssize_t = unsafe { libc::vmsplice(fd, &iovec, nr_segs, flags) };
if rc < 0 {
return Err(io::Error::last_os_error());
}
let n: usize = rc.try_into().unwrap();
Ok(n)
}

fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}

impl WriteEnd for WritePipe {}

pub struct ReadPipe {
handle: OwnedFd,
}

impl ReadPipe {
pub(crate) unsafe fn from_fd(fd: RawFd) -> Self {
Self {
handle: OwnedFd::from_raw_fd(fd),
}
}

pub(crate) fn fd(&self) -> RawFd {
self.handle.as_raw_fd()
}
}

impl io::Read for ReadPipe {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let fd: libc::c_int = self.fd();

let iov_base: *mut libc::c_void = buf.as_mut_ptr().cast();
let iovec = libc::iovec {
iov_base,
iov_len: buf.len(),
};
let nr_segs: usize = 1;
let flags: libc::c_uint = 0;

let rc: libc::ssize_t = unsafe { libc::vmsplice(fd, &iovec, nr_segs, flags) };
if rc < 0 {
return Err(io::Error::last_os_error());
}
let n: usize = rc.try_into().unwrap();
Ok(n)
}
}

impl ReadEnd for ReadPipe {}

pub fn create_pipe() -> io::Result<(ReadPipe, WritePipe)> {
let mut fds: [libc::c_int; 2] = [0; 2];
let rc: libc::c_int = unsafe { libc::pipe(fds.as_mut_ptr()) };
if rc < 0 {
return Err(io::Error::last_os_error());
}
let [r, w] = fds;
let (r, w) = unsafe { (ReadPipe::from_fd(r), WritePipe::from_fd(w)) };
Ok((r, w))
}
}
}

0 comments on commit 234c9c9

Please sign in to comment.