From 18c9e03a2b61f12572043804b37ef1838341e035 Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Mon, 8 May 2023 11:19:07 +0800 Subject: [PATCH] feat: migrate service dashmap (#2225) * feat(typed_kv): add scan support for Typed KV Adapter Signed-off-by: Chojan Shang * feat(services/dashmap): migrate service dashmap Signed-off-by: Chojan Shang * fix(services/dashmap): return inner Signed-off-by: Chojan Shang * feat(services/moka): try to enable scan Signed-off-by: Chojan Shang * refactor(typed_kv): scan is not available for all typed kv services Signed-off-by: Chojan Shang * feat(typed_kv): also add KvPager Signed-off-by: Chojan Shang * feat(typed_kv): add typed_kv::Info to carry capability check back Signed-off-by: Chojan Shang * feat(typed_kv): add typed_kv::Capability Signed-off-by: Chojan Shang --------- Signed-off-by: Chojan Shang --- core/src/raw/adapters/typed_kv/api.rs | 100 +++++++++++++++++++- core/src/raw/adapters/typed_kv/backend.rs | 110 +++++++++++++++++----- core/src/raw/adapters/typed_kv/mod.rs | 2 + core/src/services/dashmap/backend.rs | 32 +++---- core/src/services/moka/backend.rs | 12 ++- 5 files changed, 214 insertions(+), 42 deletions(-) diff --git a/core/src/raw/adapters/typed_kv/api.rs b/core/src/raw/adapters/typed_kv/api.rs index 1f2008c4f405..4f81589fd6bc 100644 --- a/core/src/raw/adapters/typed_kv/api.rs +++ b/core/src/raw/adapters/typed_kv/api.rs @@ -21,7 +21,12 @@ use async_trait::async_trait; use bytes::Bytes; use chrono::Utc; -use crate::*; +use crate::EntryMode; +use crate::Error; +use crate::ErrorKind; +use crate::Metadata; +use crate::Result; +use crate::Scheme; /// Adapter is the typed adapter to underlying kv services. /// @@ -39,7 +44,7 @@ use crate::*; #[async_trait] pub trait Adapter: Send + Sync + Debug + Unpin + 'static { /// Get the scheme and name of current adapter. - fn metadata(&self) -> (Scheme, String); + fn info(&self) -> Info; /// Get a value from adapter. async fn get(&self, path: &str) -> Result>; @@ -58,6 +63,29 @@ pub trait Adapter: Send + Sync + Debug + Unpin + 'static { /// Delete a value from adapter. fn blocking_delete(&self, path: &str) -> Result<()>; + + /// Scan a key prefix to get all keys that start with this key. + async fn scan(&self, path: &str) -> Result> { + let _ = path; + + Err(Error::new( + ErrorKind::Unsupported, + "typed_kv adapter doesn't support this operation", + ) + .with_operation("typed_kv::Adapter::scan")) + } + + /// Scan a key prefix to get all keys that start with this key + /// in blocking way. + fn blocking_scan(&self, path: &str) -> Result> { + let _ = path; + + Err(Error::new( + ErrorKind::Unsupported, + "typed_kv adapter doesn't support this operation", + ) + .with_operation("typed_kv::Adapter::blocking_scan")) + } } /// Value is the typed value stored in adapter. @@ -87,3 +115,71 @@ impl Value { size_of::() + self.value.len() } } + +/// Capability is used to describe what operations are supported +/// by Typed KV Operator. +#[derive(Copy, Clone, Default)] +pub struct Capability { + /// If typed_kv operator supports get natively, it will be true. + pub get: bool, + /// If typed_kv operator supports set natively, it will be true. + pub set: bool, + /// If typed_kv operator supports delete natively, it will be true. + pub delete: bool, + /// If typed_kv operator supports scan natively, it will be true. + pub scan: bool, +} + +impl Debug for Capability { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut s = vec![]; + + if self.get { + s.push("Get") + } + if self.set { + s.push("Set"); + } + if self.delete { + s.push("Delete"); + } + if self.scan { + s.push("Scan"); + } + + write!(f, "{{ {} }}", s.join(" | ")) + } +} + +/// Info for this key value accessor. +pub struct Info { + scheme: Scheme, + name: String, + capabilities: Capability, +} + +impl Info { + /// Create a new KeyValueAccessorInfo. + pub fn new(scheme: Scheme, name: &str, capabilities: Capability) -> Self { + Self { + scheme, + name: name.to_string(), + capabilities, + } + } + + /// Get the scheme. + pub fn scheme(&self) -> Scheme { + self.scheme + } + + /// Get the name. + pub fn name(&self) -> &str { + &self.name + } + + /// Get the capabilities. + pub fn capabilities(&self) -> Capability { + self.capabilities + } +} diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index e7a85dbed27a..1e6afb067e6a 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -59,31 +59,37 @@ impl Accessor for Backend { type BlockingReader = oio::Cursor; type Writer = KvWriter; type BlockingWriter = KvWriter; - type Pager = (); - type BlockingPager = (); + type Pager = KvPager; + type BlockingPager = KvPager; fn info(&self) -> AccessorInfo { - let (scheme, name) = self.kv.metadata(); - - let mut am = AccessorInfo::default(); - am.set_scheme(scheme); - am.set_name(&name); + let kv_info = self.kv.info(); + let mut am: AccessorInfo = AccessorInfo::default(); am.set_root(&self.root); - + am.set_scheme(kv_info.scheme()); + am.set_name(kv_info.name()); + let kv_cap = kv_info.capabilities(); let cap = am.capability_mut(); - cap.read = true; - cap.read_can_seek = true; - cap.read_can_next = true; - cap.read_with_range = true; - cap.stat = true; - - cap.write = true; - cap.write_with_cache_control = true; - cap.write_with_content_disposition = true; - cap.write_with_content_type = true; - cap.write_without_content_length = true; - cap.create_dir = true; - cap.delete = true; + if kv_cap.get { + cap.read = true; + cap.read_can_seek = true; + cap.read_can_next = true; + cap.read_with_range = true; + cap.stat = true; + } + + if kv_cap.set { + cap.write = true; + cap.create_dir = true; + } + + if kv_cap.delete { + cap.delete = true; + } + + if kv_cap.scan { + cap.scan = true; + } am } @@ -182,6 +188,22 @@ impl Accessor for Backend { self.kv.blocking_delete(&p)?; Ok(RpDelete::default()) } + + async fn scan(&self, path: &str, _: OpScan) -> Result<(RpScan, Self::Pager)> { + let p = build_abs_path(&self.root, path); + let res = self.kv.scan(&p).await?; + let pager = KvPager::new(&self.root, res); + + Ok((RpScan::default(), pager)) + } + + fn blocking_scan(&self, path: &str, _: OpScan) -> Result<(RpScan, Self::BlockingPager)> { + let p = build_abs_path(&self.root, path); + let res = self.kv.blocking_scan(&p)?; + let pager = KvPager::new(&self.root, res); + + Ok((RpScan::default(), pager)) + } } impl Backend @@ -204,6 +226,52 @@ where } } +pub struct KvPager { + root: String, + inner: Option>, +} + +impl KvPager { + fn new(root: &str, inner: Vec) -> Self { + Self { + root: root.to_string(), + inner: Some(inner), + } + } + + fn inner_next_page(&mut self) -> Option> { + let res = self + .inner + .take()? + .into_iter() + .map(|v| { + let mode = if v.ends_with('/') { + EntryMode::DIR + } else { + EntryMode::FILE + }; + + oio::Entry::new(&build_rel_path(&self.root, &v), Metadata::new(mode)) + }) + .collect(); + + Some(res) + } +} + +#[async_trait] +impl oio::Page for KvPager { + async fn next(&mut self) -> Result>> { + Ok(self.inner_next_page()) + } +} + +impl oio::BlockingPage for KvPager { + fn next(&mut self) -> Result>> { + Ok(self.inner_next_page()) + } +} + pub struct KvWriter { kv: Arc, path: String, diff --git a/core/src/raw/adapters/typed_kv/mod.rs b/core/src/raw/adapters/typed_kv/mod.rs index 12a4e599411f..567ecddac61c 100644 --- a/core/src/raw/adapters/typed_kv/mod.rs +++ b/core/src/raw/adapters/typed_kv/mod.rs @@ -21,6 +21,8 @@ mod api; pub use api::Adapter; +pub use api::Capability; +pub use api::Info; pub use api::Value; mod backend; diff --git a/core/src/services/dashmap/backend.rs b/core/src/services/dashmap/backend.rs index dee62c02438f..0411dde70cd5 100644 --- a/core/src/services/dashmap/backend.rs +++ b/core/src/services/dashmap/backend.rs @@ -21,7 +21,7 @@ use std::fmt::Debug; use async_trait::async_trait; use dashmap::DashMap; -use crate::raw::adapters::kv; +use crate::raw::adapters::typed_kv; use crate::*; /// [dashmap](https://github.com/xacrimon/dashmap) backend support. @@ -70,45 +70,45 @@ impl Builder for DashmapBuilder { } /// Backend is used to serve `Accessor` support in dashmap. -pub type DashmapBackend = kv::Backend; +pub type DashmapBackend = typed_kv::Backend; #[derive(Debug, Clone)] pub struct Adapter { - inner: DashMap>, + inner: DashMap, } #[async_trait] -impl kv::Adapter for Adapter { - fn metadata(&self) -> kv::Metadata { - kv::Metadata::new( +impl typed_kv::Adapter for Adapter { + fn info(&self) -> typed_kv::Info { + typed_kv::Info::new( Scheme::Dashmap, &format!("{:?}", &self.inner as *const _), - Capability { - read: true, - write: true, + typed_kv::Capability { + get: true, + set: true, scan: true, - ..Default::default() + delete: true, }, ) } - async fn get(&self, path: &str) -> Result>> { + async fn get(&self, path: &str) -> Result> { self.blocking_get(path) } - fn blocking_get(&self, path: &str) -> Result>> { + fn blocking_get(&self, path: &str) -> Result> { match self.inner.get(path) { None => Ok(None), - Some(bs) => Ok(Some(bs.to_vec())), + Some(bs) => Ok(Some(bs.value().to_owned())), } } - async fn set(&self, path: &str, value: &[u8]) -> Result<()> { + async fn set(&self, path: &str, value: typed_kv::Value) -> Result<()> { self.blocking_set(path, value) } - fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> { - self.inner.insert(path.to_string(), value.to_vec()); + fn blocking_set(&self, path: &str, value: typed_kv::Value) -> Result<()> { + self.inner.insert(path.to_string(), value); Ok(()) } diff --git a/core/src/services/moka/backend.rs b/core/src/services/moka/backend.rs index f2dd7eee0c8c..ee40bc9234c5 100644 --- a/core/src/services/moka/backend.rs +++ b/core/src/services/moka/backend.rs @@ -195,10 +195,16 @@ impl Debug for Adapter { #[async_trait] impl typed_kv::Adapter for Adapter { - fn metadata(&self) -> (Scheme, String) { - ( + fn info(&self) -> typed_kv::Info { + typed_kv::Info::new( Scheme::Moka, - self.inner.name().unwrap_or("moka").to_string(), + self.inner.name().unwrap_or("moka"), + typed_kv::Capability { + get: true, + set: true, + delete: true, + ..Default::default() + }, ) }