Skip to content

Commit

Permalink
set write_has_content_length as true by default && implement the writ…
Browse files Browse the repository at this point in the history
…e_returns_metadata logic for fs
  • Loading branch information
meteorgan committed Jan 26, 2025
1 parent f2a1331 commit a1e0b0a
Show file tree
Hide file tree
Showing 30 changed files with 240 additions and 153 deletions.
2 changes: 2 additions & 0 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,8 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
if cap.list && cap.write_can_empty {
cap.create_dir = true;
}
// write operations should always return content length
cap.write_has_content_length = true;
meta.into()
}

Expand Down
9 changes: 7 additions & 2 deletions core/src/raw/adapters/kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,11 @@ impl<S: Adapter> oio::Write for KvWriter<S> {

async fn close(&mut self) -> Result<Metadata> {
let buf = self.buffer.clone().collect();
let length = buf.len() as u64;
self.kv.set(&self.path, buf).await?;

Ok(Metadata::default())
let meta = Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length);
Ok(meta)
}

async fn abort(&mut self) -> Result<()> {
Expand All @@ -318,8 +320,11 @@ impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {

fn close(&mut self) -> Result<Metadata> {
let buf = self.buffer.clone().collect();
let length = buf.len() as u64;
self.kv.blocking_set(&self.path, buf)?;
Ok(Metadata::default())

let meta = Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length);
Ok(meta)
}
}

Expand Down
6 changes: 4 additions & 2 deletions core/src/raw/adapters/typed_kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,10 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
value
}
};
let meta = value.metadata.clone();
self.kv.set(&self.path, value).await?;

Ok(Metadata::default())
Ok(meta)
}

async fn abort(&mut self) -> Result<()> {
Expand Down Expand Up @@ -330,8 +331,9 @@ impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
}
};

let meta = value.metadata.clone();
kv.blocking_set(&self.path, value)?;
Ok(Metadata::default())
Ok(meta)
}
}

Expand Down
11 changes: 8 additions & 3 deletions core/src/raw/oio/write/append_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub trait AppendWrite: Send + Sync + Unpin + 'static {
offset: u64,
size: u64,
body: Buffer,
) -> impl Future<Output = Result<()>> + MaybeSend;
) -> impl Future<Output = Result<Metadata>> + MaybeSend;
}

/// AppendWriter will implements [`oio::Write`] based on append object.
Expand All @@ -60,6 +60,8 @@ pub struct AppendWriter<W: AppendWrite> {
inner: W,

offset: Option<u64>,

meta: Metadata,
}

/// # Safety
Expand All @@ -71,6 +73,7 @@ impl<W: AppendWrite> AppendWriter<W> {
Self {
inner,
offset: None,
meta: Metadata::default(),
}
}
}
Expand All @@ -90,14 +93,16 @@ where
};

let size = bs.len();
self.inner.append(offset, size as u64, bs).await?;
self.meta = self.inner.append(offset, size as u64, bs).await?;
// Update offset after succeed.
self.offset = Some(offset + size as u64);
Ok(())
}

async fn close(&mut self) -> Result<Metadata> {
Ok(Metadata::default())
self.meta
.set_content_length(self.offset.unwrap_or_default());
Ok(self.meta.clone())
}

async fn abort(&mut self) -> Result<()> {
Expand Down
11 changes: 9 additions & 2 deletions core/src/raw/oio/write/block_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ pub struct BlockWriter<W: BlockWrite> {
block_ids: Vec<Uuid>,
cache: Option<Buffer>,
tasks: ConcurrentTasks<WriteInput<W>, Uuid>,

write_bytes_count: u64,
}

impl<W: BlockWrite> BlockWriter<W> {
Expand Down Expand Up @@ -156,6 +158,7 @@ impl<W: BlockWrite> BlockWriter<W> {
}
})
}),
write_bytes_count: 0,
}
}

Expand All @@ -172,6 +175,8 @@ where
W: BlockWrite,
{
async fn write(&mut self, bs: Buffer) -> Result<()> {
self.write_bytes_count += bs.len() as u64;

if !self.started && self.cache.is_none() {
self.fill_cache(bs);
return Ok(());
Expand Down Expand Up @@ -202,7 +207,8 @@ where
};

self.cache = None;
return self.w.write_once(size as u64, body).await;
let meta = self.w.write_once(size as u64, body).await?;
return Ok(meta.with_content_length(self.write_bytes_count));
}

if let Some(cache) = self.cache.clone() {
Expand All @@ -225,7 +231,8 @@ where
}

let block_ids = self.block_ids.clone();
self.w.complete_block(block_ids).await
let meta = self.w.complete_block(block_ids).await?;
Ok(meta.with_content_length(self.write_bytes_count))
}

async fn abort(&mut self) -> Result<()> {
Expand Down
16 changes: 12 additions & 4 deletions core/src/raw/oio/write/multipart_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ pub struct MultipartWriter<W: MultipartWrite> {
next_part_number: usize,

tasks: ConcurrentTasks<WriteInput<W>, MultipartPart>,

write_bytes_count: u64,
}

/// # Safety
Expand Down Expand Up @@ -191,6 +193,7 @@ impl<W: MultipartWrite> MultipartWriter<W> {
}
})
}),
write_bytes_count: 0,
}
}

Expand All @@ -207,6 +210,7 @@ where
W: MultipartWrite,
{
async fn write(&mut self, bs: Buffer) -> Result<()> {
self.write_bytes_count += bs.len() as u64;
let upload_id = match self.upload_id.clone() {
Some(v) => v,
None => {
Expand Down Expand Up @@ -252,7 +256,9 @@ where

self.cache = None;
// Call write_once if there is no upload_id.
return self.w.write_once(size as u64, body).await;
let mut meta = self.w.write_once(size as u64, body).await?;
meta.set_content_length(self.write_bytes_count);
return Ok(meta);
}
};

Expand Down Expand Up @@ -288,7 +294,9 @@ where
.with_context("actual", self.parts.len())
.with_context("upload_id", upload_id));
}
self.w.complete_part(&upload_id, &self.parts).await
let mut meta = self.w.complete_part(&upload_id, &self.parts).await?;
meta.set_content_length(self.write_bytes_count);
Ok(meta)
}

async fn abort(&mut self) -> Result<()> {
Expand Down Expand Up @@ -339,7 +347,7 @@ mod tests {
impl MultipartWrite for Arc<Mutex<TestWrite>> {
async fn write_once(&self, size: u64, _: Buffer) -> Result<Metadata> {
self.lock().await.length += size;
Ok(Metadata::default())
Ok(Metadata::default().with_content_length(size))
}

async fn initiate_part(&self) -> Result<String> {
Expand Down Expand Up @@ -391,7 +399,7 @@ mod tests {
assert_eq!(upload_id, test.upload_id);
assert_eq!(parts.len(), test.part_numbers.len());

Ok(Metadata::default())
Ok(Metadata::default().with_content_length(test.length))
}

async fn abort_part(&self, upload_id: &str) -> Result<()> {
Expand Down
12 changes: 9 additions & 3 deletions core/src/raw/oio/write/one_shot_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,16 @@ impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
}

async fn close(&mut self) -> Result<Metadata> {
match self.buffer.clone() {
Some(bs) => self.inner.write_once(bs).await,
let mut length = 0;
let meta = match self.buffer.clone() {
Some(bs) => {
length = bs.len();
self.inner.write_once(bs).await
}
None => self.inner.write_once(Buffer::new()).await,
}
}?;

Ok(meta.with_content_length(length as u64))
}

async fn abort(&mut self) -> Result<()> {
Expand Down
4 changes: 4 additions & 0 deletions core/src/services/aliyun_drive/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ pub struct AliyunDriveWriter {
file_id: Option<String>,
upload_id: Option<String>,
part_number: usize,

write_bytes_count: u64,
}

impl AliyunDriveWriter {
Expand All @@ -49,12 +51,14 @@ impl AliyunDriveWriter {
file_id: None,
upload_id: None,
part_number: 1, // must start from 1
write_bytes_count: 0,
}
}
}

impl oio::Write for AliyunDriveWriter {
async fn write(&mut self, bs: Buffer) -> Result<()> {
self.write_bytes_count += bs.len() as u64;
let (upload_id, file_id) = match (self.upload_id.as_ref(), self.file_id.as_ref()) {
(Some(upload_id), Some(file_id)) => (upload_id, file_id),
_ => {
Expand Down
9 changes: 7 additions & 2 deletions core/src/services/alluxio/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pub struct AlluxioWriter {
_op: OpWrite,
path: String,
stream_id: Option<u64>,

write_bytes_count: u64,
}

impl AlluxioWriter {
Expand All @@ -38,12 +40,15 @@ impl AlluxioWriter {
_op,
path,
stream_id: None,
write_bytes_count: 0,
}
}
}

impl oio::Write for AlluxioWriter {
async fn write(&mut self, bs: Buffer) -> Result<()> {
self.write_bytes_count += bs.len() as u64;

let stream_id = match self.stream_id {
Some(stream_id) => stream_id,
None => {
Expand All @@ -58,11 +63,11 @@ impl oio::Write for AlluxioWriter {

async fn close(&mut self) -> Result<Metadata> {
let Some(stream_id) = self.stream_id else {
return Ok(Metadata::default());
return Ok(Metadata::default().with_content_length(self.write_bytes_count));
};
self.core.close(stream_id).await?;

Ok(Metadata::default())
Ok(Metadata::default().with_content_length(self.write_bytes_count))
}

async fn abort(&mut self) -> Result<()> {
Expand Down
4 changes: 2 additions & 2 deletions core/src/services/azblob/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl oio::AppendWrite for AzblobWriter {
}
}

async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<()> {
async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
let mut req = self
.core
.azblob_append_blob_request(&self.path, offset, size, body)?;
Expand All @@ -99,7 +99,7 @@ impl oio::AppendWrite for AzblobWriter {

let status = resp.status();
match status {
StatusCode::CREATED => Ok(()),
StatusCode::CREATED => Ok(Metadata::default()),
_ => Err(parse_error(resp)),
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/services/azdls/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl oio::AppendWrite for AzdlsWriter {
}
}

async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<()> {
async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
if offset == 0 {
let mut req =
self.core
Expand Down Expand Up @@ -116,7 +116,7 @@ impl oio::AppendWrite for AzdlsWriter {

let status = resp.status();
match status {
StatusCode::OK | StatusCode::ACCEPTED => Ok(()),
StatusCode::OK | StatusCode::ACCEPTED => Ok(Metadata::default()),
_ => Err(parse_error(resp).with_operation("Backend::azdls_update_request")),
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/services/azfile/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,15 @@ impl oio::AppendWrite for AzfileWriter {
}
}

async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<()> {
async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
let resp = self
.core
.azfile_update(&self.path, size, offset, body)
.await?;

let status = resp.status();
match status {
StatusCode::OK | StatusCode::CREATED => Ok(()),
StatusCode::OK | StatusCode::CREATED => Ok(Metadata::default()),
_ => Err(parse_error(resp).with_operation("Backend::azfile_update")),
}
}
Expand Down
12 changes: 10 additions & 2 deletions core/src/services/compfs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,17 @@ use crate::*;
pub struct CompfsWriter {
core: Arc<CompfsCore>,
file: Cursor<File>,

write_bytes_count: u64,
}

impl CompfsWriter {
pub(super) fn new(core: Arc<CompfsCore>, file: Cursor<File>) -> Self {
Self { core, file }
Self {
core,
file,
write_bytes_count: 0,
}
}
}

Expand All @@ -45,6 +51,8 @@ impl oio::Write for CompfsWriter {
///
/// The IoBuf::buf_len() only returns the length of the current buffer.
async fn write(&mut self, bs: Buffer) -> Result<()> {
self.write_bytes_count += bs.len() as u64;

let mut file = self.file.clone();

self.core
Expand All @@ -69,7 +77,7 @@ impl oio::Write for CompfsWriter {
.exec(move || async move { f.into_inner().close().await })
.await?;

Ok(Metadata::default())
Ok(Metadata::default().with_content_length(self.write_bytes_count))
}

async fn abort(&mut self) -> Result<()> {
Expand Down
Loading

0 comments on commit a1e0b0a

Please sign in to comment.