Skip to content

Commit

Permalink
refactor: Replace futures with futures-util
Browse files Browse the repository at this point in the history
  • Loading branch information
tottoto committed May 9, 2023
1 parent 3d8ee5e commit dbd653e
Show file tree
Hide file tree
Showing 53 changed files with 160 additions and 160 deletions.
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bin/oay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ axum = "0.6"
chrono = "0.4.24"
clap = { version = "4", features = ["cargo", "string"] }
dirs = "5.0.0"
futures = "0.3"
futures-util = "0.3"
opendal.workspace = true
quick-xml = { version = "0.27", features = ["serialize", "overlapped-lists"] }
serde = { version = "1", features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion bin/oli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ anyhow = "1"
clap = { version = "4", features = ["cargo", "string"] }
dirs = "5.0.0"
env_logger = "0.10"
futures = "0.3"
futures-util = "0.3"
log = "0.4"
opendal.workspace = true
serde = { version = "1", features = ["derive"] }
Expand Down
10 changes: 5 additions & 5 deletions bin/oli/src/commands/cp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use clap::Arg;
use clap::ArgAction;
use clap::ArgMatches;
use clap::Command;
use futures::TryStreamExt;
use futures_util::TryStreamExt;
use opendal::Metakey;

use crate::config::Config;
Expand All @@ -49,8 +49,8 @@ pub async fn main(args: &ArgMatches) -> Result<()> {
if !recursive {
let mut dst_w = dst_op.writer(&dst_path).await?;
let reader = src_op.reader(&src_path).await?;
let buf_reader = futures::io::BufReader::with_capacity(8 * 1024 * 1024, reader);
futures::io::copy_buf(buf_reader, &mut dst_w).await?;
let buf_reader = futures_util::io::BufReader::with_capacity(8 * 1024 * 1024, reader);
futures_util::io::copy_buf(buf_reader, &mut dst_w).await?;
// flush data
dst_w.close().await?;
return Ok(());
Expand All @@ -66,12 +66,12 @@ pub async fn main(args: &ArgMatches) -> Result<()> {

let fp = de.path().strip_prefix(&src_path).expect("invalid path");
let reader = src_op.reader(de.path()).await?;
let buf_reader = futures::io::BufReader::with_capacity(8 * 1024 * 1024, reader);
let buf_reader = futures_util::io::BufReader::with_capacity(8 * 1024 * 1024, reader);

let mut writer = dst_op.writer(&dst_root.join(fp).to_string_lossy()).await?;

println!("Copying {}", de.path());
futures::io::copy_buf(buf_reader, &mut writer).await?;
futures_util::io::copy_buf(buf_reader, &mut writer).await?;
writer.close().await?;
}
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion bin/oli/src/commands/ls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use clap::Arg;
use clap::ArgAction;
use clap::ArgMatches;
use clap::Command;
use futures::TryStreamExt;
use futures_util::TryStreamExt;

use crate::config::Config;

Expand Down
2 changes: 1 addition & 1 deletion bindings/nodejs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ crate-type = ["cdylib"]
doc = false

[dependencies]
futures = "0.3.28"
futures-util = "0.3.28"
napi = { version = "2.11.3", default-features = false, features = [
"napi6",
"async",
Expand Down
2 changes: 1 addition & 1 deletion bindings/nodejs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration;

use futures::TryStreamExt;
use futures_util::TryStreamExt;
use napi::bindgen_prelude::*;

fn build_operator(
Expand Down
2 changes: 1 addition & 1 deletion bindings/object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ version.workspace = true
[dependencies]
async-trait = "0.1"
bytes = "1"
futures = "0.3"
futures-util = "0.3"
object_store = "0.5"
opendal.workspace = true
tokio = "1"
Expand Down
6 changes: 3 additions & 3 deletions bindings/object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use std::task::Poll;

use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::Stream;
use futures::StreamExt;
use futures_util::stream::BoxStream;
use futures_util::Stream;
use futures_util::StreamExt;
use object_store::path::Path;
use object_store::GetResult;
use object_store::ListResult;
Expand Down
2 changes: 1 addition & 1 deletion bindings/python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ doc = false

[dependencies]
chrono = { version = "0.4.24", default-features = false, features = ["std"] }
futures = "0.3.28"
futures-util = "0.3.28"
opendal.workspace = true
pyo3 = { version = "0.18", features = ["chrono"] }
pyo3-asyncio = { version = "0.18", features = ["tokio-runtime"] }
Expand Down
2 changes: 1 addition & 1 deletion bindings/python/src/asyncio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::str::FromStr;
use std::sync::Arc;

use ::opendal as od;
use futures::TryStreamExt;
use futures_util::TryStreamExt;
use pyo3::exceptions::PyIOError;
use pyo3::exceptions::PyNotImplementedError;
use pyo3::exceptions::PyStopAsyncIteration;
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ bytes = "1.2"
chrono = "0.4.24"
dashmap = { version = "5.4", optional = true }
flagset = "0.4"
futures = { version = "0.3", features = ["alloc"] }
futures-util = { version = "0.3", features = ["alloc"] }
hdrs = { version = "0.2", optional = true, features = ["async_file"] }
http = "0.2.5"
hyper = "0.14"
Expand Down
6 changes: 3 additions & 3 deletions core/benches/ops/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
// under the License.

use criterion::Criterion;
use futures::io;
use futures::AsyncReadExt;
use futures_util::io;
use futures_util::AsyncReadExt;
use opendal::Operator;
use rand::prelude::*;
use size::Size;
Expand Down Expand Up @@ -143,7 +143,7 @@ fn bench_read_parallel(c: &mut Criterion, name: &str, op: Operator) {
let _ = d;
})
.collect::<Vec<_>>();
futures::future::join_all(futures).await
futures_util::future::join_all(futures).await
})
},
);
Expand Down
2 changes: 1 addition & 1 deletion core/src/docs/rfcs/0041_object_native_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ Here is an example:

```rust
use anyhow::Result;
use futures::AsyncReadExt;
use futures_util::AsyncReadExt;

use opendal::services::fs;
use opendal::Operator;
Expand Down
2 changes: 1 addition & 1 deletion core/src/docs/rfcs/0069_object_stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ let meta = o.metadata_cached().await?;
First, we will add a new API in `Accessor`:

```rust
pub type BoxedObjectStream = Box<dyn futures::Stream<Item = Result<Object>> + Unpin + Send>;
pub type BoxedObjectStream = Box<dyn futures_util::Stream<Item = Result<Object>> + Unpin + Send>;

async fn list(&self, args: &OpList) -> Result<BoxedObjectStream> {
let _ = args;
Expand Down
2 changes: 1 addition & 1 deletion core/src/docs/rfcs/0191_async_streaming_io.md
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ pub trait Accessor: Send + Sync + Debug {
}
```

This API design addressed all concerns but made it hard for users to use. Primarily, we can't support `futures::AsyncRead` and `tokio::AsyncRead` simultaneously.
This API design addressed all concerns but made it hard for users to use. Primarily, we can't support `futures_util::AsyncRead` and `tokio::AsyncRead` simultaneously.

For example, we can't accept a `Box::new(Vec::new())`, user can't get this vec from OpenDAL.

Expand Down
14 changes: 7 additions & 7 deletions core/src/docs/rfcs/0337_dir_entry.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ Returning `DirEntry` instead of `Object` in list.
In [Object Stream](./0069-object-stream.md), we introduce read_dir support via:

```rust
pub trait ObjectStream: futures::Stream<Item = Result<Object>> + Unpin + Send {}
impl<T> ObjectStream for T where T: futures::Stream<Item = Result<Object>> + Unpin + Send {}
pub trait ObjectStream: futures_util::Stream<Item = Result<Object>> + Unpin + Send {}
impl<T> ObjectStream for T where T: futures_util::Stream<Item = Result<Object>> + Unpin + Send {}

pub struct Object {
acc: Arc<dyn Accessor>,
Expand All @@ -34,7 +34,7 @@ Users can't know an object's mode after the list, so they have to send `metadata
```rust
let o = op.object("path/to/dir/");
let mut obs = o.list().await?;
// ObjectStream implements `futures::Stream`
// ObjectStream implements `futures_util::Stream`
while let Some(o) = obs.next().await {
let mut o = o?;
// It's highly possible that OpenDAL already did metadata during list.
Expand All @@ -59,7 +59,7 @@ Introducing a separate `DirEntry` could reduce an extra call for metadata most o
```rust
let o = op.object("path/to/dir/");
let mut ds = o.list().await?;
// ObjectStream implements `futures::Stream`
// ObjectStream implements `futures_util::Stream`
while let Some(de) = ds.try_next().await {
match de.mode() {
ObjectMode::FILE => {
Expand All @@ -78,7 +78,7 @@ while let Some(de) = ds.try_next().await {
Within this RFC, `Object::list()` will return `DirStreamer` instead.

```rust
pub trait DirStream: futures::Stream<Item = Result<DirEntry>> + Unpin + Send {}
pub trait DirStream: futures_util::Stream<Item = Result<DirEntry>> + Unpin + Send {}
pub type DirStreamer = Box<dyn DirStream>;
```

Expand All @@ -97,7 +97,7 @@ With `DirEntry` support, we can reduce an extra `metadata` call if we only want
```rust
let o = op.object("path/to/dir/");
let mut ds = o.list().await?;
// ObjectStream implements `futures::Stream`
// ObjectStream implements `futures_util::Stream`
while let Some(de) = ds.try_next().await {
match de.mode() {
ObjectMode::FILE => {
Expand Down Expand Up @@ -138,7 +138,7 @@ impl From<DirEntry> for Object {}
And use `DirStream` to replace `ObjectStream`:

```rust
pub trait DirStream: futures::Stream<Item = Result<DirEntry>> + Unpin + Send {}
pub trait DirStream: futures_util::Stream<Item = Result<DirEntry>> + Unpin + Send {}
pub type DirStreamer = Box<dyn DirStream>;
```

Expand Down
2 changes: 1 addition & 1 deletion core/src/docs/rfcs/0599_blocking_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ Notes:

- `BlockingBytesReader` is a boxed `std::io::Read`.
- All blocking operations are happening on the current thread.
- Blocking operation is implemented natively, no `futures::block_on`.
- Blocking operation is implemented natively, no `futures_util::block_on`.

# Drawbacks

Expand Down
2 changes: 1 addition & 1 deletion core/src/docs/rfcs/0623_redis_service.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ use redis::AsyncCommands;
// args: &OpWrite
// r: BytesReader
let mut buf = vec![];
let content_length: u64 = futures::io::copy(r, &mut buf).await?;
let content_length: u64 = futures_util::io::copy(r, &mut buf).await?;
let last_modified: String = OffsetDateTime::now().to_string();

// content md5 will not be offered
Expand Down
6 changes: 3 additions & 3 deletions core/src/docs/rfcs/2083_writer_sink_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ I propose to add the following API to `Writer`:
impl Writer {
pub async fn copy_from<R>(&mut self, size: u64, r: R) -> Result<()>
where
R: futures::AsyncRead + Send + Sync + 'static;
R: futures_util::AsyncRead + Send + Sync + 'static;

pub async fn pipe_from<S>(&mut self, size: u64, s: S) -> Result<()>
where
S: futures::TryStream + Send + Sync + 'static
S: futures_util::TryStream + Send + Sync + 'static
Bytes: From<S::Ok>;
}
```
Expand Down Expand Up @@ -71,7 +71,7 @@ To support `Wrtier::copy_from` and `Writer::pipe_from`, we will add a new API ca
```rust
#[async_trait]
pub trait Write: Unpin + Send + Sync {
async fn sink(&mut self, size: u64, s: Box<dyn futures::TryStream<Ok=Bytes> + Send + Sync>) -> Result<()>;
async fn sink(&mut self, size: u64, s: Box<dyn futures_util::TryStream<Ok=Bytes> + Send + Sync>) -> Result<()>;
}
```

Expand Down
10 changes: 5 additions & 5 deletions core/src/docs/upgrade.md
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,9 @@ In v0.24, we made a big refactor on our internal IO-related traits. In this vers

Take `Reader` as an example:

`input::Reader` is the user input reader, which only requires `futures::AsyncRead + Send`.
`input::Reader` is the user input reader, which only requires `futures_util::AsyncRead + Send`.

`output::Reader` is the reader returned by `OpenDAL`, which implements `futures::AsyncRead`, `futures::AsyncSeek`, and `futures::Stream<Item=io::Result<Bytes>>`. Besides, `output::Reader` also implements `Send + Sync`, which makes it useful for users.
`output::Reader` is the reader returned by `OpenDAL`, which implements `futures_util::AsyncRead`, `futures_util::AsyncSeek`, and `futures_util::Stream<Item=io::Result<Bytes>>`. Besides, `output::Reader` also implements `Send + Sync`, which makes it useful for users.

Due to this change, all code that depends on `BytesReader` should be refactored.

Expand All @@ -278,9 +278,9 @@ Due to this change, all code that depends on `BytesReader` should be refactored.

Thanks to the change of IO trait split, we make `ObjectReader` implements all needed traits:

- `futures::AsyncRead`
- `futures::AsyncSeek`
- `futures::Stream<Item=io::Result<Bytes>>`
- `futures_util::AsyncRead`
- `futures_util::AsyncSeek`
- `futures_util::Stream<Item=io::Result<Bytes>>`

Thus, we removed the `seekable_reader` API. They can be replaced by `range_reader`:

Expand Down
2 changes: 1 addition & 1 deletion core/src/layers/chaos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::task::Poll;

use async_trait::async_trait;
use bytes::Bytes;
use futures::FutureExt;
use futures_util::FutureExt;
use rand::prelude::*;
use rand::rngs::StdRng;

Expand Down
2 changes: 1 addition & 1 deletion core/src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::task::Poll;

use async_trait::async_trait;
use bytes::Bytes;
use futures::TryFutureExt;
use futures_util::TryFutureExt;

use crate::ops::*;
use crate::raw::oio::PageOperation;
Expand Down
2 changes: 1 addition & 1 deletion core/src/layers/immutable_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ mod tests {
use std::collections::HashSet;

use anyhow::Result;
use futures::TryStreamExt;
use futures_util::TryStreamExt;
use log::debug;

use super::*;
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use std::task::Poll;

use async_trait::async_trait;
use bytes::Bytes;
use futures::FutureExt;
use futures::TryFutureExt;
use futures_util::FutureExt;
use futures_util::TryFutureExt;
use log::debug;
use log::log;
use log::trace;
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use std::time::Instant;

use async_trait::async_trait;
use bytes::Bytes;
use futures::FutureExt;
use futures::TryFutureExt;
use futures_util::FutureExt;
use futures_util::TryFutureExt;
use metrics::increment_counter;
use metrics::register_counter;
use metrics::register_histogram;
Expand Down
Loading

0 comments on commit dbd653e

Please sign in to comment.