diff --git a/Cargo.lock b/Cargo.lock index 167a6d18863a..cb0b29ed6f58 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -364,6 +364,23 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "await-tree" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "325bcfc4b87d4aa36f1319b806bacc40fcefcaf43a12bd85a5a2f44fc14ce9de" +dependencies = [ + "coarsetime", + "derive_builder", + "flexstr", + "indextree", + "itertools", + "parking_lot 0.12.1", + "pin-project", + "tokio", + "tracing", +] + [[package]] name = "awaitable" version = "0.4.0" @@ -861,6 +878,18 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b" +[[package]] +name = "coarsetime" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a90d114103adbc625300f346d4d09dfb4ab1c4a8df6868435dd903392ecf4354" +dependencies = [ + "libc", + "once_cell", + "wasi 0.11.0+wasi-snapshot-preview1", + "wasm-bindgen", +] + [[package]] name = "colorchoice" version = "1.0.0" @@ -1241,6 +1270,37 @@ dependencies = [ "syn 2.0.23", ] +[[package]] +name = "derive_builder" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d67778784b508018359cbc8696edb3db78160bab2c2a28ba7f56ef6932997f8" +dependencies = [ + "derive_builder_macro", +] + +[[package]] +name = "derive_builder_core" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c11bdc11a0c47bc7d37d582b5285da6849c96681023680b906673c5707af7b0f" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "derive_builder_macro" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebcda35c7a396850a55ffeac740804b40ffec779b98fffbb1738f4033f0ee79e" +dependencies = [ + "derive_builder_core", + "syn 1.0.109", +] + [[package]] name = "derive_destructure2" version = "0.1.1" @@ -1474,6 +1534,15 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cda653ca797810c02f7ca4b804b40b8b95ae046eb989d356bce17919a8c25499" +[[package]] +name = "flexstr" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d50aef14619d336a54fca5a592d952eb39037b1a1e7e6afd9f91c892ac7ef65" +dependencies = [ + "static_assertions", +] + [[package]] name = "float-cmp" version = "0.9.0" @@ -2072,6 +2141,12 @@ dependencies = [ "hashbrown 0.14.0", ] +[[package]] +name = "indextree" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c40411d0e5c63ef1323c3d09ce5ec6d84d71531e18daed0743fccea279d7deb6" + [[package]] name = "indoc" version = "1.0.9" @@ -3031,6 +3106,7 @@ dependencies = [ "async-compat", "async-tls", "async-trait", + "await-tree", "backon", "base64 0.21.2", "bb8", @@ -3924,7 +4000,7 @@ dependencies = [ "indoc", "libc", "memoffset", - "parking_lot 0.11.2", + "parking_lot 0.12.1", "pyo3-build-config", "pyo3-ffi", "pyo3-macros", @@ -4989,6 +5065,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "strsim" version = "0.10.0" diff --git a/core/Cargo.toml b/core/Cargo.toml index 45ef98e7f06c..2656fe6de237 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -93,6 +93,8 @@ layers-tracing = ["dep:tracing"] layers-otel-trace = ["dep:opentelemetry"] # Enable layers throttle support. layers-throttle = ["dep:governor"] +# Enable layers await-tree support. +layers-await-tree = ["dep:await-tree"] services-azblob = [ "dep:sha2", @@ -183,6 +185,7 @@ anyhow = { version = "1.0.30", features = ["std"] } async-compat = "0.2" async-tls = { version = "0.11", optional = true } async-trait = "0.1.68" +await-tree = { version = "0.1.1", optional = true } backon = "0.4.0" base64 = "0.21" bb8 = { version = "0.8", optional = true } diff --git a/core/src/layers/await_tree.rs b/core/src/layers/await_tree.rs new file mode 100644 index 000000000000..b626295d580d --- /dev/null +++ b/core/src/layers/await_tree.rs @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::*; +use crate::raw::*; + +use async_trait::async_trait; + +use await_tree::InstrumentAwait; + + +/// Add a Instrument await-tree for actor-based applications to the underlying services. +/// +/// # AwaitTree +/// +/// await-tree allows developers to dump this execution tree at runtime, +/// with the span of each Future annotated by instrument_await. +/// Read more about [await-tree](https://docs.rs/await-tree/latest/await_tree/) +/// +/// # Examples +/// +/// ``` +/// use anyhow::Result; +/// use opendal::layers::AwaitTreeLayer; +/// use opendal::services; +/// use opendal::Operator; +/// use opendal::Scheme; +/// +/// let _ = Operator::new(services::Memory::default()) +/// .expect("must init") +/// .layer(AwaitTreeLayer::new()) +/// .finish(); +/// ``` +#[derive(Clone)] +pub struct AwaitTreeLayer { + +} + +impl AwaitTreeLayer { + /// Create a new `AwaitTreeLayer`. + pub fn new() -> Self { + Self {} + } +} + +impl Layer for AwaitTreeLayer { + type LayeredAccessor = AwaitTreeAccessor; + + fn layer(&self, accessor: A) -> Self::LayeredAccessor { + AwaitTreeAccessor { + inner: accessor, + } + } +} + +#[derive(Debug, Clone)] +pub struct AwaitTreeAccessor { + inner: A, +} + +#[async_trait] +impl LayeredAccessor for AwaitTreeAccessor { + type Inner = A; + type Reader = A::Reader; + type BlockingReader = A::BlockingReader; + type Writer = A::Writer; + type BlockingWriter = A::BlockingWriter; + type Appender = A::Appender; + type Pager = A::Pager; + type BlockingPager = A::BlockingPager; + + fn inner(&self) -> &Self::Inner { + &self.inner + } + + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + self.inner + .read(path, args) + .instrument_await("read") + .await + } + + async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + + self.inner + .write(path, args) + .instrument_await("write") + .await + } + + async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { + self.inner + .append(path, args) + .instrument_await("append") + .await + } + + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { + self.inner + .list(path, args) + .instrument_await("list") + .await + } + + fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { + self.inner + .blocking_read(path, args) + } + + fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { + self.inner + .blocking_write(path, args) + } + + fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> { + self.inner.blocking_list(path, args) + } +} + diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs index ab5861a0ccfe..91895bcb3849 100644 --- a/core/src/layers/mod.rs +++ b/core/src/layers/mod.rs @@ -84,3 +84,6 @@ mod throttle; pub use self::oteltrace::OtelTraceLayer; #[cfg(feature = "layers-throttle")] pub use self::throttle::ThrottleLayer; +#[cfg(feature = "layers-await-tree")] +mod await_tree; +pub use self::await_tree::AwaitTreeLayer;