Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rust file-like objects and accept bytes/bytearray/numpy #45

Merged
merged 30 commits into from
Mar 18, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
748dc50
Initial File object
milesgranger Mar 10, 2021
dfe4ac0
impl Read/Write for RustyBuffer/File
milesgranger Mar 10, 2021
a0c4980
Successful conversion to accepting Read/Write; missing lz4 and snapp…
milesgranger Mar 10, 2021
dd6915e
Add example python test, rust tests are fubar for the moment
milesgranger Mar 10, 2021
1384f84
Swap lz4, minor updates to generic macro
milesgranger Mar 11, 2021
007f07f
Update README, lz4 supports de/compress_into now
milesgranger Mar 11, 2021
ab63626
chkpt: fix tests, need to implement Seek for WritablePyByteArray
milesgranger Mar 11, 2021
587c978
full lz4 support
milesgranger Mar 12, 2021
e581346
Update Cargo.toml pyo3 features
milesgranger Mar 12, 2021
c1ffcc8
chkpt: start on a mostly IOBase interface
milesgranger Mar 12, 2021
5fa1fea
Support .tell()
milesgranger Mar 12, 2021
bc8d02a
Initial impl of readinto
milesgranger Mar 12, 2021
24292e0
rust file-like obj api test, other cleanup
milesgranger Mar 12, 2021
753db39
.write also take BytesType enum
milesgranger Mar 13, 2021
e1f69a2
get ci to work, remove rlib?
milesgranger Mar 13, 2021
024922b
Use PyBytes::new_with in read with known n_bytes
milesgranger Mar 13, 2021
3201a5a
Fix PyPy and linux cross compilation (#46)
messense Mar 13, 2021
a28dc56
Support Write for BytesType
milesgranger Mar 13, 2021
8a497dc
Initial impl support seek from positions .seek(.., whence=..)
milesgranger Mar 13, 2021
b6ba051
Convert all variants to some Rust wrapper
milesgranger Mar 13, 2021
1a5b811
impl Seek for BytesType and update lz4 n compressed bytes
milesgranger Mar 14, 2021
b36947e
chkpt: RawEncoder/RawDecoder impl
milesgranger Mar 14, 2021
7b9aad0
RawEncoder/Decoder use entire bytes, cannot stream
milesgranger Mar 16, 2021
26024a9
Support Buffer taking BytesType directly, like BytesIO does
milesgranger Mar 17, 2021
6b2557b
chkpt: working on docs
milesgranger Mar 17, 2021
5ce7bbd
chkpt: working on docs
milesgranger Mar 17, 2021
6e33f60
chkpt: more docs
milesgranger Mar 17, 2021
57ba3b9
more docs
milesgranger Mar 18, 2021
66150e1
even more..
milesgranger Mar 18, 2021
d3c1328
Remove dead code
milesgranger Mar 18, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
RawEncoder/Decoder use entire bytes, cannot stream
  • Loading branch information
milesgranger committed Mar 16, 2021
commit 7b9aad061324fe039fc29140d855575e90045cfa
42 changes: 18 additions & 24 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,22 +121,20 @@ impl<'a> IntoPy<PyObject> for BytesType<'a> {

#[macro_export]
macro_rules! generic {
($op:ident($input:ident), py=$py:ident, output_len=$output_len:ident $(, level=$level:ident)?) => {
($op:ident($input:expr), py=$py:ident, output_len=$output_len:ident $(, level=$level:ident)?) => {
{
use crate::io::{RustyPyBytes, RustyPyByteArray, RustyNumpyArray};

match $input {
BytesType::Bytes(b) => {
let bytes = b.as_bytes();
let mut input_cursor = Cursor::new(bytes);
BytesType::Bytes(_) => {
match $output_len {
Some(len) => {
let pybytes = PyBytes::new_with($py, len, |buffer| {
let mut cursor = Cursor::new(buffer);
if stringify!($op) == "compress" {
to_py_err!(CompressionError -> self::internal::$op(&mut input_cursor, &mut cursor $(, $level)? ))?;
to_py_err!(CompressionError -> self::internal::$op($input, &mut cursor $(, $level)? ))?;
} else {
to_py_err!(DecompressionError -> self::internal::$op(&mut input_cursor, &mut cursor $(, $level)? ))?;
to_py_err!(DecompressionError -> self::internal::$op($input, &mut cursor $(, $level)? ))?;
}
Ok(())
})?;
Expand All @@ -145,52 +143,48 @@ macro_rules! generic {
None => {
let mut buffer = Vec::new();
if stringify!($op) == "compress" {
to_py_err!(CompressionError -> self::internal::$op(&mut input_cursor, &mut Cursor::new(&mut buffer) $(, $level)? ))?;
to_py_err!(CompressionError -> self::internal::$op($input, &mut Cursor::new(&mut buffer) $(, $level)? ))?;
} else {
to_py_err!(DecompressionError -> self::internal::$op(&mut input_cursor, &mut Cursor::new(&mut buffer) $(, $level)? ))?;
to_py_err!(DecompressionError -> self::internal::$op($input, &mut Cursor::new(&mut buffer) $(, $level)? ))?;
}

Ok(BytesType::Bytes(RustyPyBytes::from(PyBytes::new($py, &buffer))))
}
}
},
BytesType::ByteArray(b) => {
let bytes = b.as_bytes();
let mut cursor = Cursor::new(bytes);
BytesType::ByteArray(_) => {
let mut pybytes = RustyPyByteArray::new($py, $output_len.unwrap_or_else(|| 0));
if stringify!($op) == "compress" {
to_py_err!(CompressionError -> self::internal::$op(&mut cursor, &mut pybytes $(, $level)? ))?;
to_py_err!(CompressionError -> self::internal::$op($input, &mut pybytes $(, $level)? ))?;
} else {
to_py_err!(DecompressionError -> self::internal::$op(&mut cursor, &mut pybytes $(, $level)? ))?;
to_py_err!(DecompressionError -> self::internal::$op($input, &mut pybytes $(, $level)? ))?;
}
Ok(BytesType::ByteArray(pybytes))
},
BytesType::NumpyArray(b) => {
let buffer: &[u8] = b.as_slice();
let mut cursor = Cursor::new(buffer);
BytesType::NumpyArray(_) => {
let mut output = Vec::new();
if stringify!($op) == "compress" {
to_py_err!(CompressionError -> self::internal::$op(&mut cursor, &mut Cursor::new(&mut output) $(, $level)? ))?;
to_py_err!(CompressionError -> self::internal::$op($input, &mut Cursor::new(&mut output) $(, $level)? ))?;
} else {
to_py_err!(DecompressionError -> self::internal::$op(&mut cursor, &mut Cursor::new(&mut output) $(, $level)? ))?;
to_py_err!(DecompressionError -> self::internal::$op($input, &mut Cursor::new(&mut output) $(, $level)? ))?;
}
Ok(BytesType::NumpyArray(RustyNumpyArray::from_vec($py, output)))
},
BytesType::RustyFile(file) => {
BytesType::RustyFile(_) => {
let mut output = crate::io::RustyBuffer::default();
if stringify!($op) == "compress" {
to_py_err!(CompressionError -> self::internal::$op(&mut *file.borrow_mut(), &mut output $(, $level)? ))?;
to_py_err!(CompressionError -> self::internal::$op($input, &mut output $(, $level)? ))?;
} else {
to_py_err!(DecompressionError -> self::internal::$op(&mut *file.borrow_mut(), &mut output $(, $level)? ))?;
to_py_err!(DecompressionError -> self::internal::$op($input, &mut output $(, $level)? ))?;
}
Ok(BytesType::RustyBuffer(PyCell::new($py, output).unwrap()))
},
BytesType::RustyBuffer(buffer) => {
BytesType::RustyBuffer(_) => {
let mut output = crate::io::RustyBuffer::default();
if stringify!($op) == "compress" {
to_py_err!(CompressionError -> self::internal::$op(&mut *buffer.borrow_mut(), &mut output $(, $level)? ))?;
to_py_err!(CompressionError -> self::internal::$op($input, &mut output $(, $level)? ))?;
} else {
to_py_err!(DecompressionError -> self::internal::$op(&mut *buffer.borrow_mut(), &mut output $(, $level)? ))?;
to_py_err!(DecompressionError -> self::internal::$op($input, &mut output $(, $level)? ))?;
}
Ok(BytesType::RustyBuffer(PyCell::new($py, output).unwrap()))
}
Expand Down
4 changes: 2 additions & 2 deletions src/lz4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ pub fn decompress<'a>(py: Python<'a>, data: BytesType<'a>, output_len: Option<us
#[pyfunction]
pub fn compress<'a>(
py: Python<'a>,
data: BytesType<'a>,
mut data: BytesType<'a>,
level: Option<u32>,
output_len: Option<usize>,
) -> PyResult<BytesType<'a>> {
crate::generic!(compress(data), py = py, output_len = output_len, level = level)
crate::generic!(compress(&mut data), py = py, output_len = output_len, level = level)
}

/// Compress directly into an output buffer
Expand Down
137 changes: 93 additions & 44 deletions src/snappy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,86 +115,135 @@ pub fn decompress_raw_len<'a>(_py: Python<'a>, data: BytesType<'a>) -> PyResult<
}

pub(crate) mod internal {
use crate::BytesType;
use snap::raw::{decompress_len, max_compress_len, Decoder, Encoder};
use snap::read::{FrameDecoder, FrameEncoder};
use std::io::{Cursor, Error, Read, Write};

pub(crate) struct RawEncoder<R: Read> {
inner: R,
buffer: Vec<u8>, // raw data read from inner
compressed: Cursor<Vec<u8>>, // compressed data.
pub(crate) struct RawEncoder<'a, 'b> {
inner: &'b BytesType<'a>,
overflow: Option<Cursor<Vec<u8>>>,
encoder: Encoder,
is_finished: bool,
}
impl<R: Read> RawEncoder<R> {
pub fn new(inner: R) -> Self {
impl<'a, 'b> RawEncoder<'a, 'b> {
pub fn new(inner: &'b BytesType<'a>) -> Self {
Self {
inner,
buffer: Vec::new(),
compressed: Cursor::new(Vec::new()),
encoder: Encoder::new(),
overflow: None,
is_finished: false,
}
}
/// Compress `bytes` in one shot and hand the caller the first slice of output.
///
/// `bytes` is the complete uncompressed input; `buf` is the caller's read buffer.
/// Returns the number of bytes placed into `buf`.
///
/// Two paths, chosen by comparing `buf.len()` with `max_compress_len(bytes.len())`:
/// * `buf` is at least that large: compress straight into `buf` and mark the
///   encoder finished, so the next `read` call reports 0 bytes.
/// * otherwise: compress into a temporary vector, copy as much as fits into
///   `buf`, and keep the remainder in `self.overflow` for later `read` calls.
///
/// Errors from the encoder or from reading the temporary cursor are propagated.
fn init_read(&mut self, bytes: &[u8], buf: &mut [u8]) -> std::io::Result<usize> {
    let max_len = max_compress_len(bytes.len());

    // Fast path: the caller's buffer can hold the whole result.
    if max_len <= buf.len() {
        let written = self.encoder.compress(bytes, buf)?;
        // No overflow is stored on this path; the finished flag ends the stream.
        self.is_finished = true;
        return Ok(written);
    }

    // Slow path: stage the full result, then serve it piecewise.
    let mut scratch = vec![0u8; max_len];
    let written = self.encoder.compress(bytes, scratch.as_mut_slice())?;
    scratch.truncate(written);

    let mut pending = Cursor::new(scratch);
    let copied = pending.read(buf)?;
    self.overflow = Some(pending);
    Ok(copied)
}
}
impl<R: Read> Read for RawEncoder<R> {
impl<'a, 'b> Read for RawEncoder<'a, 'b> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
if self.buffer.is_empty() {
let n = self.inner.read_to_end(&mut self.buffer)?;
self.buffer.truncate(n);
let len = max_compress_len(self.buffer.len());
self.compressed.get_mut().resize(len, 0);
let n = self
.encoder
.compress(self.buffer.as_slice(), self.compressed.get_mut())?;
self.compressed.get_mut().truncate(n);
self.compressed.set_position(0);
if self.is_finished {
return Ok(0);
}
match self.overflow.as_mut() {
Some(overflow) => overflow.read(buf),
None => match self.inner {
BytesType::Bytes(pybytes) => self.init_read(pybytes.as_bytes(), buf),
BytesType::ByteArray(pybytes) => self.init_read(pybytes.as_bytes(), buf),
BytesType::NumpyArray(array) => self.init_read(array.as_bytes(), buf),
BytesType::RustyBuffer(buffer) => {
let buffer_ref = buffer.borrow();
self.init_read(buffer_ref.inner.get_ref(), buf)
}
BytesType::RustyFile(file) => {
let mut buffer = vec![];
file.borrow_mut().read_to_end(&mut buffer)?;
self.init_read(buffer.as_slice(), buf)
}
},
}
self.compressed.read(buf)
}
}

pub(crate) struct RawDecoder<R: Read> {
inner: R,
buffer: Vec<u8>, // raw data read from inner
decompressed: Cursor<Vec<u8>>, // decompressed data.
pub(crate) struct RawDecoder<'a, 'b> {
inner: &'b BytesType<'a>,
overflow: Option<Cursor<Vec<u8>>>,
decoder: Decoder,
is_finished: bool,
}
impl<R: Read> RawDecoder<R> {
pub fn new(inner: R) -> Self {
impl<'a, 'b> RawDecoder<'a, 'b> {
pub fn new(inner: &'b BytesType<'a>) -> Self {
Self {
inner,
buffer: Vec::with_capacity(64000),
decompressed: Cursor::new(Vec::with_capacity(64000)),
decoder: Decoder::new(),
overflow: None,
is_finished: false,
}
}
/// Decompress `bytes` in one shot and hand the caller the first slice of output.
///
/// `bytes` is the complete compressed input; `buf` is the caller's read buffer.
/// Returns the number of bytes placed into `buf`.
///
/// Two paths, chosen by comparing `buf.len()` with `decompress_len(bytes)`:
/// * `buf` is at least that large: decompress straight into `buf` and mark the
///   decoder finished, so the next `read` call reports 0 bytes.
/// * otherwise: decompress into a temporary vector, copy as much as fits into
///   `buf`, and keep the remainder in `self.overflow` for later `read` calls.
///
/// Errors from `decompress_len`, the decoder, or reading the temporary cursor
/// are propagated.
fn init_read(&mut self, bytes: &[u8], buf: &mut [u8]) -> std::io::Result<usize> {
    let expected_len = decompress_len(bytes)?;

    // Fast path: the caller's buffer can hold the whole result.
    if expected_len <= buf.len() {
        let written = self.decoder.decompress(bytes, buf)?;
        // No overflow is stored on this path; the finished flag ends the stream.
        self.is_finished = true;
        return Ok(written);
    }

    // Slow path: stage the full result, then serve it piecewise.
    let mut scratch = vec![0u8; expected_len];
    let written = self.decoder.decompress(bytes, scratch.as_mut_slice())?;
    scratch.truncate(written);

    let mut pending = Cursor::new(scratch);
    let copied = pending.read(buf)?;
    self.overflow = Some(pending);
    Ok(copied)
}
}
impl<R: Read> Read for RawDecoder<R> {
impl<'a, 'b> Read for RawDecoder<'a, 'b> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
if self.buffer.is_empty() {
let n = self.inner.read_to_end(&mut self.buffer)?;
self.buffer.truncate(n);
let len = decompress_len(&self.buffer)?;
self.decompressed.get_mut().resize(len, 0);
let n = self
.decoder
.decompress(self.buffer.as_slice(), self.decompressed.get_mut())?;
self.decompressed.get_mut().truncate(n);
self.decompressed.set_position(0);
if self.is_finished {
return Ok(0);
}
match self.overflow.as_mut() {
Some(overflow) => overflow.read(buf),
None => match self.inner {
BytesType::Bytes(pybytes) => self.init_read(pybytes.as_bytes(), buf),
BytesType::ByteArray(pybytes) => self.init_read(pybytes.as_bytes(), buf),
BytesType::NumpyArray(array) => self.init_read(array.as_bytes(), buf),
BytesType::RustyBuffer(buffer) => {
let buffer_ref = buffer.borrow();
self.init_read(buffer_ref.inner.get_ref(), buf)
}
BytesType::RustyFile(file) => {
let mut buffer = vec![];
file.borrow_mut().read_to_end(&mut buffer)?;
self.init_read(buffer.as_slice(), buf)
}
},
}
self.decompressed.read(buf)
}
}

/// Decompress raw-format snappy data from `input` into the `output` writer, returning the number of bytes written
pub fn decompress_raw<W: Write + ?Sized, R: Read>(input: R, output: &mut W) -> std::io::Result<usize> {
let mut decoder = RawDecoder::new(input);
pub fn decompress_raw<'a, W: Write>(input: BytesType<'a>, output: &mut W) -> std::io::Result<usize> {
let mut decoder = RawDecoder::new(&input);
let n_bytes = std::io::copy(&mut decoder, output)?;
Ok(n_bytes as usize)
}

/// Compress `input` into raw-format snappy data written to the `output` writer, returning the number of bytes written
pub fn compress_raw<W: Write + ?Sized, R: Read>(input: R, output: &mut W) -> std::io::Result<usize> {
let mut encoder = RawEncoder::new(input);
pub fn compress_raw<'a, W: Write>(input: BytesType<'a>, output: &'a mut W) -> std::io::Result<usize> {
let mut encoder = RawEncoder::new(&input);
let n_bytes = std::io::copy(&mut encoder, output)?;
Ok(n_bytes as usize)
}
Expand Down