diff --git a/core/src/services/dbfs/backend.rs b/core/src/services/dbfs/backend.rs index 702e8f779322..e8e9654ee623 100644 --- a/core/src/services/dbfs/backend.rs +++ b/core/src/services/dbfs/backend.rs @@ -18,26 +18,21 @@ use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Formatter; +use std::sync::Arc; use async_trait::async_trait; -use base64::prelude::BASE64_STANDARD; -use base64::Engine; -use bytes::Bytes; -use http::header; -use http::Request; -use http::Response; use http::StatusCode; use log::debug; use serde::Deserialize; -use serde_json::json; -use super::error::parse_dbfs_read_error; +use crate::raw::*; +use crate::*; + +use super::core::DbfsCore; use super::error::parse_error; use super::pager::DbfsPager; -use super::reader::IncomingDbfsAsyncBody; +use super::reader::DbfsReader; use super::writer::DbfsWriter; -use crate::raw::*; -use crate::*; /// [Dbfs](https://docs.databricks.com/api/azure/workspace/dbfs)'s REST API support. #[doc = include_str!("docs.md")] @@ -138,10 +133,12 @@ impl Builder for DbfsBuilder { debug!("backend build finished: {:?}", &self); Ok(DbfsBackend { - root, - endpoint: self.endpoint.clone(), - token, - client, + core: Arc::new(DbfsCore { + root, + endpoint: endpoint.to_string(), + token, + client, + }), }) } } @@ -149,196 +146,12 @@ impl Builder for DbfsBuilder { /// Backend for DBFS service #[derive(Debug, Clone)] pub struct DbfsBackend { - root: String, - endpoint: String, - token: String, - pub(super) client: HttpClient, -} - -impl DbfsBackend { - fn dbfs_create_dir_request(&self, path: &str) -> Result> { - let url = format!("{}/api/2.0/dbfs/mkdirs", self.endpoint); - let mut req = Request::post(&url); - - let auth_header_content = format!("Bearer {}", self.token); - req = req.header(header::AUTHORIZATION, auth_header_content); - - let p = build_rooted_abs_path(&self.root, path) - .trim_end_matches('/') - .to_string(); - - let req_body = format!("{{\"path\": \"{}\"}}", percent_encode_path(&p)); - let body = AsyncBody::Bytes(Bytes::from(req_body)); - - let req = req.body(body).map_err(new_request_build_error)?; - - Ok(req) - } - - async fn dbfs_delete(&self, path: &str) -> Result> { - let url = format!("{}/api/2.0/dbfs/delete", self.endpoint); - let mut req = Request::post(&url); - - let auth_header_content = format!("Bearer {}", self.token); - req = req.header(header::AUTHORIZATION, auth_header_content); - - let p = build_rooted_abs_path(&self.root, path) - .trim_end_matches('/') - .to_string(); - - let request_body = &json!({ - "path": percent_encode_path(&p), - // TODO: support recursive toggle, should we add a new field in OpDelete? - "recursive": true, - }); - - let body = AsyncBody::Bytes(Bytes::from(request_body.to_string())); - - let req = req.body(body).map_err(new_request_build_error)?; - - self.client.send(req).await - } - - async fn dbfs_rename(&self, from: &str, to: &str) -> Result> { - let source = build_rooted_abs_path(&self.root, from); - let target = build_rooted_abs_path(&self.root, to); - - let url = format!("{}/api/2.0/dbfs/move", self.endpoint); - let mut req = Request::post(&url); - - let auth_header_content = format!("Bearer {}", self.token); - req = req.header(header::AUTHORIZATION, auth_header_content); - - let req_body = &json!({ - "source_path": percent_encode_path(&source), - "destination_path": percent_encode_path(&target), - }); - - let body = AsyncBody::Bytes(Bytes::from(req_body.to_string())); - - let req = req.body(body).map_err(new_request_build_error)?; - - self.client.send(req).await - } - - pub async fn dbfs_list(&self, path: &str) -> Result> { - let p = build_rooted_abs_path(&self.root, path) - .trim_end_matches('/') - .to_string(); - - let url = format!( - "{}/api/2.0/dbfs/list?path={}", - self.endpoint, - percent_encode_path(&p) - ); - let mut req = Request::get(&url); - - let auth_header_content = format!("Bearer {}", self.token); - req = req.header(header::AUTHORIZATION, auth_header_content); - - let req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.client.send(req).await - } - - pub fn dbfs_create_file_request(&self, path: &str, body: Bytes) -> Result> { - let url = format!("{}/api/2.0/dbfs/put", self.endpoint); - - let contents = BASE64_STANDARD.encode(body); - let mut req = Request::post(&url); - - let auth_header_content = format!("Bearer {}", self.token); - req = req.header(header::AUTHORIZATION, auth_header_content); - - let req_body = &json!({ - "path": path, - "contents": contents, - // TODO: support overwrite toggle, should we add a new field in OpWrite? - "overwrite": true, - }); - - let body = AsyncBody::Bytes(Bytes::from(req_body.to_string())); - - req.body(body).map_err(new_request_build_error) - } - - async fn dbfs_read( - &self, - path: &str, - range: BytesRange, - ) -> Result> { - let p = build_rooted_abs_path(&self.root, path) - .trim_end_matches('/') - .to_string(); - - let mut url = format!( - "{}/api/2.0/dbfs/read?path={}", - self.endpoint, - percent_encode_path(&p) - ); - - if let Some(offset) = range.offset() { - url.push_str(&format!("&offset={}", offset)); - } - - if let Some(length) = range.size() { - url.push_str(&format!("&length={}", length)); - } - - let mut req = Request::get(&url); - - let auth_header_content = format!("Bearer {}", self.token); - req = req.header(header::AUTHORIZATION, auth_header_content); - - let req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.client.send_dbfs(req).await - } - - async fn dbfs_get_properties(&self, path: &str) -> Result> { - let p = build_rooted_abs_path(&self.root, path) - .trim_end_matches('/') - .to_string(); - - let url = format!( - "{}/api/2.0/dbfs/get-status?path={}", - &self.endpoint, - percent_encode_path(&p) - ); - - let mut req = Request::get(&url); - - let auth_header_content = format!("Bearer {}", self.token); - req = req.header(header::AUTHORIZATION, auth_header_content); - - let req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.client.send(req).await - } - - async fn dbfs_ensure_parent_path(&self, path: &str) -> Result<()> { - let resp = self.dbfs_get_properties(path).await?; - - match resp.status() { - StatusCode::OK => return Ok(()), - StatusCode::NOT_FOUND => { - self.create_dir(path, OpCreateDir::default()).await?; - } - _ => return Err(parse_error(resp).await?), - } - Ok(()) - } + core: Arc, } #[async_trait] impl Accessor for DbfsBackend { - type Reader = IncomingDbfsAsyncBody; + type Reader = DbfsReader; type BlockingReader = (); type Writer = oio::OneShotWriter; type BlockingWriter = (); @@ -348,7 +161,7 @@ impl Accessor for DbfsBackend { fn info(&self) -> AccessorInfo { let mut am = AccessorInfo::default(); am.set_scheme(Scheme::Dbfs) - .set_root(&self.root) + .set_root(&self.core.root) .set_native_capability(Capability { stat: true, @@ -370,9 +183,7 @@ impl Accessor for DbfsBackend { } async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result { - let req = self.dbfs_create_dir_request(path)?; - - let resp = self.client.send(req).await?; + let resp = self.core.dbfs_create_dir(path).await?; let status = resp.status(); @@ -386,57 +197,69 @@ impl Accessor for DbfsBackend { } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - let resp = self.dbfs_read(path, args.range()).await?; + let resp = self.core.dbfs_read(path, args.range()).await?; let status = resp.status(); match status { - StatusCode::OK => { - // NOTE: If range is not specified, we need to get content length from stat API. - if let Some(size) = args.range().size() { - let mut meta = parse_into_metadata(path, resp.headers())?; - meta.set_content_length(size); - Ok((RpRead::with_metadata(meta), resp.into_body())) - } else { - let stat_resp = self.dbfs_get_properties(path).await?; - let meta = match stat_resp.status() { - StatusCode::OK => { - let mut meta = parse_into_metadata(path, stat_resp.headers())?; - let bs = stat_resp.into_body().bytes().await?; - let decoded_response = serde_json::from_slice::(&bs) - .map_err(new_json_deserialize_error)?; - meta.set_last_modified(parse_datetime_from_from_timestamp_millis( - decoded_response.modification_time, - )?); - match decoded_response.is_dir { - true => meta.set_mode(EntryMode::DIR), - false => { - meta.set_mode(EntryMode::FILE); - meta.set_content_length(decoded_response.file_size as u64) - } - }; - meta - } - _ => return Err(parse_error(stat_resp).await?), - }; - Ok((RpRead::with_metadata(meta), resp.into_body())) - } + StatusCode::OK | StatusCode::PARTIAL_CONTENT => { + let meta = parse_into_metadata(path, resp.headers())?; + Ok((RpRead::with_metadata(meta), resp.into_body())) } - _ => Err(parse_dbfs_read_error(resp).await?), + _ => Err(parse_error(resp).await?), } + + // let resp = self.core.dbfs_read(path, args.range()).await?; + // + // let status = resp.status(); + // + // match status { + // StatusCode::OK => { + // // NOTE: If range is not specified, we need to get content length from stat API. + // if let Some(size) = args.range().size() { + // let mut meta = parse_into_metadata(path, resp.headers())?; + // meta.set_content_length(size); + // Ok((RpRead::with_metadata(meta), resp.into_body())) + // } else { + // let stat_resp = self.core.dbfs_get_status(path).await?; + // let meta = match stat_resp.status() { + // StatusCode::OK => { + // let mut meta = parse_into_metadata(path, stat_resp.headers())?; + // let bs = stat_resp.into_body().bytes().await?; + // let decoded_response = serde_json::from_slice::(&bs) + // .map_err(new_json_deserialize_error)?; + // meta.set_last_modified(parse_datetime_from_from_timestamp_millis( + // decoded_response.modification_time, + // )?); + // match decoded_response.is_dir { + // true => meta.set_mode(EntryMode::DIR), + // false => { + // meta.set_mode(EntryMode::FILE); + // meta.set_content_length(decoded_response.file_size as u64) + // } + // }; + // meta + // } + // _ => return Err(parse_error(stat_resp).await?), + // }; + // Ok((RpRead::with_metadata(meta), resp.into_body())) + // } + // } + // _ => Err(parse_dbfs_read_error(resp).await?), + // } } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { Ok(( RpWrite::default(), - oio::OneShotWriter::new(DbfsWriter::new(self.clone(), args, path.to_string())), + oio::OneShotWriter::new(DbfsWriter::new(self.core.clone(), args, path.to_string())), )) } async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result { - self.dbfs_ensure_parent_path(to).await?; + self.core.dbfs_ensure_parent_path(to).await?; - let resp = self.dbfs_rename(from, to).await?; + let resp = self.core.dbfs_rename(from, to).await?; let status = resp.status(); @@ -455,7 +278,7 @@ impl Accessor for DbfsBackend { return Ok(RpStat::new(Metadata::new(EntryMode::DIR))); } - let resp = self.dbfs_get_properties(path).await?; + let resp = self.core.dbfs_get_status(path).await?; let status = resp.status(); @@ -486,7 +309,7 @@ impl Accessor for DbfsBackend { /// NOTE: Server will return 200 even if the path doesn't exist. async fn delete(&self, path: &str, _: OpDelete) -> Result { - let resp = self.dbfs_delete(path).await?; + let resp = self.core.dbfs_delete(path).await?; let status = resp.status(); @@ -503,7 +326,7 @@ impl Accessor for DbfsBackend { } async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Pager)> { - let op = DbfsPager::new(self.clone(), path.to_string()); + let op = DbfsPager::new(self.core.clone(), path.to_string()); Ok((RpList::default(), op)) } diff --git a/core/src/services/dbfs/core.rs b/core/src/services/dbfs/core.rs new file mode 100644 index 000000000000..e03e14a1f015 --- /dev/null +++ b/core/src/services/dbfs/core.rs @@ -0,0 +1,236 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::Debug; + +use base64::prelude::BASE64_STANDARD; +use base64::Engine; +use bytes::Bytes; +use http::header; +use http::Request; +use http::Response; +use http::StatusCode; +use serde_json::json; + +use crate::raw::build_rooted_abs_path; +use crate::raw::new_request_build_error; +use crate::raw::percent_encode_path; +use crate::raw::AsyncBody; +use crate::raw::BytesRange; +use crate::raw::HttpClient; +use crate::raw::IncomingAsyncBody; +use crate::*; + +use super::error::parse_error; +use super::reader::IncomingDbfsAsyncBody; + +pub struct DbfsCore { + pub root: String, + pub endpoint: String, + pub token: String, + pub client: HttpClient, +} + +impl Debug for DbfsCore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DbfsCore") + .field("root", &self.root) + .field("endpoint", &self.endpoint) + .field("token", &self.token) + .finish_non_exhaustive() + } +} + +impl DbfsCore { + pub async fn dbfs_create_dir(&self, path: &str) -> Result> { + let url = format!("{}/api/2.0/dbfs/mkdirs", self.endpoint); + let mut req = Request::post(&url); + + let auth_header_content = format!("Bearer {}", self.token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + let p = build_rooted_abs_path(&self.root, path) + .trim_end_matches('/') + .to_string(); + + let req_body = format!("{{\"path\": \"{}\"}}", percent_encode_path(&p)); + let body = AsyncBody::Bytes(Bytes::from(req_body)); + + let req = req.body(body).map_err(new_request_build_error)?; + + self.client.send(req).await + } + + pub async fn dbfs_delete(&self, path: &str) -> Result> { + let url = format!("{}/api/2.0/dbfs/delete", self.endpoint); + let mut req = Request::post(&url); + + let auth_header_content = format!("Bearer {}", self.token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + let p = build_rooted_abs_path(&self.root, path) + .trim_end_matches('/') + .to_string(); + + let request_body = &json!({ + "path": percent_encode_path(&p), + // TODO: support recursive toggle, should we add a new field in OpDelete? + "recursive": true, + }); + + let body = AsyncBody::Bytes(Bytes::from(request_body.to_string())); + + let req = req.body(body).map_err(new_request_build_error)?; + + self.client.send(req).await + } + + pub async fn dbfs_rename(&self, from: &str, to: &str) -> Result> { + let source = build_rooted_abs_path(&self.root, from); + let target = build_rooted_abs_path(&self.root, to); + + let url = format!("{}/api/2.0/dbfs/move", self.endpoint); + let mut req = Request::post(&url); + + let auth_header_content = format!("Bearer {}", self.token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + let req_body = &json!({ + "source_path": percent_encode_path(&source), + "destination_path": percent_encode_path(&target), + }); + + let body = AsyncBody::Bytes(Bytes::from(req_body.to_string())); + + let req = req.body(body).map_err(new_request_build_error)?; + + self.client.send(req).await + } + + pub async fn dbfs_list(&self, path: &str) -> Result> { + let p = build_rooted_abs_path(&self.root, path) + .trim_end_matches('/') + .to_string(); + + let url = format!( + "{}/api/2.0/dbfs/list?path={}", + self.endpoint, + percent_encode_path(&p) + ); + let mut req = Request::get(&url); + + let auth_header_content = format!("Bearer {}", self.token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.client.send(req).await + } + + pub fn dbfs_create_file_request(&self, path: &str, body: Bytes) -> Result> { + let url = format!("{}/api/2.0/dbfs/put", self.endpoint); + + let contents = BASE64_STANDARD.encode(body); + let mut req = Request::post(&url); + + let auth_header_content = format!("Bearer {}", self.token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + let req_body = &json!({ + "path": path, + "contents": contents, + "overwrite": true, + }); + + let body = AsyncBody::Bytes(Bytes::from(req_body.to_string())); + + req.body(body).map_err(new_request_build_error) + } + + pub async fn dbfs_read( + &self, + path: &str, + range: BytesRange, + ) -> Result> { + let p = build_rooted_abs_path(&self.root, path) + .trim_end_matches('/') + .to_string(); + + let mut url = format!( + "{}/api/2.0/dbfs/read?path={}", + self.endpoint, + percent_encode_path(&p) + ); + + if let Some(offset) = range.offset() { + url.push_str(&format!("&offset={}", offset)); + } + + if let Some(length) = range.size() { + url.push_str(&format!("&length={}", length)); + } + + let mut req = Request::get(&url); + + let auth_header_content = format!("Bearer {}", self.token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.client.send(req).await + } + + pub async fn dbfs_get_status(&self, path: &str) -> Result> { + let p = build_rooted_abs_path(&self.root, path) + .trim_end_matches('/') + .to_string(); + + let url = format!( + "{}/api/2.0/dbfs/get-status?path={}", + &self.endpoint, + percent_encode_path(&p) + ); + + let mut req = Request::get(&url); + + let auth_header_content = format!("Bearer {}", self.token); + req = req.header(header::AUTHORIZATION, auth_header_content); + + let req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.client.send(req).await + } + + pub async fn dbfs_ensure_parent_path(&self, path: &str) -> Result<()> { + let resp = self.dbfs_get_status(path).await?; + + match resp.status() { + StatusCode::OK => return Ok(()), + StatusCode::NOT_FOUND => { + self.dbfs_create_dir(path).await?; + } + _ => return Err(parse_error(resp).await?), + } + Ok(()) + } +} diff --git a/core/src/services/dbfs/mod.rs b/core/src/services/dbfs/mod.rs index 094ebf76b4ed..bb72050318ff 100644 --- a/core/src/services/dbfs/mod.rs +++ b/core/src/services/dbfs/mod.rs @@ -17,6 +17,8 @@ mod backend; pub use backend::DbfsBuilder as Dbfs; + +mod core; mod error; mod pager; mod reader; diff --git a/core/src/services/dbfs/pager.rs b/core/src/services/dbfs/pager.rs index 98306bd27aad..58aef5d757dc 100644 --- a/core/src/services/dbfs/pager.rs +++ b/core/src/services/dbfs/pager.rs @@ -15,25 +15,28 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use async_trait::async_trait; use http::StatusCode; use serde::Deserialize; -use super::backend::DbfsBackend; -use super::error::parse_error; use crate::raw::*; +use crate::services::dbfs::core::DbfsCore; use crate::*; +use super::error::parse_error; + pub struct DbfsPager { - backend: DbfsBackend, + core: Arc, path: String, done: bool, } impl DbfsPager { - pub fn new(backend: DbfsBackend, path: String) -> Self { + pub fn new(core: Arc, path: String) -> Self { Self { - backend, + core, path, done: false, } @@ -47,7 +50,7 @@ impl oio::Page for DbfsPager { return Ok(None); } - let response = self.backend.dbfs_list(&self.path).await?; + let response = self.core.dbfs_list(&self.path).await?; let status_code = response.status(); if !status_code.is_success() { diff --git a/core/src/services/dbfs/writer.rs b/core/src/services/dbfs/writer.rs index 3d2d23c582b6..04dbe4611812 100644 --- a/core/src/services/dbfs/writer.rs +++ b/core/src/services/dbfs/writer.rs @@ -15,25 +15,28 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use async_trait::async_trait; use http::StatusCode; -use super::backend::DbfsBackend; -use super::error::parse_error; use crate::raw::oio::WriteBuf; use crate::raw::*; +use crate::services::dbfs::core::DbfsCore; use crate::*; +use super::error::parse_error; + pub struct DbfsWriter { - backend: DbfsBackend, + core: Arc, path: String, } impl DbfsWriter { const MAX_SIMPLE_SIZE: usize = 1024 * 1024; - pub fn new(backend: DbfsBackend, _op: OpWrite, path: String) -> Self { - DbfsWriter { backend, path } + pub fn new(core: Arc, _op: OpWrite, path: String) -> Self { + DbfsWriter { core, path } } } @@ -51,9 +54,9 @@ impl oio::OneShotWrite for DbfsWriter { )); } - let req = self.backend.dbfs_create_file_request(&self.path, bs)?; + let req = self.core.dbfs_create_file_request(&self.path, bs)?; - let resp = self.backend.client.send(req).await?; + let resp = self.core.client.send(req).await?; let status = resp.status(); match status {