implement the logic for hdfs
meteorgan committed Jan 27, 2025
1 parent 634bdec commit d126087
Showing 2 changed files with 25 additions and 8 deletions.
20 changes: 18 additions & 2 deletions core/src/services/hdfs/backend.rs
@@ -299,8 +299,12 @@ impl Access for HdfsBackend {
 
     async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
         let target_path = build_rooted_abs_path(&self.root, path);
+        let mut initial_size = 0;
         let target_exists = match self.client.metadata(&target_path) {
-            Ok(_) => true,
+            Ok(meta) => {
+                initial_size = meta.len();
+                true
+            }
             Err(err) => {
                 if err.kind() != io::ErrorKind::NotFound {
                     return Err(new_std_io_error(err));
@@ -323,6 +327,9 @@ impl Access for HdfsBackend {
             let parent = get_parent(&target_path);
             self.client.create_dir(parent).map_err(new_std_io_error)?;
         }
+        if !should_append {
+            initial_size = 0;
+        }
 
         let mut open_options = self.client.open_file();
         open_options.create(true);
@@ -345,6 +352,7 @@ impl Access for HdfsBackend {
                 f,
                 Arc::clone(&self.client),
                 target_exists,
+                initial_size,
             ),
         ))
     }
@@ -474,8 +482,12 @@ impl Access for HdfsBackend {
 
     fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
         let target_path = build_rooted_abs_path(&self.root, path);
+        let mut initial_size = 0;
         let target_exists = match self.client.metadata(&target_path) {
-            Ok(_) => true,
+            Ok(meta) => {
+                initial_size = meta.len();
+                true
+            }
             Err(err) => {
                 if err.kind() != io::ErrorKind::NotFound {
                     return Err(new_std_io_error(err));
@@ -498,6 +510,9 @@ impl Access for HdfsBackend {
             let parent = get_parent(&target_path);
             self.client.create_dir(parent).map_err(new_std_io_error)?;
         }
+        if !should_append {
+            initial_size = 0;
+        }
 
         let mut open_options = self.client.open_file();
         open_options.create(true);
@@ -519,6 +534,7 @@ impl Access for HdfsBackend {
                 f,
                 Arc::clone(&self.client),
                 target_exists,
+                initial_size,
             ),
         ))
     }
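The backend change above threads the pre-existing file length into the writer: when the target already exists, its length from `client.metadata()` seeds `initial_size`, and the value is reset to zero unless the write is an append. A minimal standalone sketch of that accounting (the `SizeTracker` type is hypothetical, not an OpenDAL type) is:

```rust
// A sketch of the size accounting introduced above, under the assumption that
// `existing_len` plays the role of `meta.len()` and `append` plays the role of
// `should_append` in the diff.
struct SizeTracker {
    size: u64,
}

impl SizeTracker {
    /// Seed the counter from the existing file length only when appending.
    fn new(existing_len: Option<u64>, append: bool) -> Self {
        let initial_size = if append { existing_len.unwrap_or(0) } else { 0 };
        Self { size: initial_size }
    }

    /// Mirrors `self.size += bs.len() as u64` in the writer.
    fn record_write(&mut self, n: usize) {
        self.size += n as u64;
    }

    /// The value `close()` would report as the content length.
    fn content_length(&self) -> u64 {
        self.size
    }
}

fn main() {
    // Appending 10 bytes to an existing 100-byte file reports 110.
    let mut appender = SizeTracker::new(Some(100), true);
    appender.record_write(10);
    assert_eq!(appender.content_length(), 110);

    // Overwriting the same file with 10 bytes reports 10.
    let mut overwriter = SizeTracker::new(Some(100), false);
    overwriter.record_write(10);
    assert_eq!(overwriter.content_length(), 10);
}
```

Seeding the counter up front means `close()` can report the full target file length without issuing a second `metadata()` call after an append.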
13 changes: 7 additions & 6 deletions core/src/services/hdfs/writer.rs
@@ -30,7 +30,7 @@ pub struct HdfsWriter<F> {
     f: Option<F>,
     client: Arc<hdrs::Client>,
     target_path_exists: bool,
-    write_bytes_count: u64,
+    size: u64,
 }
 
 /// # Safety
@@ -45,21 +45,22 @@ impl<F> HdfsWriter<F> {
         f: F,
         client: Arc<hdrs::Client>,
         target_path_exists: bool,
+        initial_size: u64,
     ) -> Self {
         Self {
             target_path,
             tmp_path,
             f: Some(f),
             client,
             target_path_exists,
-            write_bytes_count: 0,
+            size: initial_size,
         }
     }
 }
 
 impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
     async fn write(&mut self, mut bs: Buffer) -> Result<()> {
-        self.write_bytes_count += bs.len() as u64;
+        self.size += bs.len() as u64;
         let f = self.f.as_mut().expect("HdfsWriter must be initialized");
 
         while bs.has_remaining() {
@@ -87,7 +88,7 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
                 .map_err(new_std_io_error)?
         }
 
-        Ok(Metadata::default().with_content_length(self.write_bytes_count))
+        Ok(Metadata::default().with_content_length(self.size))
     }
 
     async fn abort(&mut self) -> Result<()> {
@@ -100,7 +101,7 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
 
 impl oio::BlockingWrite for HdfsWriter<hdrs::File> {
     fn write(&mut self, mut bs: Buffer) -> Result<()> {
-        self.write_bytes_count += bs.len() as u64;
+        self.size += bs.len() as u64;
 
         let f = self.f.as_mut().expect("HdfsWriter must be initialized");
         while bs.has_remaining() {
@@ -127,6 +128,6 @@ impl oio::BlockingWrite for HdfsWriter<hdrs::File> {
                 .map_err(new_std_io_error)?;
         }
 
-        Ok(Metadata::default().with_content_length(self.write_bytes_count))
+        Ok(Metadata::default().with_content_length(self.size))
     }
 }
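On the writer side, the renamed `size` field now starts from `initial_size` and grows with every write, so `close()` returns a `Metadata` whose content length covers the whole target file rather than only the bytes written by this writer. A hypothetical caller-facing sketch, assuming an `Operator` configured for the HDFS service, a pre-existing 100-byte file, and that the public `Writer::close()` surfaces this metadata (all assumptions, not shown in this diff):

```rust
// Hypothetical usage sketch; the path, the 100-byte pre-existing file, and the
// assumption that `Writer::close()` returns the metadata built by
// `HdfsWriter::close()` are illustrative, not taken from this commit.
use opendal::{Operator, Result};

async fn append_and_report(op: &Operator) -> Result<u64> {
    // Assume "logs/app.log" already exists and holds 100 bytes.
    let mut w = op.writer_with("logs/app.log").append(true).await?;
    w.write(vec![b'x'; 10]).await?;
    let meta = w.close().await?;

    // With this change, the reported length would be the whole file
    // (110 bytes), not only the 10 bytes appended by this writer.
    Ok(meta.content_length())
}
```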
