Skip to content

Commit

Permalink
implement logic for webhdfs
Browse files Browse the repository at this point in the history
  • Loading branch information
meteorgan committed Jan 27, 2025
1 parent c39303c commit 634bdec
Showing 1 changed file with 14 additions and 16 deletions.
30 changes: 14 additions & 16 deletions core/src/services/webhdfs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
// specific language governing permissions and limitations
// under the License.

use bytes::Buf;
use http::StatusCode;
use uuid::Uuid;

use super::backend::WebhdfsBackend;
use super::error::parse_error;
use crate::raw::*;
use crate::services::webhdfs::message::FileStatusWrapper;
use crate::*;

pub type WebhdfsWriters =
Expand Down Expand Up @@ -142,19 +144,16 @@ impl oio::BlockWrite for WebhdfsWriter {

impl oio::AppendWrite for WebhdfsWriter {
async fn offset(&self) -> Result<u64> {
Ok(0)
}

async fn append(&self, _offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
let resp = self.backend.webhdfs_get_file_status(&self.path).await?;

let status = resp.status();

let location;

match status {
StatusCode::OK => {
location = self.backend.webhdfs_init_append_request(&self.path).await?;
let bs = resp.into_body();
let file_status = serde_json::from_reader::<_, FileStatusWrapper>(bs.reader())
.map_err(new_json_deserialize_error)?
.file_status;

Ok(file_status.length)
}
StatusCode::NOT_FOUND => {
let req = self
Expand All @@ -163,21 +162,20 @@ impl oio::AppendWrite for WebhdfsWriter {
.await?;

let resp = self.backend.client.send(req).await?;

let status = resp.status();

match status {
StatusCode::CREATED | StatusCode::OK => {
location = self.backend.webhdfs_init_append_request(&self.path).await?;
}
_ => return Err(parse_error(resp)),
StatusCode::CREATED | StatusCode::OK => Ok(0),
_ => Err(parse_error(resp)),
}
}
_ => return Err(parse_error(resp)),
_ => Err(parse_error(resp)),
}
}

async fn append(&self, _offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
let location = self.backend.webhdfs_init_append_request(&self.path).await?;
let req = self.backend.webhdfs_append_request(&location, size, body)?;

let resp = self.backend.client.send(req).await?;

let status = resp.status();
Expand Down

0 comments on commit 634bdec

Please sign in to comment.