diff --git a/.env.example b/.env.example
index b31a6b8e8cdd..6d107e06ca58 100644
--- a/.env.example
+++ b/.env.example
@@ -94,6 +94,7 @@ OPENDAL_WEBHDFS_TEST=false
 OPENDAL_WEBHDFS_ROOT=/tmp/opendal/
 OPENDAL_WEBHDFS_ENDPOINT=http://127.0.0.1:9870
 OPENDAL_WEBHDFS_DELEGATION=
+OPENDAL_WEBHDFS_DISABLE_LIST_BATCH=false
 # supbase
 OPENDAL_SUPABASE_TEST=false
 OPENDAL_SUPABASE_BUCKET=
diff --git a/.github/workflows/service_test_webhdfs.yml b/.github/workflows/service_test_webhdfs.yml
index eb84a274ac9d..475cb4a8b6dc 100644
--- a/.github/workflows/service_test_webhdfs.yml
+++ b/.github/workflows/service_test_webhdfs.yml
@@ -77,3 +77,13 @@ jobs:
           OPENDAL_WEBHDFS_TEST: on
           OPENDAL_WEBHDFS_ROOT: /
           OPENDAL_WEBHDFS_ENDPOINT: http://127.0.0.1:9870
+
+      - name: Test with disable_list_batch
+        shell: bash
+        working-directory: core
+        run: cargo nextest run webhdfs
+        env:
+          OPENDAL_WEBHDFS_TEST: on
+          OPENDAL_WEBHDFS_ROOT: /
+          OPENDAL_WEBHDFS_ENDPOINT: http://127.0.0.1:9870
+          OPENDAL_WEBHDFS_DISABLE_LIST_BATCH: true
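The new `OPENDAL_WEBHDFS_DISABLE_LIST_BATCH` variable only drives the behavior tests above; in application code the same switch is the `disable_list_batch` key parsed by `from_map` in the builder change below. A minimal sketch of wiring it through a string map, assuming `Operator::via_map` is available in this OpenDAL version:

```rust
use std::collections::HashMap;

use opendal::Operator;
use opendal::Scheme;

fn webhdfs_without_batch_listing() -> opendal::Result<Operator> {
    // Keys mirror the WebHDFS builder options; values are plain strings.
    let map = HashMap::from([
        ("root".to_string(), "/tmp/opendal/".to_string()),
        ("endpoint".to_string(), "http://127.0.0.1:9870".to_string()),
        // Only the literal "true" disables batch listing; any other value keeps it enabled.
        ("disable_list_batch".to_string(), "true".to_string()),
    ]);

    Operator::via_map(Scheme::Webhdfs, map)
}
```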
diff --git a/core/src/services/webhdfs/backend.rs b/core/src/services/webhdfs/backend.rs
index 122a63fca6d6..4f90b9d2cee0 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -29,6 +29,7 @@ use tokio::sync::OnceCell;
 
 use super::error::parse_error;
 use super::message::BooleanResp;
+use super::message::DirectoryListingWrapper;
 use super::message::FileStatusType;
 use super::message::FileStatusWrapper;
 use super::message::FileStatusesWrapper;
@@ -47,6 +48,7 @@ pub struct WebhdfsBuilder {
     root: Option<String>,
     endpoint: Option<String>,
     delegation: Option<String>,
+    disable_list_batch: bool,
 }
 
 impl Debug for WebhdfsBuilder {
@@ -104,6 +106,17 @@ impl WebhdfsBuilder {
         }
         self
     }
+
+    /// Disable batch listing
+    ///
+    /// # Note
+    ///
+    /// When listing a directory, the backend defaults to batch listing (LISTSTATUS_BATCH).
+    /// If disabled, the backend lists all files/directories in a single request.
+    pub fn disable_list_batch(&mut self) -> &mut Self {
+        self.disable_list_batch = true;
+        self
+    }
 }
 
 impl Builder for WebhdfsBuilder {
@@ -116,6 +129,9 @@ impl Builder for WebhdfsBuilder {
         map.get("root").map(|v| builder.root(v));
         map.get("endpoint").map(|v| builder.endpoint(v));
         map.get("delegation").map(|v| builder.delegation(v));
+        map.get("disable_list_batch")
+            .filter(|v| v.as_str() == "true")
+            .map(|_| builder.disable_list_batch());
 
         builder
     }
@@ -159,6 +175,7 @@ impl Builder for WebhdfsBuilder {
             auth,
             client,
             root_checker: OnceCell::new(),
+            disable_list_batch: self.disable_list_batch,
         };
 
         Ok(backend)
@@ -173,6 +190,7 @@ pub struct WebhdfsBackend {
     auth: Option<String>,
     root_checker: OnceCell<()>,
 
+    pub disable_list_batch: bool,
     pub client: HttpClient,
 }
@@ -276,6 +294,36 @@ impl WebhdfsBackend {
         Ok(req)
     }
 
+    pub(super) fn webhdfs_list_status_batch_request(
+        &self,
+        path: &str,
+        start_after: &Option<String>,
+    ) -> Result<Request<AsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        // If this is not the first LISTSTATUS_BATCH call, append &startAfter= to continue after the previous batch.
+        let start_after_param = match start_after {
+            Some(sa) if sa.is_empty() => String::new(),
+            Some(sa) => format!("&startAfter={}", sa),
+            None => String::new(),
+        };
+
+        let mut url = format!(
+            "{}/webhdfs/v1/{}?op=LISTSTATUS_BATCH{}",
+            self.endpoint,
+            percent_encode_path(&p),
+            start_after_param
+        );
+        if let Some(auth) = &self.auth {
+            url += format!("&{auth}").as_str();
+        }
+
+        let req = Request::get(&url)
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+        Ok(req)
+    }
+
     async fn webhdfs_read_file(
         &self,
         path: &str,
@@ -489,30 +537,51 @@ impl Accessor for WebhdfsBackend {
         if args.delimiter() != "/" {
             return Err(Error::new(
                 ErrorKind::Unsupported,
-                "webhdfs only support delimiter `/`",
+                "WebHDFS only supports delimiter `/`",
             ));
         }
 
         let path = path.trim_end_matches('/');
-        let req = self.webhdfs_list_status_request(path)?;
-
-        let resp = self.client.send(req).await?;
-        match resp.status() {
-            StatusCode::OK => {
-                let bs = resp.into_body().bytes().await?;
-                let file_statuses = serde_json::from_slice::<FileStatusesWrapper>(&bs)
-                    .map_err(new_json_deserialize_error)?
-                    .file_statuses
-                    .file_status;
-                let objects = WebhdfsPager::new(path, file_statuses);
-                Ok((RpList::default(), objects))
+        if !self.disable_list_batch {
+            let req = self.webhdfs_list_status_batch_request(path, &None)?;
+            let resp = self.client.send(req).await?;
+            match resp.status() {
+                StatusCode::OK => {
+                    let bs = resp.into_body().bytes().await?;
+                    let directory_listing = serde_json::from_slice::<DirectoryListingWrapper>(&bs)
+                        .map_err(new_json_deserialize_error)?
+                        .directory_listing;
+                    let file_statuses = directory_listing.partial_listing.file_statuses.file_status;
+                    let mut objects = WebhdfsPager::new(self.clone(), path, file_statuses);
+                    objects.set_remaining_entries(directory_listing.remaining_entries);
+                    Ok((RpList::default(), objects))
+                }
+                StatusCode::NOT_FOUND => {
+                    let objects = WebhdfsPager::new(self.clone(), path, vec![]);
+                    Ok((RpList::default(), objects))
+                }
+                _ => Err(parse_error(resp).await?),
             }
-            StatusCode::NOT_FOUND => {
-                let objects = WebhdfsPager::new(path, vec![]);
-                Ok((RpList::default(), objects))
+        } else {
+            let req = self.webhdfs_list_status_request(path)?;
+            let resp = self.client.send(req).await?;
+            match resp.status() {
+                StatusCode::OK => {
+                    let bs = resp.into_body().bytes().await?;
+                    let file_statuses = serde_json::from_slice::<FileStatusesWrapper>(&bs)
+                        .map_err(new_json_deserialize_error)?
+                        .file_statuses
+                        .file_status;
+                    let objects = WebhdfsPager::new(self.clone(), path, file_statuses);
+                    Ok((RpList::default(), objects))
+                }
+                StatusCode::NOT_FOUND => {
+                    let objects = WebhdfsPager::new(self.clone(), path, vec![]);
+                    Ok((RpList::default(), objects))
+                }
+                _ => Err(parse_error(resp).await?),
             }
-            _ => Err(parse_error(resp).await?),
         }
     }
 }
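The typed counterpart of that config key is the new `disable_list_batch()` builder method added above. A minimal usage sketch; the method names come from the diff, while the `opendal::services::Webhdfs` re-export name is an assumption about how this builder is exposed:

```rust
use opendal::services::Webhdfs;
use opendal::Operator;

fn webhdfs_operator() -> opendal::Result<Operator> {
    let mut builder = Webhdfs::default();
    builder.root("/tmp/opendal/");
    builder.endpoint("http://127.0.0.1:9870");
    // Opt out of LISTSTATUS_BATCH: list() then falls back to a single LISTSTATUS request.
    builder.disable_list_batch();

    Ok(Operator::new(builder)?.finish())
}
```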
diff --git a/core/src/services/webhdfs/message.rs b/core/src/services/webhdfs/message.rs
index e5008830329f..645c1f5f1cec 100644
--- a/core/src/services/webhdfs/message.rs
+++ b/core/src/services/webhdfs/message.rs
@@ -38,11 +38,30 @@ pub(super) struct FileStatusesWrapper {
 #[derive(Debug, Deserialize)]
 #[serde(rename_all = "PascalCase")]
+pub(super) struct DirectoryListingWrapper {
+    pub directory_listing: DirectoryListing,
+}
+
+#[derive(Debug, Default, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub(super) struct DirectoryListing {
+    pub partial_listing: PartialListing,
+    pub remaining_entries: u32,
+}
+
+#[derive(Debug, Default, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub(super) struct PartialListing {
+    pub file_statuses: FileStatuses,
+}
+
+#[derive(Debug, Default, Deserialize)]
+#[serde(rename_all = "PascalCase")]
 pub(super) struct FileStatuses {
     pub file_status: Vec<FileStatus>,
 }
 
-#[derive(Debug, Deserialize)]
+#[derive(Debug, Default, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct FileStatus {
     pub length: u64,
@@ -53,19 +72,17 @@
     pub ty: FileStatusType,
 }
 
-#[derive(Debug, Deserialize, PartialEq, Eq)]
+#[derive(Debug, Default, Deserialize, PartialEq, Eq)]
 #[serde(rename_all = "UPPERCASE")]
 pub enum FileStatusType {
     Directory,
+    #[default]
     File,
 }
 
 #[cfg(test)]
 mod test {
     use super::*;
-    use crate::raw::oio::Page;
-    use crate::services::webhdfs::pager::WebhdfsPager;
-    use crate::EntryMode;
 
     #[test]
     fn test_file_status() {
@@ -149,17 +166,84 @@ mod test {
         .file_statuses
         .file_status;
 
-        let mut pager = WebhdfsPager::new("listing/directory", file_statuses);
-        let mut entries = vec![];
-        while let Some(oes) = pager.next().await.expect("must success") {
-            entries.extend(oes);
-        }
-
-        entries.sort_by(|a, b| a.path().cmp(b.path()));
-        assert_eq!(entries.len(), 2);
-        assert_eq!(entries[0].path(), "listing/directory/a.patch");
-        assert_eq!(entries[0].mode(), EntryMode::FILE);
-        assert_eq!(entries[1].path(), "listing/directory/bar/");
-        assert_eq!(entries[1].mode(), EntryMode::DIR);
+        // WebhdfsPager now needs a backend, so assert on the deserialized FileStatus values directly.
+        assert_eq!(file_statuses.len(), 2);
+        assert_eq!(file_statuses[0].length, 24930);
+        assert_eq!(file_statuses[0].modification_time, 1320171722771);
+        assert_eq!(file_statuses[0].path_suffix, "a.patch");
+        assert_eq!(file_statuses[0].ty, FileStatusType::File);
+        assert_eq!(file_statuses[1].length, 0);
+        assert_eq!(file_statuses[1].modification_time, 1320895981256);
+        assert_eq!(file_statuses[1].path_suffix, "bar");
+        assert_eq!(file_statuses[1].ty, FileStatusType::Directory);
+    }
+
+    #[tokio::test]
+    async fn test_list_status_batch() {
+        let json = r#"
+{
+    "DirectoryListing": {
+        "partialListing": {
+            "FileStatuses": {
+                "FileStatus": [
+                    {
+                        "accessTime": 0,
+                        "blockSize": 0,
+                        "childrenNum": 0,
+                        "fileId": 16387,
+                        "group": "supergroup",
+                        "length": 0,
+                        "modificationTime": 1473305882563,
+                        "owner": "andrew",
+                        "pathSuffix": "bardir",
+                        "permission": "755",
+                        "replication": 0,
+                        "storagePolicy": 0,
+                        "type": "DIRECTORY"
+                    },
+                    {
+                        "accessTime": 1473305896945,
+                        "blockSize": 1024,
+                        "childrenNum": 0,
+                        "fileId": 16388,
+                        "group": "supergroup",
+                        "length": 0,
+                        "modificationTime": 1473305896965,
+                        "owner": "andrew",
+                        "pathSuffix": "bazfile",
+                        "permission": "644",
+                        "replication": 3,
+                        "storagePolicy": 0,
+                        "type": "FILE"
+                    }
+                ]
+            }
+        },
+        "remainingEntries": 2
+    }
+}
+        "#;
+
+        let directory_listing = serde_json::from_str::<DirectoryListingWrapper>(json)
+            .expect("must success")
+            .directory_listing;
+
+        assert_eq!(directory_listing.remaining_entries, 2);
+        assert_eq!(
+            directory_listing
+                .partial_listing
+                .file_statuses
+                .file_status
+                .len(),
+            2
+        );
+        assert_eq!(
+            directory_listing.partial_listing.file_statuses.file_status[0].path_suffix,
+            "bardir"
+        );
+        assert_eq!(
+            directory_listing.partial_listing.file_statuses.file_status[1].path_suffix,
+            "bazfile"
+        );
     }
 }
diff --git a/core/src/services/webhdfs/pager.rs b/core/src/services/webhdfs/pager.rs
index dc90473efb58..e9f45df3f44e 100644
--- a/core/src/services/webhdfs/pager.rs
+++ b/core/src/services/webhdfs/pager.rs
@@ -16,33 +16,87 @@
 // under the License.
 
 use async_trait::async_trait;
+use http::StatusCode;
 
+use super::backend::WebhdfsBackend;
+use super::error::parse_error;
+use super::message::DirectoryListingWrapper;
 use super::message::FileStatus;
 use super::message::FileStatusType;
 use crate::raw::*;
 use crate::*;
 
 pub struct WebhdfsPager {
+    backend: WebhdfsBackend,
     path: String,
     statuses: Vec<FileStatus>,
+    batch_start_after: Option<String>,
+    remaining_entries: u32,
 }
 
 impl WebhdfsPager {
-    pub fn new(path: &str, statuses: Vec<FileStatus>) -> Self {
+    pub fn new(backend: WebhdfsBackend, path: &str, statuses: Vec<FileStatus>) -> Self {
         Self {
+            backend,
             path: path.to_string(),
+            batch_start_after: statuses.last().map(|f| f.path_suffix.clone()),
             statuses,
+            remaining_entries: 0,
         }
     }
+
+    pub(super) fn set_remaining_entries(&mut self, remaining_entries: u32) {
+        self.remaining_entries = remaining_entries;
+    }
 }
 
 #[async_trait]
 impl oio::Page for WebhdfsPager {
+    /// Returns the next page of entries.
+    ///
+    /// Note: listing defaults to batch mode; while `remaining_entries` > 0, `next` fetches the next LISTSTATUS_BATCH page before returning entries.
     async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        if self.statuses.is_empty() {
+        if self.statuses.is_empty() && self.remaining_entries == 0 {
             return Ok(None);
         }
 
+        return match self.backend.disable_list_batch {
+            true => self.webhdfs_get_next_list_statuses(),
+            false => {
+                let req = self
+                    .backend
+                    .webhdfs_list_status_batch_request(&self.path, &self.batch_start_after)?;
+                let resp = self.backend.client.send(req).await?;
+
+                match resp.status() {
+                    StatusCode::OK => {
+                        let bs = resp.into_body().bytes().await?;
+                        let directory_listing =
+                            serde_json::from_slice::<DirectoryListingWrapper>(&bs)
+                                .map_err(new_json_deserialize_error)?;
+                        let file_statuses = directory_listing
+                            .directory_listing
+                            .partial_listing
+                            .file_statuses
+                            .file_status;
+                        self.remaining_entries =
+                            directory_listing.directory_listing.remaining_entries;
+                        self.batch_start_after =
+                            file_statuses.last().map(|f| f.path_suffix.clone());
+                        self.statuses.extend(file_statuses);
+                        self.webhdfs_get_next_list_statuses()
+                    }
+                    StatusCode::NOT_FOUND => self.webhdfs_get_next_list_statuses(),
+                    _ => Err(parse_error(resp).await?),
+                }
+            }
+        };
+    }
+}
+
+impl WebhdfsPager {
+    /// Converts the buffered file statuses into the next page of entries.
+    fn webhdfs_get_next_list_statuses(&mut self) -> Result<Option<Vec<oio::Entry>>> {
         let mut entries = Vec::with_capacity(self.statuses.len());
 
         while let Some(status) = self.statuses.pop() {
@@ -70,7 +124,6 @@ impl oio::Page for WebhdfsPager {
             let entry = oio::Entry::new(&path, meta);
             entries.push(entry);
         }
-
         Ok(Some(entries))
     }
 }
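From the caller's side the batching stays invisible: `list` hands back a pager that keeps issuing LISTSTATUS_BATCH requests, threading `startAfter` from the last returned entry, until `remaining_entries` drops to zero. A rough consumer sketch, assuming the streaming `Lister` returned by `Operator::list` in this OpenDAL version:

```rust
use futures::TryStreamExt;
use opendal::Operator;

async fn print_dir(op: &Operator) -> opendal::Result<()> {
    // Whether the backend pages through LISTSTATUS_BATCH or issues a single
    // LISTSTATUS call, entries arrive through the same pager-backed stream.
    let mut lister = op.list("dir/").await?;
    while let Some(entry) = lister.try_next().await? {
        println!("{}", entry.path());
    }
    Ok(())
}
```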