Skip to content

Commit

Permalink
feat(layers): add await-tree async instrument layer
Browse files Browse the repository at this point in the history
Signed-off-by: owl <[email protected]>
  • Loading branch information
oowl committed Jul 25, 2023
1 parent 007565b commit 480d931
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 1 deletion.
84 changes: 83 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ layers-tracing = ["dep:tracing"]
layers-otel-trace = ["dep:opentelemetry"]
# Enable layers throttle support.
layers-throttle = ["dep:governor"]
# Enable layers await-tree support.
layers-await-tree = ["dep:await-tree"]

services-azblob = [
"dep:sha2",
Expand Down Expand Up @@ -183,6 +185,7 @@ anyhow = { version = "1.0.30", features = ["std"] }
async-compat = "0.2"
async-tls = { version = "0.11", optional = true }
async-trait = "0.1.68"
await-tree = { version = "0.1.1", optional = true }
backon = "0.4.0"
base64 = "0.21"
bb8 = { version = "0.8", optional = true }
Expand Down
133 changes: 133 additions & 0 deletions core/src/layers/await_tree.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::*;
use crate::raw::*;

use async_trait::async_trait;

use await_tree::InstrumentAwait;


/// Add a Instrument await-tree for actor-based applications to the underlying services.
///
/// # AwaitTree
///
/// await-tree allows developers to dump this execution tree at runtime,
/// with the span of each Future annotated by instrument_await.
/// Read more about [await-tree](https://docs.rs/await-tree/latest/await_tree/)
///
/// # Examples
///
/// ```
/// use anyhow::Result;
/// use opendal::layers::AwaitTreeLayer;
/// use opendal::services;
/// use opendal::Operator;
/// use opendal::Scheme;
///
/// let _ = Operator::new(services::Memory::default())
/// .expect("must init")
/// .layer(AwaitTreeLayer::new())
/// .finish();
/// ```
#[derive(Clone)]
pub struct AwaitTreeLayer {

}

impl AwaitTreeLayer {
/// Create a new `AwaitTreeLayer`.
pub fn new() -> Self {
Self {}
}
}

impl<A: Accessor> Layer<A> for AwaitTreeLayer {
type LayeredAccessor = AwaitTreeAccessor<A>;

fn layer(&self, accessor: A) -> Self::LayeredAccessor {
AwaitTreeAccessor {
inner: accessor,
}
}
}

#[derive(Debug, Clone)]
pub struct AwaitTreeAccessor<A: Accessor> {
inner: A,
}

#[async_trait]
impl <A: Accessor> LayeredAccessor for AwaitTreeAccessor<A> {
type Inner = A;
type Reader = A::Reader;
type BlockingReader = A::BlockingReader;
type Writer = A::Writer;
type BlockingWriter = A::BlockingWriter;
type Appender = A::Appender;
type Pager = A::Pager;
type BlockingPager = A::BlockingPager;

fn inner(&self) -> &Self::Inner {
&self.inner
}

async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
self.inner
.read(path, args)
.instrument_await("read")
.await
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {

self.inner
.write(path, args)
.instrument_await("write")
.await
}

async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
self.inner
.append(path, args)
.instrument_await("append")
.await
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
self.inner
.list(path, args)
.instrument_await("list")
.await
}

fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
self.inner
.blocking_read(path, args)
}

fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
self.inner
.blocking_write(path, args)
}

fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> {
self.inner.blocking_list(path, args)
}
}

3 changes: 3 additions & 0 deletions core/src/layers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,6 @@ mod throttle;
pub use self::oteltrace::OtelTraceLayer;
#[cfg(feature = "layers-throttle")]
pub use self::throttle::ThrottleLayer;
#[cfg(feature = "layers-await-tree")]
mod await_tree;
pub use self::await_tree::AwaitTreeLayer;

0 comments on commit 480d931

Please sign in to comment.