Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Add trait and public API for append #2260

Merged
merged 10 commits into from
May 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion bin/oay/src/bin/oay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use oay::Config;
use opendal::services::Memory;
use opendal::Operator;
use std::sync::Arc;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use tracing_subscriber::fmt;
use tracing_subscriber::prelude::*;
use tracing_subscriber::EnvFilter;

#[tokio::main]
async fn main() -> Result<()> {
Expand Down
4 changes: 3 additions & 1 deletion bindings/c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use std::str::FromStr;

use ::opendal as od;
use error::opendal_code;
use result::{opendal_result_is_exist, opendal_result_read, opendal_result_stat};
use result::opendal_result_is_exist;
use result::opendal_result_read;
use result::opendal_result_stat;
use types::opendal_metadata;

use crate::types::opendal_bytes;
Expand Down
3 changes: 2 additions & 1 deletion bindings/c/src/result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
//! we are defining all Result types here

use crate::error::opendal_code;
use crate::types::{opendal_bytes, opendal_metadata};
use crate::types::opendal_bytes;
use crate::types::opendal_metadata;

/// The Rust-like Result type of opendal C binding, it contains
/// the data that the read operation returns and a error code
Expand Down
5 changes: 5 additions & 0 deletions core/src/layers/chaos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ impl<A: Accessor> LayeredAccessor for ChaosAccessor<A> {
type BlockingReader = ChaosReader<A::BlockingReader>;
type Writer = A::Writer;
type BlockingWriter = A::BlockingWriter;
type Appender = A::Appender;
type Pager = A::Pager;
type BlockingPager = A::BlockingPager;

Expand Down Expand Up @@ -135,6 +136,10 @@ impl<A: Accessor> LayeredAccessor for ChaosAccessor<A> {
self.inner.blocking_write(path, args)
}

async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
self.inner.append(path, args).await
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
self.inner.list(path, args).await
}
Expand Down
56 changes: 56 additions & 0 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
type BlockingReader = CompleteReader<A, A::BlockingReader>;
type Writer = CompleteWriter<A::Writer>;
type BlockingWriter = CompleteWriter<A::BlockingWriter>;
type Appender = CompleteAppender<A::Appender>;
type Pager = CompletePager<A, A::Pager>;
type BlockingPager = CompletePager<A, A::BlockingPager>;

Expand Down Expand Up @@ -375,6 +376,13 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
.map(|(rp, w)| (rp, CompleteWriter::new(w, size)))
}

async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
self.inner
.append(path, args)
.await
.map(|(rp, a)| (rp, CompleteAppender::new(a)))
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
self.complete_list(path, args).await
}
Expand Down Expand Up @@ -646,3 +654,51 @@ where
Ok(())
}
}

pub struct CompleteAppender<A> {
inner: Option<A>,
}

impl<A> CompleteAppender<A> {
pub fn new(inner: A) -> CompleteAppender<A> {
CompleteAppender { inner: Some(inner) }
}
}

/// Check if the appender has been closed while debug_assertions enabled.
/// This code will never be executed in release mode.
#[cfg(debug_assertions)]
impl<A> Drop for CompleteAppender<A> {
fn drop(&mut self) {
if self.inner.is_none() {
// Do we need to panic here?
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
log::warn!("appender has not been closed, must be a bug")
}
}
}

#[async_trait]
impl<A> oio::Append for CompleteAppender<A>
where
A: oio::Append,
{
async fn append(&mut self, bs: Bytes) -> Result<()> {
let a = self
.inner
.as_mut()
.ok_or_else(|| Error::new(ErrorKind::Unexpected, "appender has been closed"))?;

a.append(bs).await
}

async fn close(&mut self) -> Result<()> {
let a = self
.inner
.as_mut()
.ok_or_else(|| Error::new(ErrorKind::Unexpected, "appender has been closed"))?;

a.close().await?;
self.inner = None;
Ok(())
}
}
26 changes: 26 additions & 0 deletions core/src/layers/concurrent_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ impl<A: Accessor> LayeredAccessor for ConcurrentLimitAccessor<A> {
type BlockingReader = ConcurrentLimitWrapper<A::BlockingReader>;
type Writer = ConcurrentLimitWrapper<A::Writer>;
type BlockingWriter = ConcurrentLimitWrapper<A::BlockingWriter>;
type Appender = ConcurrentLimitWrapper<A::Appender>;
type Pager = ConcurrentLimitWrapper<A::Pager>;
type BlockingPager = ConcurrentLimitWrapper<A::BlockingPager>;

Expand Down Expand Up @@ -132,6 +133,20 @@ impl<A: Accessor> LayeredAccessor for ConcurrentLimitAccessor<A> {
.map(|(rp, w)| (rp, ConcurrentLimitWrapper::new(w, permit)))
}

async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
let permit = self
.semaphore
.clone()
.acquire_owned()
.await
.expect("semaphore must be valid");

self.inner
.append(path, args)
.await
.map(|(rp, a)| (rp, ConcurrentLimitWrapper::new(a, permit)))
}

async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
let _permit = self
.semaphore
Expand Down Expand Up @@ -309,6 +324,17 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> {
}
}

#[async_trait]
impl<R: oio::Append> oio::Append for ConcurrentLimitWrapper<R> {
async fn append(&mut self, bs: Bytes) -> Result<()> {
self.inner.append(bs).await
}

async fn close(&mut self) -> Result<()> {
self.inner.close().await
}
}

#[async_trait]
impl<R: oio::Page> oio::Page for ConcurrentLimitWrapper<R> {
async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
Expand Down
42 changes: 42 additions & 0 deletions core/src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use bytes::Bytes;
use futures::TryFutureExt;

use crate::ops::*;
use crate::raw::oio::AppendOperation;
use crate::raw::oio::PageOperation;
use crate::raw::oio::ReadOperation;
use crate::raw::oio::WriteOperation;
Expand Down Expand Up @@ -71,6 +72,7 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> {
type BlockingReader = ErrorContextWrapper<A::BlockingReader>;
type Writer = ErrorContextWrapper<A::Writer>;
type BlockingWriter = ErrorContextWrapper<A::BlockingWriter>;
type Appender = ErrorContextWrapper<A::Appender>;
type Pager = ErrorContextWrapper<A::Pager>;
type BlockingPager = ErrorContextWrapper<A::BlockingPager>;

Expand Down Expand Up @@ -138,6 +140,27 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> {
.await
}

async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
self.inner
.append(path, args)
.map_ok(|(rp, os)| {
(
rp,
ErrorContextWrapper {
scheme: self.meta.scheme(),
path: path.to_string(),
inner: os,
},
)
})
.map_err(|err| {
err.with_operation(Operation::Append)
.with_context("service", self.meta.scheme())
.with_context("path", path)
})
.await
}

async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
self.inner
.copy(from, to, args)
Expand Down Expand Up @@ -447,6 +470,25 @@ impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> {
}
}

#[async_trait::async_trait]
impl<T: oio::Append> oio::Append for ErrorContextWrapper<T> {
async fn append(&mut self, bs: Bytes) -> Result<()> {
self.inner.append(bs).await.map_err(|err| {
err.with_operation(AppendOperation::Append)
.with_context("service", self.scheme)
.with_context("path", &self.path)
})
}

async fn close(&mut self) -> Result<()> {
self.inner.close().await.map_err(|err| {
err.with_operation(AppendOperation::Close)
.with_context("service", self.scheme)
.with_context("path", &self.path)
})
}
}

#[async_trait::async_trait]
impl<T: oio::Page> oio::Page for ErrorContextWrapper<T> {
async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
Expand Down
5 changes: 5 additions & 0 deletions core/src/layers/immutable_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ impl<A: Accessor> LayeredAccessor for ImmutableIndexAccessor<A> {
type BlockingReader = A::BlockingReader;
type Writer = A::Writer;
type BlockingWriter = A::BlockingWriter;
type Appender = A::Appender;
type Pager = ImmutableDir;
type BlockingPager = ImmutableDir;

Expand Down Expand Up @@ -194,6 +195,10 @@ impl<A: Accessor> LayeredAccessor for ImmutableIndexAccessor<A> {
self.inner.blocking_write(path, args)
}

async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
self.inner.append(path, args).await
}

fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> {
let mut path = path;
if path == "/" {
Expand Down
Loading