From 51f36a0603dbdaf0132b4703b5d74f1c27f16b9b Mon Sep 17 00:00:00 2001 From: clundro Date: Sun, 11 Jun 2023 20:06:03 +0800 Subject: [PATCH 1/6] add remove_all for blocking operator Signed-off-by: clundro --- core/src/types/operator/blocking_operator.rs | 44 ++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs index 7828b6f2b74b..72fc11e3d0a0 100644 --- a/core/src/types/operator/blocking_operator.rs +++ b/core/src/types/operator/blocking_operator.rs @@ -680,6 +680,50 @@ impl BlockingOperator { Ok(()) } + /// Remove the path and all nested dirs and files recursively. + /// + /// # Notes + /// + /// If underlying services support delete in batch, we will use batch + /// delete instead. + /// + /// # Examples + /// + /// ``` + /// # use anyhow::Result; + /// # use futures::io; + /// # use opendal::BlockingOperator; + /// # fn test(op: BlockingOperator) -> Result<()> { + /// op.remove_all("path/to/dir")?; + /// # Ok(()) + /// # } + /// ``` + pub fn remove_all(&self, path: &str) -> Result<()> { + let meta = match self.stat(path) { + Ok(metadata) => metadata, + + Err(e) if e.kind() == ErrorKind::NotFound => return Ok(()), + + Err(e) => return Err(e), + }; + + if meta.mode() != EntryMode::DIR { + return self.delete(path); + } + + let mut obs = self.scan(path)?; + + let _ = obs.try_for_each(|v| match v { + Ok(entry) => self.delete(entry.path()), + Err(e) => Err(e), + })?; + + // Remove the directory itself. + self.delete(path)?; + + Ok(()) + } + /// List current dir path. /// /// This function will create a new handle to list entries. From e0c9bd86054da04bf0f65925688d4c5f08377219 Mon Sep 17 00:00:00 2001 From: clundro Date: Sun, 11 Jun 2023 20:13:36 +0800 Subject: [PATCH 2/6] update comment Signed-off-by: clundro --- core/src/types/operator/blocking_operator.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs index 72fc11e3d0a0..ba39ea1a8b29 100644 --- a/core/src/types/operator/blocking_operator.rs +++ b/core/src/types/operator/blocking_operator.rs @@ -684,8 +684,7 @@ impl BlockingOperator { /// /// # Notes /// - /// If underlying services support delete in batch, we will use batch - /// delete instead. + /// We don't support batch delete now. /// /// # Examples /// From 6f63245d84949743a72ad221caa9532b9f7ec255 Mon Sep 17 00:00:00 2001 From: clundro Date: Mon, 12 Jun 2023 00:45:06 +0800 Subject: [PATCH 3/6] add remove_one and remove_all in behavior test Signed-off-by: clundro --- core/src/types/operator/blocking_operator.rs | 61 ++++++++++++++++++-- core/tests/behavior/blocking_list.rs | 32 ++++++++++ core/tests/behavior/blocking_write.rs | 6 ++ 3 files changed, 93 insertions(+), 6 deletions(-) diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs index ba39ea1a8b29..cb268aaeb225 100644 --- a/core/src/types/operator/blocking_operator.rs +++ b/core/src/types/operator/blocking_operator.rs @@ -680,6 +680,52 @@ impl BlockingOperator { Ok(()) } + /// + /// # Notes + /// + /// We don't support batch delete now. + /// + /// # Examples + /// + /// ``` + /// # use anyhow::Result; + /// # use futures::io; + /// # use opendal::BlockingOperator; + /// # fn test(op: BlockingOperator) -> Result<()> { + /// op.remove(vec!["abc".to_string(), "def".to_string()])?; + /// # Ok(()) + /// # } + /// ``` + pub fn remove(&self, paths: Vec) -> Result<()> { + Ok(()) + } + + /// remove will remove files via the given paths. + /// + /// remove_via will remove files via the given stream. + /// + /// We will delete by chunks with given batch limit on the stream. + /// + /// # Notes + /// + /// We don't support batch delete now. + /// + /// # Examples + /// + /// ``` + /// # use anyhow::Result; + /// # use futures::io; + /// # use opendal::BlockingOperator; + /// # fn test(op: BlockingOperator) -> Result<()> { + /// # Ok(()) + /// # } + /// ``` + pub fn remove_via(&self) -> Result<()> { + // let stream = stream::iter(vec!["abc".to_string(), "def".to_string()]); + // op.remove_via(stream)?; + Ok(()) + } + /// Remove the path and all nested dirs and files recursively. /// /// # Notes @@ -710,13 +756,16 @@ impl BlockingOperator { return self.delete(path); } - let mut obs = self.scan(path)?; - - let _ = obs.try_for_each(|v| match v { - Ok(entry) => self.delete(entry.path()), - Err(e) => Err(e), - })?; + let obs = self.scan(path)?; + obs.for_each(|v| { + if (match v { + Ok(entry) => self.inner().blocking_delete(entry.path(), OpDelete::new()), + Err(e) => Err(e), + }) + .is_ok() + {} + }); // Remove the directory itself. self.delete(path)?; diff --git a/core/tests/behavior/blocking_list.rs b/core/tests/behavior/blocking_list.rs index b25dfa09fb7b..6b5638646d55 100644 --- a/core/tests/behavior/blocking_list.rs +++ b/core/tests/behavior/blocking_list.rs @@ -68,6 +68,7 @@ macro_rules! behavior_blocking_list_tests { test_list_dir, test_list_non_exist_dir, test_scan, + test_remove_all, ); )* }; @@ -152,3 +153,34 @@ pub fn test_scan(op: BlockingOperator) -> Result<()> { assert!(actual.contains("x/x/x/y")); Ok(()) } + +// Remove all should remove all in this path. +pub fn test_remove_all(op: BlockingOperator) -> Result<()> { + let parent = uuid::Uuid::new_v4().to_string(); + + let expected = vec![ + "x/", "x/y", "x/x/", "x/x/y", "x/x/x/", "x/x/x/y", "x/x/x/x/", + ]; + + for path in expected.iter() { + if path.ends_with('/') { + op.create_dir(&format!("{parent}/{path}"))?; + } else { + op.write(&format!("{parent}/{path}"), "test_scan")?; + } + } + + op.remove_all(&format!("{parent}/x/"))?; + + for path in expected.iter() { + if path.ends_with('/') { + continue; + } + assert!( + !op.is_exist(&format!("{parent}/{path}"))?, + "{parent}/{path} should be removed" + ) + } + + Ok(()) +} diff --git a/core/tests/behavior/blocking_write.rs b/core/tests/behavior/blocking_write.rs index 80fee31ec65f..64cc83c35028 100644 --- a/core/tests/behavior/blocking_write.rs +++ b/core/tests/behavior/blocking_write.rs @@ -84,6 +84,7 @@ macro_rules! behavior_blocking_write_tests { test_fuzz_offset_reader, test_fuzz_part_reader, test_delete_file, + test_remove_one_file, ); )* }; @@ -413,3 +414,8 @@ pub fn test_delete_file(op: BlockingOperator) -> Result<()> { Ok(()) } + +/// Remove one file +pub fn test_remove_one_file(op: BlockingOperator) -> Result<()> { + Ok(()) +} From f3e26ac884b68c8cde347020cffc01b31cfd7348 Mon Sep 17 00:00:00 2001 From: clundro Date: Mon, 12 Jun 2023 01:05:30 +0800 Subject: [PATCH 4/6] fix clippy Signed-off-by: clundro --- core/src/types/operator/blocking_operator.rs | 2 +- core/tests/behavior/blocking_write.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs index cb268aaeb225..153dc88d1439 100644 --- a/core/src/types/operator/blocking_operator.rs +++ b/core/src/types/operator/blocking_operator.rs @@ -696,7 +696,7 @@ impl BlockingOperator { /// # Ok(()) /// # } /// ``` - pub fn remove(&self, paths: Vec) -> Result<()> { + pub fn remove(&self, _paths: Vec) -> Result<()> { Ok(()) } diff --git a/core/tests/behavior/blocking_write.rs b/core/tests/behavior/blocking_write.rs index 64cc83c35028..e0515f8096cb 100644 --- a/core/tests/behavior/blocking_write.rs +++ b/core/tests/behavior/blocking_write.rs @@ -416,6 +416,6 @@ pub fn test_delete_file(op: BlockingOperator) -> Result<()> { } /// Remove one file -pub fn test_remove_one_file(op: BlockingOperator) -> Result<()> { +pub fn test_remove_one_file(_op: BlockingOperator) -> Result<()> { Ok(()) } From 5cc4313a3d53325f1b0e54f04080e69fc0260d35 Mon Sep 17 00:00:00 2001 From: clundro Date: Wed, 14 Jun 2023 03:16:14 +0800 Subject: [PATCH 5/6] update remove_all and remove Signed-off-by: clundro --- core/src/types/operator/blocking_operator.rs | 27 +++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs index 153dc88d1439..7947301db0eb 100644 --- a/core/src/types/operator/blocking_operator.rs +++ b/core/src/types/operator/blocking_operator.rs @@ -696,7 +696,11 @@ impl BlockingOperator { /// # Ok(()) /// # } /// ``` - pub fn remove(&self, _paths: Vec) -> Result<()> { + pub fn remove(&self, paths: Vec) -> Result<()> { + for path in paths.iter() { + self.delete(path)?; + } + Ok(()) } @@ -720,9 +724,7 @@ impl BlockingOperator { /// # Ok(()) /// # } /// ``` - pub fn remove_via(&self) -> Result<()> { - // let stream = stream::iter(vec!["abc".to_string(), "def".to_string()]); - // op.remove_via(stream)?; + pub fn remove_via(&self, input: impl Iterator) -> Result<()> { Ok(()) } @@ -758,14 +760,15 @@ impl BlockingOperator { let obs = self.scan(path)?; - obs.for_each(|v| { - if (match v { - Ok(entry) => self.inner().blocking_delete(entry.path(), OpDelete::new()), - Err(e) => Err(e), - }) - .is_ok() - {} - }); + for v in obs { + match v { + Ok(entry) => { + let Ok(_) = self.inner().blocking_delete(entry.path(), OpDelete::new()) else { return Err(e) }; + } + Err(e) => return Err(e), + } + } + // Remove the directory itself. self.delete(path)?; From f2ca18b5a0535c5bbb51c8209779e970daf0b656 Mon Sep 17 00:00:00 2001 From: clundro Date: Mon, 19 Jun 2023 00:48:51 +0800 Subject: [PATCH 6/6] use intoIter in remove_via Signed-off-by: clundro --- Cargo.lock | 2 +- core/src/types/operator/blocking_operator.rs | 27 ++++++++++---------- core/tests/behavior/blocking_write.rs | 12 ++++++++- 3 files changed, 26 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 108db71f811f..cfc298619430 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2776,7 +2776,7 @@ dependencies = [ [[package]] name = "opendal-dotnet" -version = "0.37.0" +version = "0.1.0" dependencies = [ "opendal", ] diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs index 7947301db0eb..57144817e5b8 100644 --- a/core/src/types/operator/blocking_operator.rs +++ b/core/src/types/operator/blocking_operator.rs @@ -680,6 +680,9 @@ impl BlockingOperator { Ok(()) } + /// remove will remove files via the given paths. + /// + /// remove_via will remove files via the given vector iterators. /// /// # Notes /// @@ -692,24 +695,18 @@ impl BlockingOperator { /// # use futures::io; /// # use opendal::BlockingOperator; /// # fn test(op: BlockingOperator) -> Result<()> { - /// op.remove(vec!["abc".to_string(), "def".to_string()])?; + /// let stream = vec!["abc".to_string(), "def".to_string()].into_iter(); + /// op.remove_via(stream)?; /// # Ok(()) /// # } /// ``` - pub fn remove(&self, paths: Vec) -> Result<()> { - for path in paths.iter() { - self.delete(path)?; + pub fn remove_via(&self, input: impl Iterator) -> Result<()> { + for path in input { + self.delete(&path)?; } - Ok(()) } - /// remove will remove files via the given paths. - /// - /// remove_via will remove files via the given stream. - /// - /// We will delete by chunks with given batch limit on the stream. - /// /// # Notes /// /// We don't support batch delete now. @@ -721,10 +718,13 @@ impl BlockingOperator { /// # use futures::io; /// # use opendal::BlockingOperator; /// # fn test(op: BlockingOperator) -> Result<()> { + /// op.remove(vec!["abc".to_string(), "def".to_string()])?; /// # Ok(()) /// # } /// ``` - pub fn remove_via(&self, input: impl Iterator) -> Result<()> { + pub fn remove(&self, paths: Vec) -> Result<()> { + self.remove_via(paths.into_iter())?; + Ok(()) } @@ -763,7 +763,8 @@ impl BlockingOperator { for v in obs { match v { Ok(entry) => { - let Ok(_) = self.inner().blocking_delete(entry.path(), OpDelete::new()) else { return Err(e) }; + self.inner() + .blocking_delete(entry.path(), OpDelete::new())?; } Err(e) => return Err(e), } diff --git a/core/tests/behavior/blocking_write.rs b/core/tests/behavior/blocking_write.rs index e0515f8096cb..1cb312c23692 100644 --- a/core/tests/behavior/blocking_write.rs +++ b/core/tests/behavior/blocking_write.rs @@ -416,6 +416,16 @@ pub fn test_delete_file(op: BlockingOperator) -> Result<()> { } /// Remove one file -pub fn test_remove_one_file(_op: BlockingOperator) -> Result<()> { +pub fn test_remove_one_file(op: BlockingOperator) -> Result<()> { + let path = uuid::Uuid::new_v4().to_string(); + let (content, _) = gen_bytes(); + + op.write(&path, content).expect("write must succeed"); + + op.remove(vec![path.clone()])?; + + // Stat it again to check. + assert!(!op.is_exist(&path)?); + Ok(()) }