feat(core): adopt WebHDFS LISTSTATUS_BATCH for better performance #2499

Merged: 12 commits, Jun 21, 2023
1 change: 1 addition & 0 deletions .env.example
@@ -94,6 +94,7 @@ OPENDAL_WEBHDFS_TEST=false
OPENDAL_WEBHDFS_ROOT=/tmp/opendal/
OPENDAL_WEBHDFS_ENDPOINT=http://127.0.0.1:9870
OPENDAL_WEBHDFS_DELEGATION=<delegation>
OPENDAL_WEBHDFS_DISABLE_LIST_BATCH=false
# supabase
OPENDAL_SUPABASE_TEST=false
OPENDAL_SUPABASE_BUCKET=<bucket>
10 changes: 10 additions & 0 deletions .github/workflows/service_test_webhdfs.yml
@@ -77,3 +77,13 @@ jobs:
OPENDAL_WEBHDFS_TEST: on
OPENDAL_WEBHDFS_ROOT: /
OPENDAL_WEBHDFS_ENDPOINT: http://127.0.0.1:9870

- name: Test with disable_list_batch
shell: bash
working-directory: core
run: cargo nextest run webhdfs
env:
OPENDAL_WEBHDFS_TEST: on
OPENDAL_WEBHDFS_ROOT: /
OPENDAL_WEBHDFS_ENDPOINT: http://127.0.0.1:9870
OPENDAL_WEBHDFS_DISABLE_LIST_BATCH: true
103 changes: 86 additions & 17 deletions core/src/services/webhdfs/backend.rs
@@ -29,6 +29,7 @@ use tokio::sync::OnceCell;

use super::error::parse_error;
use super::message::BooleanResp;
use super::message::DirectoryListingWrapper;
use super::message::FileStatusType;
use super::message::FileStatusWrapper;
use super::message::FileStatusesWrapper;
@@ -47,6 +48,7 @@ pub struct WebhdfsBuilder {
root: Option<String>,
endpoint: Option<String>,
delegation: Option<String>,
disable_list_batch: bool,
}

impl Debug for WebhdfsBuilder {
@@ -104,6 +106,17 @@ impl WebhdfsBuilder {
}
self
}

/// Disable batch listing
///
/// # Note
///
/// When listing a directory, the backend defaults to batch listing via LISTSTATUS_BATCH.
/// If disabled, the backend lists all files/directories in a single LISTSTATUS request.
pub fn disable_list_batch(&mut self) -> &mut Self {
self.disable_list_batch = true;
self
}
}
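
For context, a minimal usage sketch of the new flag. It assumes OpenDAL's public `services::Webhdfs` re-export and the usual `Operator::new(..)?.finish()` construction flow; everything outside this diff is existing public API, not part of this PR:

use opendal::services::Webhdfs;
use opendal::Operator;

fn main() -> opendal::Result<()> {
    let mut builder = Webhdfs::default();
    builder.endpoint("http://127.0.0.1:9870");
    builder.root("/tmp/opendal/");
    // Opt out of LISTSTATUS_BATCH, e.g. when talking to HDFS releases
    // older than 2.9.0 that do not support the batch operation.
    builder.disable_list_batch();

    let _op = Operator::new(builder)?.finish();
    Ok(())
}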

impl Builder for WebhdfsBuilder {
@@ -116,6 +129,9 @@ impl Builder for WebhdfsBuilder {
map.get("root").map(|v| builder.root(v));
map.get("endpoint").map(|v| builder.endpoint(v));
map.get("delegation").map(|v| builder.delegation(v));
map.get("disable_list_batch")
    .filter(|v| *v == "true")
    .map(|_| builder.disable_list_batch());

builder
}
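
And the equivalent map-driven construction matching the keys handled in `from_map` above (a sketch assuming `Operator::via_map`, which comes from OpenDAL's public API rather than this PR):

use std::collections::HashMap;

use opendal::Operator;
use opendal::Scheme;

fn build_from_map() -> opendal::Result<Operator> {
    let map = HashMap::from([
        ("endpoint".to_string(), "http://127.0.0.1:9870".to_string()),
        ("root".to_string(), "/tmp/opendal/".to_string()),
        // With the filter above, only the literal "true" disables batch listing.
        ("disable_list_batch".to_string(), "true".to_string()),
    ]);
    Operator::via_map(Scheme::Webhdfs, map)
}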
Expand Down Expand Up @@ -159,6 +175,7 @@ impl Builder for WebhdfsBuilder {
auth,
client,
root_checker: OnceCell::new(),
disable_list_batch: self.disable_list_batch,
};

Ok(backend)
@@ -173,6 +190,7 @@ pub struct WebhdfsBackend {
auth: Option<String>,
root_checker: OnceCell<()>,

pub disable_list_batch: bool,
pub client: HttpClient,
}

@@ -276,6 +294,36 @@ impl WebhdfsBackend {
Ok(req)
}

pub(super) fn webhdfs_list_status_batch_request(
&self,
path: &str,
start_after: &Option<String>,
) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);

// On subsequent LISTSTATUS_BATCH calls, add &startAfter=<CHILD> to resume after the last child returned.
let start_after_param = match start_after {
Some(sa) if sa.is_empty() => String::new(),
Some(sa) => format!("&startAfter={}", sa),
None => String::new(),
};

let mut url = format!(
"{}/webhdfs/v1/{}?op=LISTSTATUS_BATCH{}",
self.endpoint,
percent_encode_path(&p),
start_after_param
);
if let Some(auth) = &self.auth {
url += format!("&{auth}").as_str();
}

let req = Request::get(&url)
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
Ok(req)
}
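
Concretely, the builder above produces one of two request shapes, read straight off the format string (`<endpoint>`, `<path>`, and `<child>` are placeholders):

GET <endpoint>/webhdfs/v1/<path>?op=LISTSTATUS_BATCH                      (first page)
GET <endpoint>/webhdfs/v1/<path>?op=LISTSTATUS_BATCH&startAfter=<child>   (subsequent pages)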

async fn webhdfs_read_file(
&self,
path: &str,
@@ -489,30 +537,51 @@ impl Accessor for WebhdfsBackend {
if args.delimiter() != "/" {
return Err(Error::new(
ErrorKind::Unsupported,
"webhdfs only support delimiter `/`",
"WebHDFS only support delimiter `/`",
));
}

let path = path.trim_end_matches('/');
let req = self.webhdfs_list_status_request(path)?;

let resp = self.client.send(req).await?;
match resp.status() {
StatusCode::OK => {
let bs = resp.into_body().bytes().await?;
let file_statuses = serde_json::from_slice::<FileStatusesWrapper>(&bs)
.map_err(new_json_deserialize_error)?
.file_statuses
.file_status;

let objects = WebhdfsPager::new(path, file_statuses);
Ok((RpList::default(), objects))
if !self.disable_list_batch {
let req = self.webhdfs_list_status_batch_request(path, &None)?;
let resp = self.client.send(req).await?;
match resp.status() {
StatusCode::OK => {
let bs = resp.into_body().bytes().await?;
let directory_listing = serde_json::from_slice::<DirectoryListingWrapper>(&bs)
.map_err(new_json_deserialize_error)?
.directory_listing;
let file_statuses = directory_listing.partial_listing.file_statuses.file_status;
let mut objects = WebhdfsPager::new(self.clone(), path, file_statuses);
objects.set_remaining_entries(directory_listing.remaining_entries);
Ok((RpList::default(), objects))
}
StatusCode::NOT_FOUND => {
let objects = WebhdfsPager::new(self.clone(), path, vec![]);
Ok((RpList::default(), objects))
}
_ => Err(parse_error(resp).await?),
}
StatusCode::NOT_FOUND => {
let objects = WebhdfsPager::new(path, vec![]);
Ok((RpList::default(), objects))
} else {
let req = self.webhdfs_list_status_request(path)?;
let resp = self.client.send(req).await?;
match resp.status() {
StatusCode::OK => {
let bs = resp.into_body().bytes().await?;
let file_statuses = serde_json::from_slice::<FileStatusesWrapper>(&bs)
.map_err(new_json_deserialize_error)?
.file_statuses
.file_status;
let objects = WebhdfsPager::new(self.clone(), path, file_statuses);
Ok((RpList::default(), objects))
}
StatusCode::NOT_FOUND => {
let objects = WebhdfsPager::new(self.clone(), path, vec![]);
Ok((RpList::default(), objects))
}
_ => Err(parse_error(resp).await?),
}
_ => Err(parse_error(resp).await?),
}
}
}
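
To make the paging contract concrete, here is a hypothetical continuation loop (not the literal `WebhdfsPager` implementation; `list_all` is an illustrative helper built only from items introduced in this diff):

// Hypothetical helper, not part of this PR: drain a directory by
// calling LISTSTATUS_BATCH until the server reports no remaining entries.
async fn list_all(backend: &WebhdfsBackend, path: &str) -> Result<Vec<FileStatus>> {
    let mut all = Vec::new();
    let mut start_after: Option<String> = None;
    loop {
        let req = backend.webhdfs_list_status_batch_request(path, &start_after)?;
        let resp = backend.client.send(req).await?;
        let bs = resp.into_body().bytes().await?;
        let listing = serde_json::from_slice::<DirectoryListingWrapper>(&bs)
            .map_err(new_json_deserialize_error)?
            .directory_listing;
        let statuses = listing.partial_listing.file_statuses.file_status;
        // Resume after the last child returned on this page.
        start_after = statuses.last().map(|s| s.path_suffix.clone());
        all.extend(statuses);
        if listing.remaining_entries == 0 {
            break;
        }
    }
    Ok(all)
}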
118 changes: 101 additions & 17 deletions core/src/services/webhdfs/message.rs
@@ -38,11 +38,30 @@ pub(super) struct FileStatusesWrapper {

#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub(super) struct DirectoryListingWrapper {
pub directory_listing: DirectoryListing,
}

#[derive(Debug, Default, Deserialize)]
#[serde(rename_all = "camelCase")]
pub(super) struct DirectoryListing {
pub partial_listing: PartialListing,
pub remaining_entries: u32,
}

#[derive(Debug, Default, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub(super) struct PartialListing {
pub file_statuses: FileStatuses,
}

#[derive(Debug, Default, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub(super) struct FileStatuses {
pub file_status: Vec<FileStatus>,
}

#[derive(Debug, Deserialize)]
#[derive(Debug, Default, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FileStatus {
pub length: u64,
@@ -53,19 +72,17 @@ pub struct FileStatus {
pub ty: FileStatusType,
}

#[derive(Debug, Deserialize, PartialEq, Eq)]
#[derive(Debug, Default, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "UPPERCASE")]
pub enum FileStatusType {
Directory,
#[default]
File,
}

#[cfg(test)]
mod test {
use super::*;
use crate::raw::oio::Page;
use crate::services::webhdfs::pager::WebhdfsPager;
use crate::EntryMode;

#[test]
fn test_file_status() {
@@ -149,17 +166,84 @@ mod test {
.file_statuses
.file_status;

let mut pager = WebhdfsPager::new("listing/directory", file_statuses);
let mut entries = vec![];
while let Some(oes) = pager.next().await.expect("must success") {
entries.extend(oes);
}

entries.sort_by(|a, b| a.path().cmp(b.path()));
assert_eq!(entries.len(), 2);
assert_eq!(entries[0].path(), "listing/directory/a.patch");
assert_eq!(entries[0].mode(), EntryMode::FILE);
assert_eq!(entries[1].path(), "listing/directory/bar/");
assert_eq!(entries[1].mode(), EntryMode::DIR);
// Check the parsed FileStatus values directly instead of going through the pager.
assert_eq!(file_statuses.len(), 2);
assert_eq!(file_statuses[0].length, 24930);
assert_eq!(file_statuses[0].modification_time, 1320171722771);
assert_eq!(file_statuses[0].path_suffix, "a.patch");
assert_eq!(file_statuses[0].ty, FileStatusType::File);
assert_eq!(file_statuses[1].length, 0);
assert_eq!(file_statuses[1].modification_time, 1320895981256);
assert_eq!(file_statuses[1].path_suffix, "bar");
assert_eq!(file_statuses[1].ty, FileStatusType::Directory);
}

#[tokio::test]
async fn test_list_status_batch() {
let json = r#"
{
"DirectoryListing": {
"partialListing": {
"FileStatuses": {
"FileStatus": [
{
"accessTime": 0,
"blockSize": 0,
"childrenNum": 0,
"fileId": 16387,
"group": "supergroup",
"length": 0,
"modificationTime": 1473305882563,
"owner": "andrew",
"pathSuffix": "bardir",
"permission": "755",
"replication": 0,
"storagePolicy": 0,
"type": "DIRECTORY"
},
{
"accessTime": 1473305896945,
"blockSize": 1024,
"childrenNum": 0,
"fileId": 16388,
"group": "supergroup",
"length": 0,
"modificationTime": 1473305896965,
"owner": "andrew",
"pathSuffix": "bazfile",
"permission": "644",
"replication": 3,
"storagePolicy": 0,
"type": "FILE"
}
]
}
},
"remainingEntries": 2
}
}
"#;

let directory_listing = serde_json::from_str::<DirectoryListingWrapper>(json)
.expect("must success")
.directory_listing;

assert_eq!(directory_listing.remaining_entries, 2);
assert_eq!(
directory_listing
.partial_listing
.file_statuses
.file_status
.len(),
2
);
assert_eq!(
directory_listing.partial_listing.file_statuses.file_status[0].path_suffix,
"bardir"
);
assert_eq!(
directory_listing.partial_listing.file_statuses.file_status[1].path_suffix,
"bazfile"
);
}
}