Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(binding/python): allow setting append/buffer/more in write() call #3256

Merged
merged 11 commits into from
Oct 12, 2023
28 changes: 24 additions & 4 deletions bindings/python/python/opendal/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,16 @@ class Operator:
def __init__(self, scheme: str, **kwargs): ...
def read(self, path: str) -> bytes: ...
def open_reader(self, path: str) -> Reader: ...
def write(self, path: str, bs: bytes): ...
def write(
self,
path: str,
bs: bytes,
append: bool = None,
buffer: int = None,
content_type: str = None,
content_disposition: str = None,
cache_control: str = None,
messense marked this conversation as resolved.
Show resolved Hide resolved
): ...
def stat(self, path: str) -> Metadata: ...
def create_dir(self, path: str): ...
def delete(self, path: str): ...
Expand All @@ -34,15 +43,26 @@ class AsyncOperator:
def __init__(self, scheme: str, **kwargs): ...
async def read(self, path: str) -> bytes: ...
def open_reader(self, path: str) -> AsyncReader: ...
async def write(self, path: str, bs: bytes): ...
async def write(
self,
path: str,
bs: bytes,
append: bool = None,
buffer: int = None,
content_type: str = None,
content_disposition: str = None,
cache_control: str = None,
): ...
async def stat(self, path: str) -> Metadata: ...
async def create_dir(self, path: str): ...
async def delete(self, path: str): ...
async def list(self, path: str) -> AsyncIterable[Entry]: ...
async def scan(self, path: str) -> AsyncIterable[Entry]: ...
async def presign_stat(self, path: str, expire_second: int) -> PresignedRequest: ...
async def presign_read(self, path: str, expire_second: int) -> PresignedRequest: ...
async def presign_write(self, path: str, expire_second: int) -> PresignedRequest: ...
async def presign_write(
self, path: str, expire_second: int
) -> PresignedRequest: ...

class Reader:
def read(self, size: Optional[int] = None) -> bytes: ...
Expand Down Expand Up @@ -86,4 +106,4 @@ class PresignedRequest:
@property
def method(self) -> str: ...
@property
def headers(self) -> dict[str, str]: ...
def headers(self) -> dict[str, str]: ...
29 changes: 27 additions & 2 deletions bindings/python/src/asyncio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use crate::layers;
use crate::Entry;
use crate::Metadata;
use crate::PresignedRequest;
use crate::WriteOptions;

/// `AsyncOperator` is the entry for all public async APIs
///
Expand Down Expand Up @@ -87,11 +88,35 @@ impl AsyncOperator {
}

/// Write bytes into given path.
pub fn write<'p>(&'p self, py: Python<'p>, path: String, bs: &PyBytes) -> PyResult<&'p PyAny> {
#[pyo3(signature = (path, bs, **kwargs))]
pub fn write<'p>(
&'p self,
py: Python<'p>,
path: String,
bs: &PyBytes,
kwargs: Option<&PyDict>,
) -> PyResult<&'p PyAny> {
let write_options = WriteOptions::from_kwargs(kwargs)?;
let this = self.0.clone();
let bs = bs.as_bytes().to_vec();
future_into_py(py, async move {
this.write(&path, bs).await.map_err(format_pyerr)
let mut write = this.write_with(&path, bs);
if let Some(append) = write_options.append {
write = write.append(append);
}
if let Some(buffer) = write_options.buffer {
write = write.buffer(buffer);
}
if let Some(content_type) = write_options.content_type {
write = write.content_type(content_type.as_str());
}
if let Some(content_disposition) = write_options.content_disposition {
write = write.content_disposition(content_disposition.as_str());
}
if let Some(cache_control) = write_options.cache_control {
write = write.cache_control(cache_control.as_str());
}
write.await.map_err(format_pyerr)
})
}

Expand Down
89 changes: 87 additions & 2 deletions bindings/python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,27 @@ impl Operator {
}

/// Write bytes into given path.
pub fn write(&self, path: &str, bs: Vec<u8>) -> PyResult<()> {
self.0.write(path, bs).map_err(format_pyerr)
#[pyo3(signature = (path, bs, **kwargs))]
pub fn write(&self, path: &str, bs: Vec<u8>, kwargs: Option<&PyDict>) -> PyResult<()> {
let write_options = WriteOptions::from_kwargs(kwargs)?;
let mut write = self.0.write_with(path, bs);
if let Some(append) = write_options.append {
write = write.append(append);
}
if let Some(buffer) = write_options.buffer {
write = write.buffer(buffer);
}
if let Some(content_type) = write_options.content_type {
write = write.content_type(content_type.as_str());
}
if let Some(content_disposition) = write_options.content_disposition {
write = write.content_disposition(content_disposition.as_str());
}
if let Some(cache_control) = write_options.cache_control {
write = write.cache_control(cache_control.as_str());
}

write.call().map_err(format_pyerr)
}

/// Get current path's metadata **without cache** directly.
Expand Down Expand Up @@ -421,6 +440,72 @@ fn format_pyerr(err: od::Error) -> PyErr {
}
}

/// options in a write operation, shared by Operator.write() and AsyncOperator.write()
struct WriteOptions {
jokester marked this conversation as resolved.
Show resolved Hide resolved
append: Option<bool>,
buffer: Option<usize>,
content_type: Option<String>,
content_disposition: Option<String>,
cache_control: Option<String>,
}

impl WriteOptions {
pub fn from_kwargs(kwargs: Option<&PyDict>) -> PyResult<WriteOptions> {
let buffer = kwargs
.and_then(|dict| dict.get_item("buffer"))
.map(|v| {
v.extract::<usize>().map_err(|err| {
PyValueError::new_err(format!("buffer must be usize, got {}", err))
})
})
.transpose()?;

let append = kwargs
.and_then(|dict| dict.get_item("append"))
.map(|v| {
v.extract::<bool>().map_err(|err| {
PyValueError::new_err(format!("append must be bool, got {}", err))
})
})
.transpose()?;

let content_type = kwargs
.and_then(|dict| dict.get_item("content_type"))
.map(|v| {
v.extract::<String>().map_err(|err| {
PyValueError::new_err(format!("content_type must be str, got {}", err))
})
})
.transpose()?;

let content_disposition = kwargs
.and_then(|dict| dict.get_item("content_disposition"))
.map(|v| {
v.extract::<String>().map_err(|err| {
PyValueError::new_err(format!("content_disposition must be str, got {}", err))
})
})
.transpose()?;

let cache_control = kwargs
.and_then(|dict| dict.get_item("cache_control"))
.map(|v| {
v.extract::<String>().map_err(|err| {
PyValueError::new_err(format!("cache_control must be str, got {}", err))
})
})
.transpose()?;

Ok(WriteOptions {
buffer,
append,
content_type,
content_disposition,
cache_control,
})
}
}

/// OpenDAL Python binding
///
/// ## Installation
Expand Down
3 changes: 2 additions & 1 deletion bindings/python/tests/test_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,12 @@ async def test_async_write(self):
filename = f"test_file_{str(uuid4())}.txt"
content = os.urandom(size)
size = len(content)
await self.async_operator.write(filename, content)
await self.async_operator.write(filename, content, content_type='text/plain')
jokester marked this conversation as resolved.
Show resolved Hide resolved
metadata = await self.async_operator.stat(filename)
assert metadata is not None
assert metadata.mode.is_file()
assert metadata.content_length == size
assert metadata.content_type == 'text/plain'

await self.async_operator.delete(filename)

Expand Down