diff --git a/bin/oay/src/services/s3/service.rs b/bin/oay/src/services/s3/service.rs index f8bf88280f47..3fec8625f9ad 100644 --- a/bin/oay/src/services/s3/service.rs +++ b/bin/oay/src/services/s3/service.rs @@ -101,7 +101,7 @@ async fn handle_list_objects( let mut lister = state .op - .list_with(&params.prefix) + .lister_with(&params.prefix) .start_after(&params.start_after) .await?; diff --git a/bin/oay/src/services/webdav/webdavfs.rs b/bin/oay/src/services/webdav/webdavfs.rs index b9dea58a79b2..3eda5c539924 100644 --- a/bin/oay/src/services/webdav/webdavfs.rs +++ b/bin/oay/src/services/webdav/webdavfs.rs @@ -62,7 +62,7 @@ impl DavFileSystem for WebdavFs { ) -> dav_server::fs::FsFuture>> { async move { - let lister = self.op.list(path.as_url_string().as_str()).await.unwrap(); + let lister = self.op.lister(path.as_url_string().as_str()).await.unwrap(); Ok(DavStream::new(self.op.clone(), lister).boxed()) } .boxed() diff --git a/bin/oli/src/commands/cp.rs b/bin/oli/src/commands/cp.rs index 0c4468922d60..9bb40bad4ba4 100644 --- a/bin/oli/src/commands/cp.rs +++ b/bin/oli/src/commands/cp.rs @@ -57,7 +57,7 @@ pub async fn main(args: &ArgMatches) -> Result<()> { } let dst_root = Path::new(&dst_path); - let mut ds = src_op.scan(&src_path).await?; + let mut ds = src_op.lister_with(&src_path).delimiter("").await?; while let Some(de) = ds.try_next().await? { let meta = src_op.metadata(&de, Metakey::Mode).await?; if meta.mode().is_dir() { diff --git a/bin/oli/src/commands/ls.rs b/bin/oli/src/commands/ls.rs index 70747cffda9c..719b7dcb449c 100644 --- a/bin/oli/src/commands/ls.rs +++ b/bin/oli/src/commands/ls.rs @@ -41,14 +41,14 @@ pub async fn main(args: &ArgMatches) -> Result<()> { let (op, path) = cfg.parse_location(target)?; if !recursive { - let mut ds = op.list(&path).await?; + let mut ds = op.lister(&path).await?; while let Some(de) = ds.try_next().await? 
{ println!("{}", de.name()); } return Ok(()); } - let mut ds = op.scan(&path).await?; + let mut ds = op.lister_with(&path).delimiter("").await?; while let Some(de) = ds.try_next().await? { println!("{}", de.path()); } diff --git a/bindings/nodejs/index.d.ts b/bindings/nodejs/index.d.ts index a5c78e8b86ed..112ece450320 100644 --- a/bindings/nodejs/index.d.ts +++ b/bindings/nodejs/index.d.ts @@ -216,6 +216,7 @@ export class Operator { * An error will be returned if given path doesn't end with /. * * ### Example + * * ```javascript * const lister = await op.scan("/path/to/dir/"); * while (true) { diff --git a/bindings/nodejs/src/lib.rs b/bindings/nodejs/src/lib.rs index 0cebdcad58c6..dbab75648cf5 100644 --- a/bindings/nodejs/src/lib.rs +++ b/bindings/nodejs/src/lib.rs @@ -288,6 +288,7 @@ impl Operator { /// An error will be returned if given path doesn't end with /. /// /// ### Example + /// /// ```javascript /// const lister = await op.scan("/path/to/dir/"); /// while (true) { @@ -303,7 +304,13 @@ impl Operator { /// ````` #[napi] pub async fn scan(&self, path: String) -> Result { - Ok(Lister(self.0.scan(&path).await.map_err(format_napi_error)?)) + Ok(Lister( + self.0 + .lister_with(&path) + .delimiter("") + .await + .map_err(format_napi_error)?, + )) } /// List dir in flat way synchronously. @@ -408,7 +415,9 @@ impl Operator { /// ``` #[napi] pub async fn list(&self, path: String) -> Result { - Ok(Lister(self.0.list(&path).await.map_err(format_napi_error)?)) + Ok(Lister( + self.0.lister(&path).await.map_err(format_napi_error)?, + )) } /// List given path synchronously. 
diff --git a/bindings/object_store/src/lib.rs b/bindings/object_store/src/lib.rs index 421b6a686e85..4e6784639602 100644 --- a/bindings/object_store/src/lib.rs +++ b/bindings/object_store/src/lib.rs @@ -148,7 +148,8 @@ impl ObjectStore for OpendalStore { let path = prefix.map_or("".into(), |x| format!("{}/", x)); let stream = self .inner - .scan(&path) + .lister_with(&path) + .delimiter("") .await .map_err(|err| format_object_store_error(err, &path))?; @@ -170,7 +171,7 @@ impl ObjectStore for OpendalStore { let path = prefix.map_or("".into(), |x| format!("{}/", x)); let mut stream = self .inner - .list(&path) + .lister(&path) .await .map_err(|err| format_object_store_error(err, &path))?; diff --git a/bindings/python/src/asyncio.rs b/bindings/python/src/asyncio.rs index 02dad1c18075..4b33015fa136 100644 --- a/bindings/python/src/asyncio.rs +++ b/bindings/python/src/asyncio.rs @@ -139,7 +139,7 @@ impl AsyncOperator { pub fn list<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&'p PyAny> { let this = self.0.clone(); future_into_py(py, async move { - let lister = this.list(&path).await.map_err(format_pyerr)?; + let lister = this.lister(&path).await.map_err(format_pyerr)?; let pylister: PyObject = Python::with_gil(|py| AsyncLister::new(lister).into_py(py)); Ok(pylister) }) @@ -149,7 +149,11 @@ impl AsyncOperator { pub fn scan<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&'p PyAny> { let this = self.0.clone(); future_into_py(py, async move { - let lister = this.scan(&path).await.map_err(format_pyerr)?; + let lister = this + .lister_with(&path) + .delimiter("") + .await + .map_err(format_pyerr)?; let pylister: PyObject = Python::with_gil(|py| AsyncLister::new(lister).into_py(py)); Ok(pylister) }) diff --git a/core/src/docs/rfcs/2774_lister_api.md b/core/src/docs/rfcs/2774_lister_api.md index c5a5733014b6..0418c84ade0b 100644 --- a/core/src/docs/rfcs/2774_lister_api.md +++ b/core/src/docs/rfcs/2774_lister_api.md @@ -41,6 +41,7 @@ We will: - Rename 
existing `list` to `lister` - Add new `list` method to call `lister` and return all entries +- Merge `scan` into `list_with` with `delimiter("")` This keeps the pagination logic encapsulated in `lister`. diff --git a/core/src/docs/upgrade.md b/core/src/docs/upgrade.md index c59c11004fe4..75fb1e48cc55 100644 --- a/core/src/docs/upgrade.md +++ b/core/src/docs/upgrade.md @@ -1,3 +1,15 @@ +# Unreleased + +## Public API + +### RFC-2774 Lister API + +RFC-2774 proposes a new `lister` API to replace the current `list` and `scan`, and adds a new `list` API that returns entries directly. + +- For listing a directory at once, please use `list()` for convenience. +- For listing a directory recursively, please use `list_with().delimiter("")` or `lister_with().delimiter("")` instead of `scan()`. +- For listing in a streaming way, please use `lister()` or `lister_with()` instead. + # Upgrade to v0.39 ## Public API diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 20a1dd7b28ca..25048b6c1020 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -1014,7 +1014,7 @@ mod tests { capability_test!(rename, |op| { op.rename("/path/to/mock_file", "/path/to/mock_file_2") }); - capability_test!(list, |op| { op.list("/path/to/mock_dir/") }); + capability_test!(list, |op| { op.lister("/path/to/mock_dir/") }); capability_test!(presign, |op| { op.presign_read("/path/to/mock_file", Duration::from_secs(1)) }); diff --git a/core/src/layers/immutable_index.rs b/core/src/layers/immutable_index.rs index 1c7ed8992349..cc2cf1907bcf 100644 --- a/core/src/layers/immutable_index.rs +++ b/core/src/layers/immutable_index.rs @@ -302,7 +302,7 @@ mod tests { let mut map = HashMap::new(); let mut set = HashSet::new(); - let mut ds = op.list("").await?; + let mut ds = op.lister("").await?; while let Some(entry) = ds.try_next().await? 
{ debug!("got entry: {}", entry.path()); assert!( @@ -341,7 +341,7 @@ mod tests { .layer(iil) .finish(); - let mut ds = op.scan("/").await?; + let mut ds = op.lister_with("/").delimiter("").await?; let mut set = HashSet::new(); let mut map = HashMap::new(); while let Some(entry) = ds.try_next().await? { @@ -391,7 +391,7 @@ mod tests { // List / let mut map = HashMap::new(); let mut set = HashSet::new(); - let mut ds = op.list("/").await?; + let mut ds = op.lister("/").await?; while let Some(entry) = ds.try_next().await? { assert!( set.insert(entry.path().to_string()), @@ -410,7 +410,7 @@ mod tests { // List dataset/stateful/ let mut map = HashMap::new(); let mut set = HashSet::new(); - let mut ds = op.list("dataset/stateful/").await?; + let mut ds = op.lister("dataset/stateful/").await?; while let Some(entry) = ds.try_next().await? { assert!( set.insert(entry.path().to_string()), @@ -452,7 +452,7 @@ mod tests { .layer(iil) .finish(); - let mut ds = op.scan("/").await?; + let mut ds = op.lister_with("/").delimiter("").await?; let mut map = HashMap::new(); let mut set = HashSet::new(); diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index 1408af0351a2..be21adbd120b 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -1356,7 +1356,7 @@ mod tests { let expected = vec!["hello", "world", "2023/", "0208/"]; let mut lister = op - .list("retryable_error/") + .lister("retryable_error/") .await .expect("service must support list"); let mut actual = Vec::new(); diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs index c36bf20c555a..4c2fdfa40baf 100644 --- a/core/src/types/operator/operator.rs +++ b/core/src/types/operator/operator.rs @@ -151,7 +151,7 @@ impl Operator { /// # } /// ``` pub async fn check(&self) -> Result<()> { - let mut ds = self.list("/").await?; + let mut ds = self.lister("/").await?; match ds.next().await { Some(Err(e)) if e.kind() != ErrorKind::NotFound => Err(e), @@ -1232,7 +1232,7 @@ 
impl Operator { return self.delete(path).await; } - let obs = self.scan(path).await?; + let obs = self.lister_with(path).delimiter("").await?; if self.info().can_batch() { let mut obs = obs.try_chunks(self.limit()); @@ -1266,7 +1266,127 @@ impl Operator { Ok(()) } - /// List given path. + /// List entries within a given directory. + /// + /// # Notes + /// + /// ## For listing recursively + /// + /// This function only reads the children of the given directory. To read + /// all entries recursively, use `Operator::list_with("path").delimiter("")` + /// instead. + /// + /// ## For streaming + /// + /// This function will read all entries in the given directory. It could + /// take a very long time and consume a lot of memory if the directory + /// contains a lot of entries. + /// + /// In order to avoid this, you can use [`Operator::lister`] to list entries in + /// a streaming way. + /// + /// # Examples + /// + /// ```no_run + /// # use anyhow::Result; + /// use opendal::EntryMode; + /// use opendal::Metakey; + /// use opendal::Operator; + /// # #[tokio::main] + /// # async fn test(op: Operator) -> Result<()> { + /// let mut entries = op.list("path/to/dir/").await?; + /// for entry in entries { + /// let meta = op.metadata(&entry, Metakey::Mode).await?; + /// match meta.mode() { + /// EntryMode::FILE => { + /// println!("Handling file") + /// } + /// EntryMode::DIR => { + /// println!("Handling dir like start a new list via meta.path()") + /// } + /// EntryMode::Unknown => continue, + /// } + /// } + /// # Ok(()) + /// # } + /// ``` + pub async fn list(&self, path: &str) -> Result<Vec<Entry>> { + self.lister_with(path).await?.try_collect().await + } + + /// List entries within a given directory with options. + /// + /// # Notes + /// + /// ## For streaming + /// + /// This function will read all entries in the given directory. It could + /// take a very long time and consume a lot of memory if the directory + /// contains a lot of entries. 
+ /// + /// In order to avoid this, you can use [`Operator::lister`] to list entries in + /// a streaming way. + /// + /// # Examples + /// + /// ## List entries with prefix + /// + /// This function can also be used to list entries in recursive way. + /// + /// ```no_run + /// # use anyhow::Result; + /// use opendal::EntryMode; + /// use opendal::Metakey; + /// use opendal::Operator; + /// # #[tokio::main] + /// # async fn test(op: Operator) -> Result<()> { + /// let mut entries = op.list_with("prefix/").delimiter("").await?; + /// for entry in entries { + /// let meta = op.metadata(&entry, Metakey::Mode).await?; + /// match meta.mode() { + /// EntryMode::FILE => { + /// println!("Handling file") + /// } + /// EntryMode::DIR => { + /// println!("Handling dir like start a new list via meta.path()") + /// } + /// EntryMode::Unknown => continue, + /// } + /// } + /// # Ok(()) + /// # } + /// ``` + pub fn list_with(&self, path: &str) -> FutureList { + let path = normalize_path(path); + + let fut = FutureList(OperatorFuture::new( + self.inner().clone(), + path, + OpList::default(), + |inner, path, args| { + let fut = async move { + if !validate_path(&path, EntryMode::DIR) { + return Err(Error::new( + ErrorKind::NotADirectory, + "the path trying to list should end with `/`", + ) + .with_operation("Operator::list") + .with_context("service", inner.info().scheme().into_static()) + .with_context("path", &path)); + } + + let (_, pager) = inner.list(&path, args).await?; + let lister = Lister::new(pager); + + lister.try_collect().await + }; + Box::pin(fut) + }, + )); + fut + } + + /// List entries within a given directory as a stream. /// /// This function will create a new handle to list entries. 
/// @@ -1283,7 +1403,7 @@ impl Operator { /// use opendal::Operator; /// # #[tokio::main] /// # async fn test(op: Operator) -> Result<()> { - /// let mut ds = op.list("path/to/dir/").await?; + /// let mut ds = op.lister("path/to/dir/").await?; /// while let Some(mut de) = ds.try_next().await? { /// let meta = op.metadata(&de, Metakey::Mode).await?; /// match meta.mode() { @@ -1299,11 +1419,11 @@ impl Operator { /// # Ok(()) /// # } /// ``` - pub async fn list(&self, path: &str) -> Result { - self.list_with(path).await + pub async fn lister(&self, path: &str) -> Result { + self.lister_with(path).await } - /// List given path with OpList. + /// List entries within a given directory as a stream with options. /// /// This function will create a new handle to list entries. /// @@ -1323,7 +1443,7 @@ impl Operator { /// # #[tokio::main] /// # async fn test(op: Operator) -> Result<()> { /// let mut ds = op - /// .list_with("path/to/dir/") + /// .lister_with("path/to/dir/") /// .limit(10) /// .start_after("start") /// .await?; @@ -1345,8 +1465,6 @@ impl Operator { /// /// ## List all files recursively /// - /// We can use `op.scan()` as a shorter alias. - /// /// ```no_run /// # use anyhow::Result; /// # use futures::io; @@ -1356,7 +1474,7 @@ impl Operator { /// use opendal::Operator; /// # #[tokio::main] /// # async fn test(op: Operator) -> Result<()> { - /// let mut ds = op.list_with("path/to/dir/").delimiter("").await?; + /// let mut ds = op.lister_with("path/to/dir/").delimiter("").await?; /// while let Some(mut de) = ds.try_next().await? 
{ /// let meta = op.metadata(&de, Metakey::Mode).await?; /// match meta.mode() { @@ -1372,10 +1490,10 @@ impl Operator { /// # Ok(()) /// # } /// ``` - pub fn list_with(&self, path: &str) -> FutureList { + pub fn lister_with(&self, path: &str) -> FutureLister { let path = normalize_path(path); - let fut = FutureList(OperatorFuture::new( + let fut = FutureLister(OperatorFuture::new( self.inner().clone(), path, OpList::default(), @@ -1400,50 +1518,8 @@ impl Operator { )); fut } - - /// List dir in flat way. - /// - /// Also, this function can be used to list a prefix. - /// - /// An error will be returned if given path doesn't end with `/`. - /// - /// # Notes - /// - /// - `scan` will not return the prefix itself. - /// - `scan` is an alias of `list_with(path).delimiter("")` - /// - /// # Examples - /// - /// ```no_run - /// # use anyhow::Result; - /// # use futures::io; - /// use futures::TryStreamExt; - /// use opendal::EntryMode; - /// use opendal::Metakey; - /// use opendal::Operator; - /// # - /// # #[tokio::main] - /// # async fn test(op: Operator) -> Result<()> { - /// let mut ds = op.scan("/path/to/dir/").await?; - /// while let Some(mut de) = ds.try_next().await? { - /// let meta = op.metadata(&de, Metakey::Mode).await?; - /// match meta.mode() { - /// EntryMode::FILE => { - /// println!("Handling file") - /// } - /// EntryMode::DIR => { - /// println!("Handling dir like start a new list via meta.path()") - /// } - /// EntryMode::Unknown => continue, - /// } - /// } - /// # Ok(()) - /// # } - /// ``` - pub async fn scan(&self, path: &str) -> Result { - self.list_with(path).delimiter("").await - } } + /// Operator presign API. impl Operator { /// Presign an operation for stat(head). 
diff --git a/core/src/types/operator/operator_futures.rs b/core/src/types/operator/operator_futures.rs index 8fd52a31eda7..68e6467886fb 100644 --- a/core/src/types/operator/operator_futures.rs +++ b/core/src/types/operator/operator_futures.rs @@ -568,9 +568,36 @@ impl Future for FutureDelete { /// Future that generated by [`Operator::list_with`]. /// /// Users can add more options by public functions provided by this struct. -pub struct FutureList(pub(crate) OperatorFuture); +pub struct FutureList(pub(crate) OperatorFuture>); impl FutureList { + /// Change the start_after of this list operation. + pub fn start_after(mut self, v: &str) -> Self { + self.0 = self.0.map_args(|args| args.with_start_after(v)); + self + } + + /// Change the delimiter. The default delimiter is "/" + pub fn delimiter(mut self, v: &str) -> Self { + self.0 = self.0.map_args(|args| args.with_delimiter(v)); + self + } +} + +impl Future for FutureList { + type Output = Result>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.0.poll_unpin(cx) + } +} + +/// Future that generated by [`Operator::lister_with`]. +/// +/// Users can add more options by public functions provided by this struct. +pub struct FutureLister(pub(crate) OperatorFuture); + +impl FutureLister { /// Change the limit of this list operation. 
pub fn limit(mut self, v: usize) -> Self { self.0 = self.0.map_args(|args| args.with_limit(v)); @@ -590,7 +617,7 @@ impl FutureList { } } -impl Future for FutureList { +impl Future for FutureLister { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/core/tests/behavior/list.rs b/core/tests/behavior/list.rs index bf8aa833976e..ed70676006de 100644 --- a/core/tests/behavior/list.rs +++ b/core/tests/behavior/list.rs @@ -66,7 +66,7 @@ pub async fn test_list_dir(op: Operator) -> Result<()> { op.write(&path, content).await.expect("write must succeed"); - let mut obs = op.list(&format!("{parent}/")).await?; + let mut obs = op.lister(&format!("{parent}/")).await?; let mut found = false; while let Some(de) = obs.try_next().await? { let meta = op.stat(de.path()).await?; @@ -105,7 +105,7 @@ pub async fn test_list_rich_dir(op: Operator) -> Result<()> { .collect::>() .await; - let mut objects = op.with_limit(10).list("test_list_rich_dir/").await?; + let mut objects = op.with_limit(10).lister("test_list_rich_dir/").await?; let mut actual = vec![]; while let Some(o) = objects.try_next().await? { let path = o.path().to_string(); @@ -126,7 +126,7 @@ pub async fn test_list_empty_dir(op: Operator) -> Result<()> { op.create_dir(&dir).await.expect("write must succeed"); - let mut obs = op.list(&dir).await?; + let mut obs = op.lister(&dir).await?; let mut objects = HashMap::new(); while let Some(de) = obs.try_next().await? { objects.insert(de.path().to_string(), de); @@ -143,7 +143,7 @@ pub async fn test_list_empty_dir(op: Operator) -> Result<()> { pub async fn test_list_non_exist_dir(op: Operator) -> Result<()> { let dir = format!("{}/", uuid::Uuid::new_v4()); - let mut obs = op.list(&dir).await?; + let mut obs = op.lister(&dir).await?; let mut objects = HashMap::new(); while let Some(de) = obs.try_next().await? 
{ objects.insert(de.path().to_string(), de); @@ -160,7 +160,7 @@ pub async fn test_list_sub_dir(op: Operator) -> Result<()> { op.create_dir(&path).await.expect("creat must succeed"); - let mut obs = op.list("/").await?; + let mut obs = op.lister("/").await?; let mut found = false; while let Some(de) = obs.try_next().await? { if de.path() == path { @@ -192,7 +192,7 @@ pub async fn test_list_nested_dir(op: Operator) -> Result<()> { .expect("creat must succeed"); op.create_dir(&dir_path).await.expect("creat must succeed"); - let mut obs = op.list(&dir).await?; + let mut obs = op.lister(&dir).await?; let mut objects = HashMap::new(); while let Some(de) = obs.try_next().await? { @@ -235,7 +235,7 @@ pub async fn test_list_nested_dir(op: Operator) -> Result<()> { pub async fn test_list_dir_with_file_path(op: Operator) -> Result<()> { let parent = uuid::Uuid::new_v4().to_string(); - let obs = op.list(&parent).await.map(|_| ()); + let obs = op.lister(&parent).await.map(|_| ()); assert!(obs.is_err()); assert_eq!(obs.unwrap_err().kind(), ErrorKind::NotADirectory); @@ -267,7 +267,7 @@ pub async fn test_list_with_start_after(op: Operator) -> Result<()> { .collect::>() .await; - let mut objects = op.list_with(dir).start_after(&given[2]).await?; + let mut objects = op.lister_with(dir).start_after(&given[2]).await?; let mut actual = vec![]; while let Some(o) = objects.try_next().await? { let path = o.path().to_string(); @@ -284,7 +284,7 @@ pub async fn test_list_with_start_after(op: Operator) -> Result<()> { } pub async fn test_scan_root(op: Operator) -> Result<()> { - let w = op.scan("").await?; + let w = op.lister_with("").delimiter("").await?; let actual = w .try_collect::>() .await? @@ -312,7 +312,10 @@ pub async fn test_scan(op: Operator) -> Result<()> { } } - let w = op.scan(&format!("{parent}/x/")).await?; + let w = op + .lister_with(&format!("{parent}/x/")) + .delimiter("") + .await?; let actual = w .try_collect::>() .await? 
diff --git a/core/tests/behavior/list_only.rs b/core/tests/behavior/list_only.rs index 6ddf64cb3dfc..e7778461d0df 100644 --- a/core/tests/behavior/list_only.rs +++ b/core/tests/behavior/list_only.rs @@ -36,7 +36,7 @@ pub fn behavior_list_only_tests(op: &Operator) -> Vec { pub async fn test_list_only(op: Operator) -> Result<()> { let mut entries = HashMap::new(); - let mut ds = op.list("/").await?; + let mut ds = op.lister("/").await?; while let Some(de) = ds.try_next().await? { entries.insert(de.path().to_string(), op.stat(de.path()).await?.mode()); }