From 2bc5a619da18e47b0d92c0ebcf1aa8e4104221b1 Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Sat, 8 Jul 2023 11:40:52 +0800 Subject: [PATCH 1/9] feat(services/persy): add a basic persy service impl Signed-off-by: Chojan Shang --- Cargo.lock | 47 ++++++++ core/Cargo.toml | 3 + core/src/services/mod.rs | 5 + core/src/services/persy/backend.rs | 183 +++++++++++++++++++++++++++++ core/src/services/persy/docs.md | 40 +++++++ core/src/services/persy/mod.rs | 20 ++++ core/src/types/operator/builder.rs | 2 + core/src/types/scheme.rs | 4 + core/tests/behavior/main.rs | 2 + 9 files changed, 306 insertions(+) create mode 100644 core/src/services/persy/backend.rs create mode 100644 core/src/services/persy/docs.md create mode 100644 core/src/services/persy/mod.rs diff --git a/Cargo.lock b/Cargo.lock index fbb9c659a699..8b6500f7194f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -926,6 +926,21 @@ dependencies = [ "libc", ] +[[package]] +name = "crc" +version = "3.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86ec7a15cbe22e59248fc7eadb1907dab5ba09372595da4d73dd805ed4417dfe" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484" + [[package]] name = "crc32fast" version = "1.3.2" @@ -2862,6 +2877,7 @@ dependencies = [ "parking_lot 0.12.1", "paste", "percent-encoding", + "persy", "pin-project", "pretty_assertions", "prometheus", @@ -3340,6 +3356,22 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" +[[package]] +name = "persy" +version = "1.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3712821f12453814409ec149071bd4832a8ec458e648579c104aee30ed70b300" +dependencies = [ + "crc", + "data-encoding", + "fs2", + "linked-hash-map", + "rand 0.8.5", + "thiserror", + "unsigned-varint", + "zigzag", +] + [[package]] name = "pin-project" version = "1.1.2" @@ -5327,6 +5359,12 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1766d682d402817b5ac4490b3c3002d91dfa0d22812f341609f97b08757359c" +[[package]] +name = "unsigned-varint" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d86a8dc7f45e4c1b0d30e43038c38f274e77af056aa5f74b93c2cf9eb3c1c836" + [[package]] name = "untrusted" version = "0.7.1" @@ -5813,3 +5851,12 @@ name = "zeroize" version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9" + +[[package]] +name = "zigzag" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70b40401a28d86ce16a330b863b86fd7dbee4d7c940587ab09ab8c019f9e3fdf" +dependencies = [ + "num-traits", +] diff --git a/core/Cargo.toml b/core/Cargo.toml index f30dd780ba94..01334dc73359 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -50,6 +50,7 @@ default = [ "services-s3", "services-webdav", "services-webhdfs", + "services-persy" ] # Build docs or not. @@ -141,6 +142,7 @@ services-oss = [ "reqsign?/services-aliyun", "reqsign?/reqwest_request", ] +services-persy = ["dep:persy"] services-redb = ["dep:redb"] services-redis = ["dep:redis"] services-rocksdb = ["dep:rocksdb"] @@ -212,6 +214,7 @@ openssh-sftp-client = { version = "0.13.5", optional = true, features = [ opentelemetry = { version = "0.19.0", optional = true } parking_lot = "0.12" percent-encoding = "2" +persy = { version = "1.4.4", optional = true } pin-project = "1" prometheus = { version = "0.13", features = ["process"], optional = true } prost = { version = "0.11", optional = true } diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index 6c84160b8256..b484fcc301f1 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -114,6 +114,11 @@ mod cacache; #[cfg(feature = "services-cacache")] pub use self::cacache::Cacache; +#[cfg(feature = "services-persy")] +mod persy; +#[cfg(feature = "services-persy")] +pub use self::persy::Persy; + #[cfg(feature = "services-redis")] mod redis; #[cfg(feature = "services-redis")] diff --git a/core/src/services/persy/backend.rs b/core/src/services/persy/backend.rs new file mode 100644 index 000000000000..48b3b8d4021a --- /dev/null +++ b/core/src/services/persy/backend.rs @@ -0,0 +1,183 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::str; + +use async_trait::async_trait; +use persy; + +use crate::raw::adapters::kv; +use crate::Builder; +use crate::Error; +use crate::ErrorKind; +use crate::Scheme; +use crate::*; + +/// persy service support. +#[doc = include_str!("docs.md")] +#[derive(Default)] +pub struct PersyBuilder { + /// That path to the persy data file. + datafile: Option, +} + +impl PersyBuilder { + /// Set the path to the persy data directory. Will create if not exists. + pub fn datafile(&mut self, path: &str) -> &mut Self { + self.datafile = Some(path.into()); + self + } +} + +impl Builder for PersyBuilder { + const SCHEME: Scheme = Scheme::Persy; + type Accessor = PersyBackend; + + fn from_map(map: HashMap) -> Self { + let mut builder = PersyBuilder::default(); + + map.get("datafile").map(|v| builder.datafile(v)); + + builder + } + + fn build(&mut self) -> Result { + let datafile_path = self.datafile.take().ok_or_else(|| { + Error::new(ErrorKind::ConfigInvalid, "datafile is required but not set") + .with_context("service", Scheme::Persy) + })?; + + let persy = persy::Persy::open(&datafile_path, persy::Config::new()).map_err(|e| { + Error::new(ErrorKind::ConfigInvalid, "open db") + .with_context("service", Scheme::Persy) + .with_context("datafile", datafile_path.clone()) + .set_source(e) + })?; + + Ok(PersyBackend::new(Adapter { + datafile: datafile_path, + persy, + })) + } +} + +/// Backend for persy services. +pub type PersyBackend = kv::Backend; + +#[derive(Clone)] +pub struct Adapter { + datafile: String, + persy: persy::Persy, +} + +impl Debug for Adapter { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut ds = f.debug_struct("Adapter"); + ds.field("path", &self.datafile); + ds.finish() + } +} + +#[async_trait] +impl kv::Adapter for Adapter { + fn metadata(&self) -> kv::Metadata { + kv::Metadata::new( + Scheme::Persy, + &self.datafile, + Capability { + read: true, + write: true, + delete: true, + blocking: true, + ..Default::default() + }, + ) + } + + async fn get(&self, path: &str) -> Result>> { + self.blocking_get(path) + } + + fn blocking_get(&self, path: &str) -> Result>> { + match self.persy.scan(path) { + Ok(bs) => { + let mut value = vec![]; + for (_id, content) in bs { + value = content; + } + Ok(Some(value)) + } + Err(_) => Ok(None), + } + } + + async fn set(&self, path: &str, value: &[u8]) -> Result<()> { + self.blocking_set(path, value) + } + + fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> { + let mut tx = self + .persy + .begin() + .map_err(|e| parse_error(e.persy_error()))?; + tx.insert(path, value) + .map_err(|e| parse_error(e.persy_error()))?; + let prepared = tx.prepare().map_err(|e| parse_error(e.persy_error()))?; + prepared + .commit() + .map_err(|e| parse_error(e.persy_error()))?; + + Ok(()) + } + + async fn delete(&self, path: &str) -> Result<()> { + self.blocking_delete(path) + } + + fn blocking_delete(&self, path: &str) -> Result<()> { + let mut tx = self + .persy + .begin() + .map_err(|e| parse_error(e.persy_error()))?; + for (id, _content) in self + .persy + .scan(path) + .map_err(|e| parse_error(e.persy_error()))? + { + tx.delete(path, &id) + .map_err(|e| parse_error(e.persy_error()))?; + } + let prepared = tx.prepare().map_err(|e| parse_error(e.persy_error()))?; + prepared + .commit() + .map_err(|e| parse_error(e.persy_error()))?; + + Ok(()) + } +} + +fn parse_error(err: persy::PersyError) -> Error { + let kind = match err { + persy::PersyError::RecordNotFound(_) => ErrorKind::NotFound, + _ => ErrorKind::Unexpected, + }; + + Error::new(kind, "error from persy").set_source(err) +} diff --git a/core/src/services/persy/docs.md b/core/src/services/persy/docs.md new file mode 100644 index 000000000000..b8d3e4df1723 --- /dev/null +++ b/core/src/services/persy/docs.md @@ -0,0 +1,40 @@ +## Capabilities + +This service can be used to: + +- [x] stat +- [x] read +- [x] write +- [x] create_dir +- [x] delete +- [ ] copy +- [ ] rename +- [ ] list +- [ ] ~~scan~~ +- [ ] ~~presign~~ +- [x] blocking + +## Configuration + +- `datadir`: Set the path to the cacache data directory + +You can refer to [`CacacheBuilder`]'s docs for more information + +## Example + +### Via Builder + +```rust +use anyhow::Result; +use opendal::services::Cacache; +use opendal::Operator; + +#[tokio::main] +async fn main() -> Result<()> { + let mut builder = Cacache::default(); + builder.datadir("/tmp/opendal/cacache"); + + let op: Operator = Operator::new(builder)?.finish(); + Ok(()) +} +``` diff --git a/core/src/services/persy/mod.rs b/core/src/services/persy/mod.rs new file mode 100644 index 000000000000..b192b6635c13 --- /dev/null +++ b/core/src/services/persy/mod.rs @@ -0,0 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod backend; + +pub use backend::PersyBuilder as Persy; diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs index 370bfeeca6c7..3f78d3cedf7f 100644 --- a/core/src/types/operator/builder.rs +++ b/core/src/types/operator/builder.rs @@ -195,6 +195,8 @@ impl Operator { Scheme::Gdrive => Self::from_map::(map)?.finish(), #[cfg(feature = "services-oss")] Scheme::Oss => Self::from_map::(map)?.finish(), + #[cfg(feature = "services-persy")] + Scheme::Persy => Self::from_map::(map)?.finish(), #[cfg(feature = "services-redis")] Scheme::Redis => Self::from_map::(map)?.finish(), #[cfg(feature = "services-rocksdb")] diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index 8a0facbe7e80..475732e7b7c9 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -76,6 +76,8 @@ pub enum Scheme { Dropbox, /// [oss][crate::services::Oss]: Aliyun Object Storage Services Oss, + /// [persy][crate::services::Persy]: persy backend support. + Persy, /// [redis][crate::services::Redis]: Redis services Redis, /// [rocksdb][crate::services::Rocksdb]: RocksDB services @@ -150,6 +152,7 @@ impl FromStr for Scheme { "mini_moka" => Ok(Scheme::MiniMoka), "moka" => Ok(Scheme::Moka), "obs" => Ok(Scheme::Obs), + "persy" => Ok(Scheme::Persy), "redis" => Ok(Scheme::Redis), "rocksdb" => Ok(Scheme::Rocksdb), "s3" => Ok(Scheme::S3), @@ -187,6 +190,7 @@ impl From for &'static str { Scheme::Moka => "moka", Scheme::Obs => "obs", Scheme::Onedrive => "onedrive", + Scheme::Persy => "persy", Scheme::Gdrive => "gdrive", Scheme::Dropbox => "dropbox", Scheme::Redis => "redis", diff --git a/core/tests/behavior/main.rs b/core/tests/behavior/main.rs index 2ed195085031..150955480cf1 100644 --- a/core/tests/behavior/main.rs +++ b/core/tests/behavior/main.rs @@ -138,6 +138,8 @@ fn main() -> anyhow::Result<()> { tests.extend(behavior_test::()); #[cfg(feature = "services-oss")] tests.extend(behavior_test::()); + #[cfg(feature = "services-persy")] + tests.extend(behavior_test::()); #[cfg(feature = "services-redis")] tests.extend(behavior_test::()); #[cfg(feature = "services-rocksdb")] From 7e181fc5dacff7413024898e0b05507367e5f005 Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Sun, 16 Jul 2023 12:34:26 +0800 Subject: [PATCH 2/9] feat(services/persy): add index to record Signed-off-by: Chojan Shang --- .env.example | 5 ++ core/src/services/persy/backend.rs | 117 ++++++++++++++++++++--------- 2 files changed, 88 insertions(+), 34 deletions(-) diff --git a/.env.example b/.env.example index e3c54bc69627..d763a25494e1 100644 --- a/.env.example +++ b/.env.example @@ -123,6 +123,11 @@ OPENDAL_REDB_TABLE=redb-table # cacache OPENDAL_CACACHE_TEST=false OPENDAL_CACACHE_DATADIR=/tmp/opendal/cacache/ +# persy +OPENDAL_PERSY_TEST=false +OPENDAL_PERSY_DATAFILE=/tmp/opendal/test.persy +OPENDAL_PERSY_SEGMENT=data +OPENDAL_PERSY_INDEX=index #dropbox OPENDAL_DROPBOX_TEST=false OPENDAL_DROPBOX_ROOT=/tmp/opendal/ diff --git a/core/src/services/persy/backend.rs b/core/src/services/persy/backend.rs index 48b3b8d4021a..658f627cbe00 100644 --- a/core/src/services/persy/backend.rs +++ b/core/src/services/persy/backend.rs @@ -36,6 +36,10 @@ use crate::*; pub struct PersyBuilder { /// That path to the persy data file. datafile: Option, + /// That name of the persy segment. + segment: Option, + /// That name of the persy index. + index: Option, } impl PersyBuilder { @@ -44,6 +48,18 @@ impl PersyBuilder { self.datafile = Some(path.into()); self } + + /// Set the name of the persy segment. Will create if not exists. + pub fn segment(&mut self, path: &str) -> &mut Self { + self.segment = Some(path.into()); + self + } + + /// Set the name of the persy index. Will create if not exists. + pub fn index(&mut self, path: &str) -> &mut Self { + self.index = Some(path.into()); + self + } } impl Builder for PersyBuilder { @@ -54,6 +70,8 @@ impl Builder for PersyBuilder { let mut builder = PersyBuilder::default(); map.get("datafile").map(|v| builder.datafile(v)); + map.get("segment").map(|v| builder.segment(v)); + map.get("index").map(|v| builder.index(v)); builder } @@ -64,6 +82,27 @@ impl Builder for PersyBuilder { .with_context("service", Scheme::Persy) })?; + let segment_name = self.segment.take().ok_or_else(|| { + Error::new(ErrorKind::ConfigInvalid, "segment is required but not set") + .with_context("service", Scheme::Persy) + })?; + + let index_name = self.index.take().ok_or_else(|| { + Error::new(ErrorKind::ConfigInvalid, "index is required but not set") + .with_context("service", Scheme::Persy) + })?; + + match persy::Persy::create(&datafile_path) { + Ok(o) => o, + Err(e) if e.to_string() == "Cannot create a new file already exists" => (), + Err(e) => { + return Err(Error::new(ErrorKind::ConfigInvalid, "create db") + .with_context("service", Scheme::Persy) + .with_context("datafile", datafile_path.clone()) + .set_source(e)) + } + }; + let persy = persy::Persy::open(&datafile_path, persy::Config::new()).map_err(|e| { Error::new(ErrorKind::ConfigInvalid, "open db") .with_context("service", Scheme::Persy) @@ -71,8 +110,17 @@ impl Builder for PersyBuilder { .set_source(e) })?; + let mut tx = persy.begin().map_err(parse_error)?; + tx.create_segment(&segment_name).map_err(parse_error)?; + tx.create_index::(&index_name, persy::ValueMode::Replace) + .map_err(parse_error)?; + let prepared = tx.prepare().map_err(parse_error)?; + prepared.commit().map_err(parse_error)?; + Ok(PersyBackend::new(Adapter { datafile: datafile_path, + segment: segment_name, + index: index_name, persy, })) } @@ -84,6 +132,8 @@ pub type PersyBackend = kv::Backend; #[derive(Clone)] pub struct Adapter { datafile: String, + segment: String, + index: String, persy: persy::Persy, } @@ -91,6 +141,8 @@ impl Debug for Adapter { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let mut ds = f.debug_struct("Adapter"); ds.field("path", &self.datafile); + ds.field("segment", &self.segment); + ds.field("index", &self.index); ds.finish() } } @@ -116,16 +168,16 @@ impl kv::Adapter for Adapter { } fn blocking_get(&self, path: &str) -> Result>> { - match self.persy.scan(path) { - Ok(bs) => { - let mut value = vec![]; - for (_id, content) in bs { - value = content; - } - Ok(Some(value)) - } - Err(_) => Ok(None), + let mut read_id = self + .persy + .get::(&self.index, &path.to_string()) + .map_err(parse_error)?; + if let Some(id) = read_id.next() { + let value = self.persy.read(&self.segment, &id).map_err(parse_error)?; + return Ok(value); } + + Ok(None) } async fn set(&self, path: &str, value: &[u8]) -> Result<()> { @@ -133,16 +185,13 @@ impl kv::Adapter for Adapter { } fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> { - let mut tx = self - .persy - .begin() - .map_err(|e| parse_error(e.persy_error()))?; - tx.insert(path, value) - .map_err(|e| parse_error(e.persy_error()))?; - let prepared = tx.prepare().map_err(|e| parse_error(e.persy_error()))?; - prepared - .commit() - .map_err(|e| parse_error(e.persy_error()))?; + let mut tx = self.persy.begin().map_err(parse_error)?; + let id = tx.insert(&self.segment, value).map_err(parse_error)?; + + tx.put::(&self.index, path.to_string(), id) + .map_err(parse_error)?; + let prepared = tx.prepare().map_err(parse_error)?; + prepared.commit().map_err(parse_error)?; Ok(()) } @@ -152,28 +201,28 @@ impl kv::Adapter for Adapter { } fn blocking_delete(&self, path: &str) -> Result<()> { - let mut tx = self - .persy - .begin() - .map_err(|e| parse_error(e.persy_error()))?; - for (id, _content) in self + let mut delete_id = self .persy - .scan(path) - .map_err(|e| parse_error(e.persy_error()))? - { - tx.delete(path, &id) - .map_err(|e| parse_error(e.persy_error()))?; + .get::(&self.index, &path.to_string()) + .map_err(parse_error)?; + if let Some(id) = delete_id.next() { + //Begin a transaction + let mut tx = self.persy.begin().map_err(parse_error)?; + // delete the record + tx.delete(&self.segment, &id).map_err(parse_error)?; + // remove the index + tx.remove::(&self.index, path.to_string(), Some(id)).map_err(parse_error)?; + //Commit the tx. + let prepared = tx.prepare().map_err(parse_error)?; + prepared.commit().map_err(parse_error)?; } - let prepared = tx.prepare().map_err(|e| parse_error(e.persy_error()))?; - prepared - .commit() - .map_err(|e| parse_error(e.persy_error()))?; Ok(()) } } -fn parse_error(err: persy::PersyError) -> Error { +fn parse_error>(err: persy::PE) -> Error { + let err: persy::PersyError = err.persy_error(); let kind = match err { persy::PersyError::RecordNotFound(_) => ErrorKind::NotFound, _ => ErrorKind::Unexpected, From d6b09aa0bb62aff89d92dd5ecaa85af01147c780 Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Sun, 16 Jul 2023 13:25:17 +0800 Subject: [PATCH 3/9] docs(services/persy): fix example Signed-off-by: Chojan Shang --- core/src/services/persy/docs.md | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/core/src/services/persy/docs.md b/core/src/services/persy/docs.md index b8d3e4df1723..d6c2cd4c3308 100644 --- a/core/src/services/persy/docs.md +++ b/core/src/services/persy/docs.md @@ -16,9 +16,11 @@ This service can be used to: ## Configuration -- `datadir`: Set the path to the cacache data directory +- `datafile`: Set the path to the persy data file. The directory in the path must already exist. +- `segment`: Set the name of the persy segment. +- `index`: Set the name of the persy index. -You can refer to [`CacacheBuilder`]'s docs for more information +You can refer to [`PersyBuilder`]'s docs for more information ## Example @@ -26,13 +28,15 @@ You can refer to [`CacacheBuilder`]'s docs for more information ```rust use anyhow::Result; -use opendal::services::Cacache; +use opendal::services::Persy; use opendal::Operator; #[tokio::main] async fn main() -> Result<()> { - let mut builder = Cacache::default(); - builder.datadir("/tmp/opendal/cacache"); + let mut builder = Persy::default(); + builder.datafile("./test.persy"); + builder.segment("data"); + builder.index("index"); let op: Operator = Operator::new(builder)?.finish(); Ok(()) From cd6477a3fad0a41365f6dcdfb0d38b6e3110ca77 Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Sun, 16 Jul 2023 13:26:01 +0800 Subject: [PATCH 4/9] refactor(services/persy): handle init logic Signed-off-by: Chojan Shang --- core/src/services/persy/backend.rs | 57 +++++++++++++++++------------- 1 file changed, 33 insertions(+), 24 deletions(-) diff --git a/core/src/services/persy/backend.rs b/core/src/services/persy/backend.rs index 658f627cbe00..6795f928ba83 100644 --- a/core/src/services/persy/backend.rs +++ b/core/src/services/persy/backend.rs @@ -34,7 +34,7 @@ use crate::*; #[doc = include_str!("docs.md")] #[derive(Default)] pub struct PersyBuilder { - /// That path to the persy data file. + /// That path to the persy data file. The directory in the path must already exist. datafile: Option, /// That name of the persy segment. segment: Option, @@ -87,40 +87,48 @@ impl Builder for PersyBuilder { .with_context("service", Scheme::Persy) })?; - let index_name = self.index.take().ok_or_else(|| { + let segment = segment_name.clone(); + + let index_name = self.index.take().ok_or_else(|| { Error::new(ErrorKind::ConfigInvalid, "index is required but not set") .with_context("service", Scheme::Persy) })?; - match persy::Persy::create(&datafile_path) { - Ok(o) => o, - Err(e) if e.to_string() == "Cannot create a new file already exists" => (), - Err(e) => { - return Err(Error::new(ErrorKind::ConfigInvalid, "create db") + let index = index_name.clone(); + + let persy = persy::OpenOptions::new() + .create(true) + .prepare_with(move |p| init(p, &segment_name, &index_name)) + .open(&datafile_path) + .map_err(|e| { + Error::new(ErrorKind::ConfigInvalid, "open db") .with_context("service", Scheme::Persy) .with_context("datafile", datafile_path.clone()) - .set_source(e)) + .set_source(e) + })?; + + // This function will only be called on database creation + fn init(persy: &persy::Persy, segment_name: &str, index_name: &str) -> std::result::Result<(), Box> { + let mut tx = persy.begin()?; + + if !tx.exists_segment(segment_name)? { + tx.create_segment(segment_name)?; + } + if !tx.exists_index(index_name)? { + tx.create_index::(index_name, persy::ValueMode::Replace)?; } - }; - let persy = persy::Persy::open(&datafile_path, persy::Config::new()).map_err(|e| { - Error::new(ErrorKind::ConfigInvalid, "open db") - .with_context("service", Scheme::Persy) - .with_context("datafile", datafile_path.clone()) - .set_source(e) - })?; + let prepared = tx.prepare()?; + prepared.commit()?; - let mut tx = persy.begin().map_err(parse_error)?; - tx.create_segment(&segment_name).map_err(parse_error)?; - tx.create_index::(&index_name, persy::ValueMode::Replace) - .map_err(parse_error)?; - let prepared = tx.prepare().map_err(parse_error)?; - prepared.commit().map_err(parse_error)?; + println!("Segment and Index successfully created"); + Ok(()) + } Ok(PersyBackend::new(Adapter { datafile: datafile_path, - segment: segment_name, - index: index_name, + segment, + index, persy, })) } @@ -211,7 +219,8 @@ impl kv::Adapter for Adapter { // delete the record tx.delete(&self.segment, &id).map_err(parse_error)?; // remove the index - tx.remove::(&self.index, path.to_string(), Some(id)).map_err(parse_error)?; + tx.remove::(&self.index, path.to_string(), Some(id)) + .map_err(parse_error)?; //Commit the tx. let prepared = tx.prepare().map_err(parse_error)?; prepared.commit().map_err(parse_error)?; From c93f5e375dc1f6367d3c2ee6e828c976a2f64667 Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Sun, 16 Jul 2023 13:38:21 +0800 Subject: [PATCH 5/9] docs(services/persy): add persy item Signed-off-by: Chojan Shang --- README.md | 1 + core/README.md | 1 + website/src/components/HomepageFeatures/_feature_services.mdx | 1 + 3 files changed, 3 insertions(+) diff --git a/README.md b/README.md index c3cfd14df40d..b8c1ff48f60b 100644 --- a/README.md +++ b/README.md @@ -76,6 +76,7 @@ Major components of the project include: - cacache: [cacache](https://crates.io/crates/cacache) backend - dashmap: [dashmap](https://github.com/xacrimon/dashmap) backend - memory: In memory backend +- persy: [persy](https://crates.io/crates/persy) backend - redis: [Redis](https://redis.io/) services - rocksdb: [RocksDB](http://rocksdb.org/) services - sled: [sled](https://crates.io/crates/sled) backend diff --git a/core/README.md b/core/README.md index 6d133385e453..a7e003cc66cb 100644 --- a/core/README.md +++ b/core/README.md @@ -67,6 +67,7 @@ - cacache: [cacache](https://crates.io/crates/cacache) backend - dashmap: [dashmap](https://github.com/xacrimon/dashmap) backend - memory: In memory backend +- persy: [persy](https://crates.io/crates/persy) backend - redis: [Redis](https://redis.io/) services - rocksdb: [RocksDB](http://rocksdb.org/) services - sled: [sled](https://crates.io/crates/sled) backend diff --git a/website/src/components/HomepageFeatures/_feature_services.mdx b/website/src/components/HomepageFeatures/_feature_services.mdx index 074311ba2587..f622ede7feb0 100644 --- a/website/src/components/HomepageFeatures/_feature_services.mdx +++ b/website/src/components/HomepageFeatures/_feature_services.mdx @@ -49,6 +49,7 @@ Apache OpenDAL provides native support for all kinds for storage systems. - cacache: [cacache](https://crates.io/crates/cacache) backend - dashmap: [dashmap](https://github.com/xacrimon/dashmap) backend - memory: In memory backend +- persy: [persy](https://crates.io/crates/persy) backend - redis: [Redis](https://redis.io/) services - rocksdb: [RocksDB](http://rocksdb.org/) services - sled: [sled](https://crates.io/crates/sled) backend From f5cdcddb3003010298e5c7d585284c2839beb651 Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Sun, 16 Jul 2023 13:38:48 +0800 Subject: [PATCH 6/9] chore(services/persy): add persy ci Signed-off-by: Chojan Shang --- .github/workflows/service_test_persy.yml | 56 ++++++++++++++++++++++++ core/Cargo.toml | 1 - 2 files changed, 56 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/service_test_persy.yml diff --git a/.github/workflows/service_test_persy.yml b/.github/workflows/service_test_persy.yml new file mode 100644 index 000000000000..3ae467f3f41d --- /dev/null +++ b/.github/workflows/service_test_persy.yml @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: Service Test Persy + +on: + push: + branches: + - main + pull_request: + branches: + - main + paths: + - "core/src/**" + - "core/tests/**" + - "!core/src/docs/**" + - "!core/src/services/**" + - "core/src/services/persy/**" + - ".github/workflows/service_test_persy.yml" + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }} + cancel-in-progress: true + +jobs: + persy: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Setup Rust toolchain + uses: ./.github/actions/setup + - name: Test persy + shell: bash + working-directory: core + run: cargo test persy --features services-persy -j=1 + env: + RUST_BACKTRACE: full + RUST_LOG: debug + OPENDAL_PERSY_TEST: on + OPENDAL_PERSY_DATAFILE: ./test.persy + OPENDAL_PERSY_SEGMENT: data + OPENDAL_PERSY_INDEX: index diff --git a/core/Cargo.toml b/core/Cargo.toml index 01334dc73359..ee6f2995efb0 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -50,7 +50,6 @@ default = [ "services-s3", "services-webdav", "services-webhdfs", - "services-persy" ] # Build docs or not. From 0c014162b9d3adf5fb181defbbf1ebb3d3d9246f Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Sun, 16 Jul 2023 13:47:50 +0800 Subject: [PATCH 7/9] chore: cargo fmt Signed-off-by: Chojan Shang --- core/src/services/persy/backend.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/core/src/services/persy/backend.rs b/core/src/services/persy/backend.rs index 6795f928ba83..062d68573d9a 100644 --- a/core/src/services/persy/backend.rs +++ b/core/src/services/persy/backend.rs @@ -89,7 +89,7 @@ impl Builder for PersyBuilder { let segment = segment_name.clone(); - let index_name = self.index.take().ok_or_else(|| { + let index_name = self.index.take().ok_or_else(|| { Error::new(ErrorKind::ConfigInvalid, "index is required but not set") .with_context("service", Scheme::Persy) })?; @@ -108,7 +108,11 @@ impl Builder for PersyBuilder { })?; // This function will only be called on database creation - fn init(persy: &persy::Persy, segment_name: &str, index_name: &str) -> std::result::Result<(), Box> { + fn init( + persy: &persy::Persy, + segment_name: &str, + index_name: &str, + ) -> std::result::Result<(), Box> { let mut tx = persy.begin()?; if !tx.exists_segment(segment_name)? { From c4dfb4b02fabee7867f334aca46acfd4305c3899 Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Sun, 16 Jul 2023 17:01:24 +0800 Subject: [PATCH 8/9] chore(ci): remove j=1 Signed-off-by: Chojan Shang --- .github/workflows/service_test_persy.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/service_test_persy.yml b/.github/workflows/service_test_persy.yml index 3ae467f3f41d..66457835f31f 100644 --- a/.github/workflows/service_test_persy.yml +++ b/.github/workflows/service_test_persy.yml @@ -46,7 +46,7 @@ jobs: - name: Test persy shell: bash working-directory: core - run: cargo test persy --features services-persy -j=1 + run: cargo test persy --features services-persy env: RUST_BACKTRACE: full RUST_LOG: debug From 9246e8f28a9b2fed4740aa7bb4a472a1b1988bed Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Sun, 16 Jul 2023 23:37:48 +0800 Subject: [PATCH 9/9] chore: fix comments style Signed-off-by: Chojan Shang --- core/src/services/persy/backend.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/services/persy/backend.rs b/core/src/services/persy/backend.rs index 062d68573d9a..0c92a92ffbb4 100644 --- a/core/src/services/persy/backend.rs +++ b/core/src/services/persy/backend.rs @@ -125,7 +125,6 @@ impl Builder for PersyBuilder { let prepared = tx.prepare()?; prepared.commit()?; - println!("Segment and Index successfully created"); Ok(()) } @@ -218,14 +217,14 @@ impl kv::Adapter for Adapter { .get::(&self.index, &path.to_string()) .map_err(parse_error)?; if let Some(id) = delete_id.next() { - //Begin a transaction + // Begin a transaction. let mut tx = self.persy.begin().map_err(parse_error)?; - // delete the record + // Delete the record. tx.delete(&self.segment, &id).map_err(parse_error)?; - // remove the index + // Remove the index. tx.remove::(&self.index, path.to_string(), Some(id)) .map_err(parse_error)?; - //Commit the tx. + // Commit the tx. let prepared = tx.prepare().map_err(parse_error)?; prepared.commit().map_err(parse_error)?; }