diff --git a/core/src/services/hdfs/backend.rs b/core/src/services/hdfs/backend.rs index c4497ce0c1e..ee404197512 100644 --- a/core/src/services/hdfs/backend.rs +++ b/core/src/services/hdfs/backend.rs @@ -299,8 +299,12 @@ impl Access for HdfsBackend { async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> { let target_path = build_rooted_abs_path(&self.root, path); + let mut initial_size = 0; let target_exists = match self.client.metadata(&target_path) { - Ok(_) => true, + Ok(meta) => { + initial_size = meta.len(); + true + } Err(err) => { if err.kind() != io::ErrorKind::NotFound { return Err(new_std_io_error(err)); @@ -323,6 +327,9 @@ impl Access for HdfsBackend { let parent = get_parent(&target_path); self.client.create_dir(parent).map_err(new_std_io_error)?; } + if !should_append { + initial_size = 0; + } let mut open_options = self.client.open_file(); open_options.create(true); @@ -345,6 +352,7 @@ impl Access for HdfsBackend { f, Arc::clone(&self.client), target_exists, + initial_size, ), )) } @@ -474,8 +482,12 @@ impl Access for HdfsBackend { fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { let target_path = build_rooted_abs_path(&self.root, path); + let mut initial_size = 0; let target_exists = match self.client.metadata(&target_path) { - Ok(_) => true, + Ok(meta) => { + initial_size = meta.len(); + true + } Err(err) => { if err.kind() != io::ErrorKind::NotFound { return Err(new_std_io_error(err)); @@ -498,6 +510,9 @@ impl Access for HdfsBackend { let parent = get_parent(&target_path); self.client.create_dir(parent).map_err(new_std_io_error)?; } + if !should_append { + initial_size = 0; + } let mut open_options = self.client.open_file(); open_options.create(true); @@ -519,6 +534,7 @@ impl Access for HdfsBackend { f, Arc::clone(&self.client), target_exists, + initial_size, ), )) } diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs index 6fde07db0c3..463e93e30c7 100644 --- a/core/src/services/hdfs/writer.rs +++ b/core/src/services/hdfs/writer.rs @@ -30,7 +30,7 @@ pub struct HdfsWriter { f: Option, client: Arc, target_path_exists: bool, - write_bytes_count: u64, + size: u64, } /// # Safety @@ -45,6 +45,7 @@ impl HdfsWriter { f: F, client: Arc, target_path_exists: bool, + initial_size: u64, ) -> Self { Self { target_path, @@ -52,14 +53,14 @@ impl HdfsWriter { f: Some(f), client, target_path_exists, - write_bytes_count: 0, + size: initial_size, } } } impl oio::Write for HdfsWriter { async fn write(&mut self, mut bs: Buffer) -> Result<()> { - self.write_bytes_count += bs.len() as u64; + self.size += bs.len() as u64; let f = self.f.as_mut().expect("HdfsWriter must be initialized"); while bs.has_remaining() { @@ -87,7 +88,7 @@ impl oio::Write for HdfsWriter { .map_err(new_std_io_error)? } - Ok(Metadata::default().with_content_length(self.write_bytes_count)) + Ok(Metadata::default().with_content_length(self.size)) } async fn abort(&mut self) -> Result<()> { @@ -100,7 +101,7 @@ impl oio::Write for HdfsWriter { impl oio::BlockingWrite for HdfsWriter { fn write(&mut self, mut bs: Buffer) -> Result<()> { - self.write_bytes_count += bs.len() as u64; + self.size += bs.len() as u64; let f = self.f.as_mut().expect("HdfsWriter must be initialized"); while bs.has_remaining() { @@ -127,6 +128,6 @@ impl oio::BlockingWrite for HdfsWriter { .map_err(new_std_io_error)?; } - Ok(Metadata::default().with_content_length(self.write_bytes_count)) + Ok(Metadata::default().with_content_length(self.size)) } }