From 7f7cbec215f072d818d0b01158c3a43578e67a57 Mon Sep 17 00:00:00 2001 From: suyanhanx Date: Fri, 30 Jun 2023 14:28:23 +0800 Subject: [PATCH] feat(services/gcs): support sink Signed-off-by: suyanhanx --- core/src/services/gcs/backend.rs | 1 + core/src/services/gcs/core.rs | 2 +- core/src/services/gcs/writer.rs | 17 ++++++++--------- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs index e9436359f74c..20e605099fbf 100644 --- a/core/src/services/gcs/backend.rs +++ b/core/src/services/gcs/backend.rs @@ -418,6 +418,7 @@ impl Accessor for GcsBackend { read_with_if_none_match: true, write: true, + write_can_sink: true, write_with_content_type: true, write_without_content_length: true, delete: true, diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs index e29402db7396..35cf401eef0a 100644 --- a/core/src/services/gcs/core.rs +++ b/core/src/services/gcs/core.rs @@ -207,7 +207,7 @@ impl GcsCore { pub fn gcs_insert_object_request( &self, path: &str, - size: Option, + size: Option, content_type: Option<&str>, body: AsyncBody, ) -> Result> { diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs index 858c65d932b6..236016294396 100644 --- a/core/src/services/gcs/writer.rs +++ b/core/src/services/gcs/writer.rs @@ -52,12 +52,12 @@ impl GcsWriter { } } - async fn write_oneshot(&self, bs: Bytes) -> Result<()> { + async fn write_oneshot(&self, size: u64, body: AsyncBody) -> Result<()> { let mut req = self.core.gcs_insert_object_request( &percent_encode_path(&self.path), - Some(bs.len()), + Some(size), self.op.content_type(), - AsyncBody::Bytes(bs), + body, )?; self.core.sign(&mut req).await?; @@ -125,7 +125,9 @@ impl oio::Write for GcsWriter { if self.op.content_length().unwrap_or_default() == bs.len() as u64 && self.written == 0 { - return self.write_oneshot(bs).await; + return self + .write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs)) + .await; } else { let location = self.initiate_upload().await?; self.location = Some(location); @@ -162,11 +164,8 @@ impl oio::Write for GcsWriter { } } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { - Err(Error::new( - ErrorKind::Unsupported, - "Write::sink is not supported", - )) + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + self.write_oneshot(size, AsyncBody::Stream(s)).await } async fn abort(&mut self) -> Result<()> {