Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bindings/python): Add layer API for operator #3464

Merged
merged 3 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bindings/python/python/opendal/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class Error(Exception): ...

class Operator:
def __init__(self, scheme: str, **kwargs): ...
def layer(self, layer: Layer): ...
def read(self, path: str) -> memoryview: ...
def open_reader(self, path: str) -> Reader: ...
def write(
Expand All @@ -43,6 +44,7 @@ class Operator:

class AsyncOperator:
def __init__(self, scheme: str, **kwargs): ...
def layer(self, layer: Layer): ...
async def read(self, path: str) -> memoryview: ...
def open_reader(self, path: str) -> AsyncReader: ...
async def write(
Expand Down
6 changes: 0 additions & 6 deletions bindings/python/python/opendal/layers.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,6 @@
# specific language governing permissions and limitations
# under the License.

class ConcurrentLimitLayer:
def __init__(self, permits: int) -> None: ...

class ImmutableIndexLayer:
def insert(self, key: str) -> None: ...

class RetryLayer:
def __init__(
self,
Expand Down
12 changes: 9 additions & 3 deletions bindings/python/src/asyncio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ pub struct AsyncOperator(od::Operator);
#[pymethods]
impl AsyncOperator {
#[new]
#[pyo3(signature = (scheme, *, layers=Vec::new(), **map))]
pub fn new(scheme: &str, layers: Vec<layers::Layer>, map: Option<&PyDict>) -> PyResult<Self> {
#[pyo3(signature = (scheme, *, **map))]
pub fn new(scheme: &str, map: Option<&PyDict>) -> PyResult<Self> {
let scheme = od::Scheme::from_str(scheme)
.map_err(|err| {
od::Error::new(od::ErrorKind::Unexpected, "unsupported scheme").set_source(err)
Expand All @@ -71,7 +71,13 @@ impl AsyncOperator {
})
.unwrap_or_default();

Ok(AsyncOperator(build_operator(scheme, map, layers, false)?))
Ok(AsyncOperator(build_operator(scheme, map, false)?))
}

/// Add new layers upon existing operator
pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> {
let op = layer.0.layer(self.0.clone());
Ok(Self(op))
}

/// Read the whole path into bytes.
Expand Down
51 changes: 16 additions & 35 deletions bindings/python/src/layers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,47 +18,26 @@
use std::time::Duration;

use ::opendal as od;
use opendal::Operator;
use pyo3::prelude::*;

#[derive(FromPyObject)]
pub enum Layer {
ConcurrentLimit(ConcurrentLimitLayer),
ImmutableIndex(ImmutableIndexLayer),
Retry(RetryLayer),
pub trait PythonLayer: Send + Sync {
fn layer(&self, op: Operator) -> Operator;
}

#[pyclass(module = "opendal.layers")]
#[derive(Clone)]
pub struct ConcurrentLimitLayer(pub od::layers::ConcurrentLimitLayer);
#[pyclass(module = "opendal.layers", subclass)]
pub struct Layer(pub Box<dyn PythonLayer>);
messense marked this conversation as resolved.
Show resolved Hide resolved

#[pymethods]
impl ConcurrentLimitLayer {
#[new]
fn new(permits: usize) -> Self {
Self(od::layers::ConcurrentLimitLayer::new(permits))
}
}

#[pyclass(module = "opendal.layers")]
#[pyclass(module = "opendal.layers", extends=Layer)]
#[derive(Clone)]
pub struct ImmutableIndexLayer(pub od::layers::ImmutableIndexLayer);

#[pymethods]
impl ImmutableIndexLayer {
#[new]
fn new() -> Self {
Self(od::layers::ImmutableIndexLayer::default())
}
pub struct RetryLayer(od::layers::RetryLayer);

fn insert(&mut self, key: String) {
self.0.insert(key);
impl PythonLayer for RetryLayer {
fn layer(&self, op: Operator) -> Operator {
op.layer(self.0.clone())
}
}

#[pyclass(module = "opendal.layers")]
#[derive(Clone)]
pub struct RetryLayer(pub od::layers::RetryLayer);

#[pymethods]
impl RetryLayer {
#[new]
Expand All @@ -75,7 +54,7 @@ impl RetryLayer {
jitter: bool,
max_delay: Option<f64>,
min_delay: Option<f64>,
) -> PyResult<Self> {
) -> PyResult<(Self, Layer)> {
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
let mut retry = od::layers::RetryLayer::default();
if let Some(max_times) = max_times {
retry = retry.with_max_times(max_times);
Expand All @@ -92,14 +71,16 @@ impl RetryLayer {
if let Some(min_delay) = min_delay {
retry = retry.with_min_delay(Duration::from_micros((min_delay * 1000000.0) as u64));
}
Ok(Self(retry))

let retry_layer = Self(retry);

Ok((retry_layer.clone(), Layer(Box::new(retry_layer))))
}
}

pub fn create_submodule(py: Python) -> PyResult<&PyModule> {
let submod = PyModule::new(py, "layers")?;
submod.add_class::<ConcurrentLimitLayer>()?;
submod.add_class::<ImmutableIndexLayer>()?;
submod.add_class::<Layer>()?;
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
submod.add_class::<RetryLayer>()?;
Ok(submod)
}
38 changes: 13 additions & 25 deletions bindings/python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,25 +82,9 @@ impl From<Vec<u8>> for Buffer {
}
}

fn add_layers(mut op: od::Operator, layers: Vec<layers::Layer>) -> PyResult<od::Operator> {
for layer in layers {
match layer {
layers::Layer::Retry(layers::RetryLayer(inner)) => op = op.layer(inner),
layers::Layer::ImmutableIndex(layers::ImmutableIndexLayer(inner)) => {
op = op.layer(inner)
}
layers::Layer::ConcurrentLimit(layers::ConcurrentLimitLayer(inner)) => {
op = op.layer(inner)
}
}
}
Ok(op)
}

fn build_operator(
scheme: od::Scheme,
map: HashMap<String, String>,
layers: Vec<layers::Layer>,
blocking: bool,
) -> PyResult<od::Operator> {
let mut op = od::Operator::via_map(scheme, map).map_err(format_pyerr)?;
Expand All @@ -110,7 +94,7 @@ fn build_operator(
op = op.layer(od::layers::BlockingLayer::create().expect("blocking layer must be created"));
}

add_layers(op, layers)
Ok(op)
}

/// `Operator` is the entry for all public blocking APIs
Expand All @@ -122,8 +106,8 @@ struct Operator(od::BlockingOperator);
#[pymethods]
impl Operator {
#[new]
#[pyo3(signature = (scheme, *, layers=Vec::new(), **map))]
pub fn new(scheme: &str, layers: Vec<layers::Layer>, map: Option<&PyDict>) -> PyResult<Self> {
#[pyo3(signature = (scheme, *, **map))]
pub fn new(scheme: &str, map: Option<&PyDict>) -> PyResult<Self> {
let scheme = od::Scheme::from_str(scheme)
.map_err(|err| {
od::Error::new(od::ErrorKind::Unexpected, "unsupported scheme").set_source(err)
Expand All @@ -136,9 +120,13 @@ impl Operator {
})
.unwrap_or_default();

Ok(Operator(
build_operator(scheme, map, layers, true)?.blocking(),
))
Ok(Operator(build_operator(scheme, map, true)?.blocking()))
}

/// Add new layers upon existing operator
pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> {
let op = layer.0.layer(self.0.clone().into());
Ok(Self(op.blocking()))
}

/// Read the whole path into bytes.
Expand Down Expand Up @@ -585,11 +573,11 @@ fn _opendal(py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<capability::Capability>()?;
m.add("Error", py.get_type::<Error>())?;

let layers = layers::create_submodule(py)?;
m.add_submodule(layers)?;
let layers_module = layers::create_submodule(py)?;
m.add_submodule(layers_module)?;
py.import("sys")?
.getattr("modules")?
.set_item("opendal.layers", layers)?;
.set_item("opendal.layers", layers_module)?;

Ok(())
}
4 changes: 2 additions & 2 deletions bindings/python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ def setup_config(service_name):

@pytest.fixture()
def operator(service_name, setup_config):
return opendal.Operator(service_name, **setup_config)
return opendal.Operator(service_name, **setup_config).layer(opendal.layers.RetryLayer())


@pytest.fixture()
def async_operator(service_name, setup_config):
return opendal.AsyncOperator(service_name, **setup_config)
return opendal.AsyncOperator(service_name, **setup_config).layer(opendal.layers.RetryLayer())


@pytest.fixture(autouse=True)
Expand Down
11 changes: 11 additions & 0 deletions bindings/python/upgrade.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Unreleased

## Breaking change for layers

Operator and BlockingOperator won't accept `layers` anymore. Instead, we provide a `layer` API:

```python
op = opendal.Operator("memory").layer(opendal.layers.RetryLayer())
```

We removed not used layers `ConcurrentLimitLayer` and `ImmutableIndexLayer` along with this change.
6 changes: 6 additions & 0 deletions core/src/types/operator/blocking_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1140,3 +1140,9 @@ impl BlockingOperator {
))
}
}

impl From<BlockingOperator> for Operator {
fn from(v: BlockingOperator) -> Self {
Operator::from_inner(v.accessor).with_limit(v.limit)
}
}