Skip to content

Commit

Permalink
feat: migrate service dashmap (#2225)
Browse files Browse the repository at this point in the history
* feat(typed_kv): add scan support for Typed KV Adapter

Signed-off-by: Chojan Shang <[email protected]>

* feat(services/dashmap): migrate service dashmap

Signed-off-by: Chojan Shang <[email protected]>

* fix(services/dashmap): return inner

Signed-off-by: Chojan Shang <[email protected]>

* feat(services/moka): try to enable scan

Signed-off-by: Chojan Shang <[email protected]>

* refactor(typed_kv): scan is not available for all typed kv services

Signed-off-by: Chojan Shang <[email protected]>

* feat(typed_kv): also add KvPager

Signed-off-by: Chojan Shang <[email protected]>

* feat(typed_kv): add typed_kv::Info to carry capability check back

Signed-off-by: Chojan Shang <[email protected]>

* feat(typed_kv): add typed_kv::Capability

Signed-off-by: Chojan Shang <[email protected]>

---------

Signed-off-by: Chojan Shang <[email protected]>
  • Loading branch information
PsiACE authored May 8, 2023
1 parent e4d453b commit 18c9e03
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 42 deletions.
100 changes: 98 additions & 2 deletions core/src/raw/adapters/typed_kv/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ use async_trait::async_trait;
use bytes::Bytes;
use chrono::Utc;

use crate::*;
use crate::EntryMode;
use crate::Error;
use crate::ErrorKind;
use crate::Metadata;
use crate::Result;
use crate::Scheme;

/// Adapter is the typed adapter to underlying kv services.
///
Expand All @@ -39,7 +44,7 @@ use crate::*;
#[async_trait]
pub trait Adapter: Send + Sync + Debug + Unpin + 'static {
/// Get the scheme and name of current adapter.
fn metadata(&self) -> (Scheme, String);
fn info(&self) -> Info;

/// Get a value from adapter.
async fn get(&self, path: &str) -> Result<Option<Value>>;
Expand All @@ -58,6 +63,29 @@ pub trait Adapter: Send + Sync + Debug + Unpin + 'static {

/// Delete a value from adapter.
fn blocking_delete(&self, path: &str) -> Result<()>;

/// Scan a key prefix to get all keys that start with this key.
async fn scan(&self, path: &str) -> Result<Vec<String>> {
let _ = path;

Err(Error::new(
ErrorKind::Unsupported,
"typed_kv adapter doesn't support this operation",
)
.with_operation("typed_kv::Adapter::scan"))
}

/// Scan a key prefix to get all keys that start with this key
/// in blocking way.
fn blocking_scan(&self, path: &str) -> Result<Vec<String>> {
let _ = path;

Err(Error::new(
ErrorKind::Unsupported,
"typed_kv adapter doesn't support this operation",
)
.with_operation("typed_kv::Adapter::blocking_scan"))
}
}

/// Value is the typed value stored in adapter.
Expand Down Expand Up @@ -87,3 +115,71 @@ impl Value {
size_of::<Metadata>() + self.value.len()
}
}

/// Capability is used to describe what operations are supported
/// by Typed KV Operator.
#[derive(Copy, Clone, Default)]
pub struct Capability {
/// If typed_kv operator supports get natively, it will be true.
pub get: bool,
/// If typed_kv operator supports set natively, it will be true.
pub set: bool,
/// If typed_kv operator supports delete natively, it will be true.
pub delete: bool,
/// If typed_kv operator supports scan natively, it will be true.
pub scan: bool,
}

impl Debug for Capability {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut s = vec![];

if self.get {
s.push("Get")
}
if self.set {
s.push("Set");
}
if self.delete {
s.push("Delete");
}
if self.scan {
s.push("Scan");
}

write!(f, "{{ {} }}", s.join(" | "))
}
}

/// Info for this key value accessor.
pub struct Info {
scheme: Scheme,
name: String,
capabilities: Capability,
}

impl Info {
/// Create a new KeyValueAccessorInfo.
pub fn new(scheme: Scheme, name: &str, capabilities: Capability) -> Self {
Self {
scheme,
name: name.to_string(),
capabilities,
}
}

/// Get the scheme.
pub fn scheme(&self) -> Scheme {
self.scheme
}

/// Get the name.
pub fn name(&self) -> &str {
&self.name
}

/// Get the capabilities.
pub fn capabilities(&self) -> Capability {
self.capabilities
}
}
110 changes: 89 additions & 21 deletions core/src/raw/adapters/typed_kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,31 +59,37 @@ impl<S: Adapter> Accessor for Backend<S> {
type BlockingReader = oio::Cursor;
type Writer = KvWriter<S>;
type BlockingWriter = KvWriter<S>;
type Pager = ();
type BlockingPager = ();
type Pager = KvPager;
type BlockingPager = KvPager;

fn info(&self) -> AccessorInfo {
let (scheme, name) = self.kv.metadata();

let mut am = AccessorInfo::default();
am.set_scheme(scheme);
am.set_name(&name);
let kv_info = self.kv.info();
let mut am: AccessorInfo = AccessorInfo::default();
am.set_root(&self.root);

am.set_scheme(kv_info.scheme());
am.set_name(kv_info.name());
let kv_cap = kv_info.capabilities();
let cap = am.capability_mut();
cap.read = true;
cap.read_can_seek = true;
cap.read_can_next = true;
cap.read_with_range = true;
cap.stat = true;

cap.write = true;
cap.write_with_cache_control = true;
cap.write_with_content_disposition = true;
cap.write_with_content_type = true;
cap.write_without_content_length = true;
cap.create_dir = true;
cap.delete = true;
if kv_cap.get {
cap.read = true;
cap.read_can_seek = true;
cap.read_can_next = true;
cap.read_with_range = true;
cap.stat = true;
}

if kv_cap.set {
cap.write = true;
cap.create_dir = true;
}

if kv_cap.delete {
cap.delete = true;
}

if kv_cap.scan {
cap.scan = true;
}

am
}
Expand Down Expand Up @@ -182,6 +188,22 @@ impl<S: Adapter> Accessor for Backend<S> {
self.kv.blocking_delete(&p)?;
Ok(RpDelete::default())
}

async fn scan(&self, path: &str, _: OpScan) -> Result<(RpScan, Self::Pager)> {
let p = build_abs_path(&self.root, path);
let res = self.kv.scan(&p).await?;
let pager = KvPager::new(&self.root, res);

Ok((RpScan::default(), pager))
}

fn blocking_scan(&self, path: &str, _: OpScan) -> Result<(RpScan, Self::BlockingPager)> {
let p = build_abs_path(&self.root, path);
let res = self.kv.blocking_scan(&p)?;
let pager = KvPager::new(&self.root, res);

Ok((RpScan::default(), pager))
}
}

impl<S> Backend<S>
Expand All @@ -204,6 +226,52 @@ where
}
}

pub struct KvPager {
root: String,
inner: Option<Vec<String>>,
}

impl KvPager {
fn new(root: &str, inner: Vec<String>) -> Self {
Self {
root: root.to_string(),
inner: Some(inner),
}
}

fn inner_next_page(&mut self) -> Option<Vec<oio::Entry>> {
let res = self
.inner
.take()?
.into_iter()
.map(|v| {
let mode = if v.ends_with('/') {
EntryMode::DIR
} else {
EntryMode::FILE
};

oio::Entry::new(&build_rel_path(&self.root, &v), Metadata::new(mode))
})
.collect();

Some(res)
}
}

#[async_trait]
impl oio::Page for KvPager {
async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
Ok(self.inner_next_page())
}
}

impl oio::BlockingPage for KvPager {
fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
Ok(self.inner_next_page())
}
}

pub struct KvWriter<S> {
kv: Arc<S>,
path: String,
Expand Down
2 changes: 2 additions & 0 deletions core/src/raw/adapters/typed_kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
mod api;
pub use api::Adapter;
pub use api::Capability;
pub use api::Info;
pub use api::Value;

mod backend;
Expand Down
32 changes: 16 additions & 16 deletions core/src/services/dashmap/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::fmt::Debug;
use async_trait::async_trait;
use dashmap::DashMap;

use crate::raw::adapters::kv;
use crate::raw::adapters::typed_kv;
use crate::*;

/// [dashmap](https://github.com/xacrimon/dashmap) backend support.
Expand Down Expand Up @@ -70,45 +70,45 @@ impl Builder for DashmapBuilder {
}

/// Backend is used to serve `Accessor` support in dashmap.
pub type DashmapBackend = kv::Backend<Adapter>;
pub type DashmapBackend = typed_kv::Backend<Adapter>;

#[derive(Debug, Clone)]
pub struct Adapter {
inner: DashMap<String, Vec<u8>>,
inner: DashMap<String, typed_kv::Value>,
}

#[async_trait]
impl kv::Adapter for Adapter {
fn metadata(&self) -> kv::Metadata {
kv::Metadata::new(
impl typed_kv::Adapter for Adapter {
fn info(&self) -> typed_kv::Info {
typed_kv::Info::new(
Scheme::Dashmap,
&format!("{:?}", &self.inner as *const _),
Capability {
read: true,
write: true,
typed_kv::Capability {
get: true,
set: true,
scan: true,
..Default::default()
delete: true,
},
)
}

async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> {
async fn get(&self, path: &str) -> Result<Option<typed_kv::Value>> {
self.blocking_get(path)
}

fn blocking_get(&self, path: &str) -> Result<Option<Vec<u8>>> {
fn blocking_get(&self, path: &str) -> Result<Option<typed_kv::Value>> {
match self.inner.get(path) {
None => Ok(None),
Some(bs) => Ok(Some(bs.to_vec())),
Some(bs) => Ok(Some(bs.value().to_owned())),
}
}

async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
async fn set(&self, path: &str, value: typed_kv::Value) -> Result<()> {
self.blocking_set(path, value)
}

fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> {
self.inner.insert(path.to_string(), value.to_vec());
fn blocking_set(&self, path: &str, value: typed_kv::Value) -> Result<()> {
self.inner.insert(path.to_string(), value);

Ok(())
}
Expand Down
12 changes: 9 additions & 3 deletions core/src/services/moka/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,16 @@ impl Debug for Adapter {

#[async_trait]
impl typed_kv::Adapter for Adapter {
fn metadata(&self) -> (Scheme, String) {
(
fn info(&self) -> typed_kv::Info {
typed_kv::Info::new(
Scheme::Moka,
self.inner.name().unwrap_or("moka").to_string(),
self.inner.name().unwrap_or("moka"),
typed_kv::Capability {
get: true,
set: true,
delete: true,
..Default::default()
},
)
}

Expand Down

0 comments on commit 18c9e03

Please sign in to comment.