From f0defd18fc99a5643ecb9cbd600efd3bb6e35d19 Mon Sep 17 00:00:00 2001 From: owl Date: Thu, 29 Jun 2023 01:46:48 +0800 Subject: [PATCH 01/12] feat(services/tikv): introduce new service backend tikv Signed-off-by: owl --- core/Cargo.toml | 2 + core/src/services/mod.rs | 2 + core/src/services/tikv/backend.rs | 298 ++++++++++++++++++++++++++++++ core/src/services/tikv/mod.rs | 16 ++ 4 files changed, 318 insertions(+) create mode 100644 core/src/services/tikv/backend.rs create mode 100644 core/src/services/tikv/mod.rs diff --git a/core/Cargo.toml b/core/Cargo.toml index 33f2d742701e..08ba3a91fcb4 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -156,6 +156,7 @@ services-s3 = [ services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:dirs"] services-sled = ["dep:sled"] services-supabase = [] +services-tikv = ["tikv-client"] services-vercel-artifacts = [] services-wasabi = [ "dep:reqsign", @@ -241,6 +242,7 @@ suppaftp = { version = "4.5", default-features = false, features = [ "async-secure", "async-rustls", ], optional = true } +tikv-client = { version = "0.2.0", optional = true } tokio = "1.27" tracing = { version = "0.1", optional = true } uuid = { version = "1", features = ["serde", "v4"] } diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index faa541629dc0..b0cf0a04ac38 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -193,3 +193,5 @@ pub use vercel_artifacts::VercelArtifacts; mod redb; #[cfg(feature = "services-redb")] pub use self::redb::Redb; +#[cfg(feature = "services-tikv")] +pub mod tikv; \ No newline at end of file diff --git a/core/src/services/tikv/backend.rs b/core/src/services/tikv/backend.rs new file mode 100644 index 000000000000..393b9d27500b --- /dev/null +++ b/core/src/services/tikv/backend.rs @@ -0,0 +1,298 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::io::Error; +use std::io::ErrorKind; +use std::io::Result; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +use futures::future::BoxFuture; +use futures::ready; +use futures::Stream; +use pin_project::pin_project; +use tikv_client::BoundRange; +use tikv_client::Config; +use tikv_client::Key; +use tikv_client::KvPair; +use tikv_client::RawClient; + +use crate::adapters::kv::Adapter; +use crate::error::new_other_backend_error; +use crate::error::new_other_object_error; +use crate::ops::Operation; +use crate::path::normalize_root; + +const DEFAULT_TIKV_ENDPOINT: &str = "127.0.0.1:2379"; +const DEFAULT_TIKV_PORT: u16 = 6379; + +/// TiKV backend builder +#[derive(Clone, Default)] +pub struct Builder { + /// network address of the TiKV service. + /// + /// default is "127.0.0.1:2379" + endpoints: Option>, + /// whether using insecure connection to TiKV + insecure: bool, + /// certificate authority file path + ca_path: Option, + /// cert path + cert_path: Option, + /// key path + key_path: Option, + + /// the working directory of the TiKV service. Can be "path/to/dir" + /// + /// default is "/" + root: Option, +} + +impl Builder { + pub fn endpoints(&mut self, endpoints: impl Into>) -> &mut Self { + let ep: Vec = endpoints.into().into_iter().map(|s| s.to_owned()).collect(); + if !ep.is_empty() { + self.endpoints = Some(ep) + } + self + } + + pub fn insecure(&mut self) -> &mut Self { + self.insecure = true; + self + } + + pub fn ca_path(&mut self, ca_path: &str) -> &mut Self { + if !ca_path.is_empty() { + self.ca_path = Some(ca_path.to_string()) + } + self + } + + pub fn cert_path(&mut self, cert_path: &str) -> &mut Self { + if !cert_path.is_empty() { + self.cert_path = Some(cert_path.to_string()) + } + self + } + + pub fn key_path(&mut self, key_path: &str) -> &mut Self { + if !key_path.is_empty() { + self.key_path = Some(key_path.to_string()) + } + self + } + + pub fn root(&mut self, root: &str) -> &mut Self { + if !root.is_empty() { + self.root = Some(root.to_string()) + } + self + } +} + +impl Builder { + pub async fn build(&mut self) -> Result { + let endpoints = self + .endpoints + .clone() + .unwrap_or_else(|| vec![DEFAULT_TIKV_ENDPOINT.to_string()]); + + let r = self + .root + .clone() + .unwrap_or_else(|| "/".to_string()) + .as_str(); + let root = normalize_root(r); + + let mut ctx = Hashmap::from([("endpoints".to_string(), format!("{:?}", endpoint.clone()))]); + + let client = if self.insecure { + RawClient::new(endpoints).await.map_err(|err| { + new_other_backend_error(ctx.clone(), anyhow::anyhow!("invalid configuration", err)) + })? + } else if self.ca_path.is_some() && self.key_path.is_some() && self.cert_path.is_some() { + let (ca_path, key_path, cert_path) = ( + self.ca_path.clone().unwrap(), + self.key_path.clone().unwrap(), + self.cert_path.clone().unwrap(), + ); + ctx.extend([ + ("ca_path".to_string(), ca_path.clone()), + ("key_path".to_string(), key_path.clone()), + ("cert_path".to_string(), cert_path.clone()), + ]); + let config = Config::default().with_security(ca_path, cert_path, key_path); + RawClient::new_with_config(endpoints, config) + .await + .map_err(|err| { + new_other_backend_error( + ctx.clone(), + anyhow::anyhow!("invalid configuration", err), + ) + })? + } else { + return Err(new_other_backend_error( + ctx.clone(), + anyhow::anyhow!("invalid configuration: no enough certifications"), + )); + }; + + debug!("backend build finished: {:?}", &self); + Ok(Backend::new(Adapter { + client, + next_id: Arc::new(AtomicU64::new(0)), + })) + } +} + +/// Backend for TiKV service +pub type Backend = kv::Backend; + +#[derive(Clone)] +pub struct Adapter { + client: TransactionClient, + next_id: Arc, +} + +#[async_trait::async_trait] +impl kv::Adapter for Adapter { + fn metadata(&self) -> kv::Metadata { + kv::Metadata::new( + Scheme::TiKV, + "TiKV", + AccessorCapability::Read | AccessorCapability::Write, + ) + } + + async fn next_id(&self) -> Result { + Ok(self.next_id.fetch_add(1, Ordering::Relaxed)) + } + + async fn set(&self, key: &[u8], value: &[u8]) -> Result<()> { + self.client + .put(key, value) + .await + .map_err(|e| Error::new(ErrorKind::Other, anyhow!("tikv: {:?}", e))) + } + + async fn get(&self, key: &[u8]) -> Result>> { + self.client + .get(key) + .await + .map_err(|e| Error::new(ErrorKind::Other, anyhow!("tikv: {:?}", e))) + } + + async fn delete(&self, key: &[u8]) -> Result<()> { + self.client + .delete(key) + .await + .map_err(|e| Error::new(ErrorKind::Other, anyhow!("tikv: {:?}", e))) + } + + async fn scan(&self, prefix: &[u8]) -> Result { + Ok(kv::KeyStreamer::new(self.client.clone(), prefix)) + } +} + +#[pin_project] +struct KeyStream { + client: RawClient, + bound: BoundRange, + end: Vec, + keys: IntoIter>, + + fut: Option>>>>, + + cursor: &[u8], + done: bool, +} + +impl KeyStream { + fn new(client: RawClient, prefix: &[u8]) -> Self { + let end = prefix.to_vec().extend_one(b"\0"); + let bound = BoundRange::new(prefix, &end); + + Self { + client, + bound, + end, + + keys: vec![].into_iter(), + fut: None, + done: false, + cursor: prefix, + } + } +} + +impl Stream for KeyStream { + type Item = Result>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + + loop { + if let Some(key) = this.keys.next() { + debug_assert!( + this.bound.contains(key), + "prefix is not match: expect {:x?}, got {:x?}", + *this.arg, + key + ); + return Poll::Ready(Some(Ok(key))); + } + + match this.fut { + None => { + if *this.done { + return Poll::Ready(None); + } + + let arg = this.arg.to_vec(); + let cursor = *this.cursor; + let mut client = this.client.clone(); + let bound = BoundRange::new(cursor, &self.end); + let fut = async move { + let keys = client + .scan_keys(bound, 100) + .await + .map_err(|e| Error::new(ErrorKind::Other, anyhow!("tikv: {:?}", e)))?; + cursor = keys.last(); + Ok((cursor, keys)) + }; + *this.fut = Some(Box::pin(fut)); + continue; + } + Some(fut) => { + let (cursor, keys) = ready!(Pin::new(fut).poll(cx))?; + + *this.fut = None; + + if let Some(cursor) = cursor { + *this.cursor = cursor; + } else { + *this.done = true; + } + *this.keys = keys.into_iter(); + continue; + } + } + } + } +} \ No newline at end of file diff --git a/core/src/services/tikv/mod.rs b/core/src/services/tikv/mod.rs new file mode 100644 index 000000000000..954d06051dc1 --- /dev/null +++ b/core/src/services/tikv/mod.rs @@ -0,0 +1,16 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod backend; +pub use backend::Backend; \ No newline at end of file From 2e37fb4fbd20e4300d75a24a74b8a3f50af04329 Mon Sep 17 00:00:00 2001 From: owl Date: Fri, 14 Jul 2023 01:17:27 +0800 Subject: [PATCH 02/12] feat(services/tikv): fix code Signed-off-by: owl --- Cargo.lock | 76 +++++++ core/Cargo.toml | 1 + core/src/services/mod.rs | 5 +- core/src/services/tikv/backend.rs | 333 +++++++++++++----------------- core/src/services/tikv/docs.md | 44 ++++ core/src/services/tikv/mod.rs | 3 +- core/src/types/scheme.rs | 3 + 7 files changed, 276 insertions(+), 189 deletions(-) create mode 100644 core/src/services/tikv/docs.md diff --git a/Cargo.lock b/Cargo.lock index 27548bc25892..2d09385ed490 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -252,6 +252,17 @@ dependencies = [ "event-listener", ] +[[package]] +name = "async-recursion" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7d78656ba01f1b93024b7c3a0467f1608e4be67d725749fdcd7d2c7678fd7a2" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "async-std" version = "1.12.0" @@ -1208,6 +1219,17 @@ dependencies = [ "zeroize", ] +[[package]] +name = "derive-new" +version = "0.5.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3418329ca0ad70234b9735dc4ceed10af4df60eff9c8e7b06cb5e520d92c3535" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "derive_arbitrary" version = "1.3.1" @@ -1420,6 +1442,17 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" +[[package]] +name = "fail" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3be3c61c59fdc91f5dbc3ea31ee8623122ce80057058be560654c5d410d181a6" +dependencies = [ + "lazy_static", + "log", + "rand 0.7.3", +] + [[package]] name = "fastrand" version = "1.9.0" @@ -3059,6 +3092,7 @@ dependencies = [ "size", "sled", "suppaftp", + "tikv-client", "tokio", "tracing", "tracing-opentelemetry", @@ -3814,6 +3848,7 @@ dependencies = [ "parking_lot 0.12.1", "procfs", "protobuf", + "reqwest", "thiserror", ] @@ -5143,6 +5178,36 @@ dependencies = [ "threadpool", ] +[[package]] +name = "tikv-client" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e9a500ec7c937fe18b1d08720406e63f3c7e2de1d7c58d460d3dc21a6656a3b" +dependencies = [ + "async-recursion", + "async-trait", + "derive-new", + "either", + "fail", + "futures", + "glob", + "lazy_static", + "log", + "pin-project", + "prometheus", + "prost", + "rand 0.8.5", + "regex", + "semver", + "serde", + "serde_derive", + "thiserror", + "tokio", + "tonic", + "tonic-build", + "tonic-disable-doctest", +] + [[package]] name = "time" version = "0.1.45" @@ -5408,6 +5473,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "tonic-disable-doctest" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "322be90b9db8035feae8d5b26d04dd9146dc4b0eaa6dec32fc9cf68d1a29e806" +dependencies = [ + "convert_case", + "itertools", + "tonic-build", +] + [[package]] name = "tower" version = "0.4.13" diff --git a/core/Cargo.toml b/core/Cargo.toml index 08ba3a91fcb4..75bd373f776c 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -50,6 +50,7 @@ default = [ "services-s3", "services-webdav", "services-webhdfs", + "services-tikv", ] # Build docs or not. diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index b0cf0a04ac38..b6ff23f3ef38 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -193,5 +193,8 @@ pub use vercel_artifacts::VercelArtifacts; mod redb; #[cfg(feature = "services-redb")] pub use self::redb::Redb; + +#[cfg(feature = "services-tikv")] +mod tikv; #[cfg(feature = "services-tikv")] -pub mod tikv; \ No newline at end of file +pub use self::tikv::TiKV; diff --git a/core/src/services/tikv/backend.rs b/core/src/services/tikv/backend.rs index 393b9d27500b..67091d4154b5 100644 --- a/core/src/services/tikv/backend.rs +++ b/core/src/services/tikv/backend.rs @@ -15,35 +15,26 @@ // specific language governing permissions and limitations // under the License. -use std::io::Error; -use std::io::ErrorKind; -use std::io::Result; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; - -use futures::future::BoxFuture; -use futures::ready; -use futures::Stream; -use pin_project::pin_project; -use tikv_client::BoundRange; +use std::collections::HashMap; use tikv_client::Config; -use tikv_client::Key; -use tikv_client::KvPair; use tikv_client::RawClient; -use crate::adapters::kv::Adapter; -use crate::error::new_other_backend_error; -use crate::error::new_other_object_error; -use crate::ops::Operation; -use crate::path::normalize_root; +use crate::raw::adapters::kv; +use crate::Capability; +use crate::Scheme; +use async_trait::async_trait; + +use crate::Builder; +use crate::Error; +use crate::ErrorKind; +use crate::*; -const DEFAULT_TIKV_ENDPOINT: &str = "127.0.0.1:2379"; -const DEFAULT_TIKV_PORT: u16 = 6379; +use std::fmt::Debug; +use std::fmt::Formatter; /// TiKV backend builder #[derive(Clone, Default)] -pub struct Builder { +pub struct TiKVBuilder { /// network address of the TiKV service. /// /// default is "127.0.0.1:2379" @@ -56,27 +47,25 @@ pub struct Builder { cert_path: Option, /// key path key_path: Option, - - /// the working directory of the TiKV service. Can be "path/to/dir" - /// - /// default is "/" - root: Option, } -impl Builder { - pub fn endpoints(&mut self, endpoints: impl Into>) -> &mut Self { - let ep: Vec = endpoints.into().into_iter().map(|s| s.to_owned()).collect(); +impl TiKVBuilder { + /// Set the network address of the TiKV service. + pub fn endpoints(&mut self, endpoints: impl Into>) -> &mut Self { + let ep: Vec = endpoints.into().into_iter().collect(); if !ep.is_empty() { self.endpoints = Some(ep) } self } + /// Set the insecure connection to TiKV. pub fn insecure(&mut self) -> &mut Self { - self.insecure = true; + self.insecure = false; self } + /// Set the certificate authority file path. pub fn ca_path(&mut self, ca_path: &str) -> &mut Self { if !ca_path.is_empty() { self.ca_path = Some(ca_path.to_string()) @@ -84,6 +73,7 @@ impl Builder { self } + /// Set the certificate file path. pub fn cert_path(&mut self, cert_path: &str) -> &mut Self { if !cert_path.is_empty() { self.cert_path = Some(cert_path.to_string()) @@ -91,208 +81,177 @@ impl Builder { self } + /// Set the key file path. pub fn key_path(&mut self, key_path: &str) -> &mut Self { if !key_path.is_empty() { self.key_path = Some(key_path.to_string()) } self } +} - pub fn root(&mut self, root: &str) -> &mut Self { - if !root.is_empty() { - self.root = Some(root.to_string()) +impl Builder for TiKVBuilder { + const SCHEME: Scheme = Scheme::Redb; + type Accessor = Backend; + + fn from_map(map: HashMap) -> Self { + let mut builder = TiKVBuilder::default(); + + map.get("endpoints") + .map(|v| v.split(',').map(|s| s.to_owned()).collect::>()) + .map(|v| builder.endpoints(v)); + map.get("insecure") + .filter(|v| *v == "on" || *v == "true") + .map(|_| builder.insecure()); + map.get("ca_path").map(|v| builder.ca_path(v)); + map.get("cert_path").map(|v| builder.cert_path(v)); + map.get("key_path").map(|v| builder.key_path(v)); + + builder + } + + fn build(&mut self) -> Result { + let endpoints = self.endpoints.take().ok_or_else(|| { + Error::new( + ErrorKind::ConfigInvalid, + "endpoints is required but not set", + ) + .with_context("service", Scheme::TiKV) + })?; + + if self.insecure { + if self.ca_path.is_some() || self.key_path.is_some() || self.cert_path.is_some() { + return Err( + Error::new(ErrorKind::ConfigInvalid, "invalid tls configuration") + .with_context("service", Scheme::TiKV) + .with_context("endpoints", format!("{:?}", endpoints)), + )?; + } + } else if !(self.ca_path.is_some() && self.key_path.is_some() && self.cert_path.is_some()) { + return Err( + Error::new(ErrorKind::ConfigInvalid, "invalid tls configuration") + .with_context("service", Scheme::TiKV) + .with_context("endpoints", format!("{:?}", endpoints)), + )?; } - self + + Ok(Backend::new(Adapter { + client: None, + endpoints, + insecure: self.insecure, + ca_path: self.ca_path.clone(), + cert_path: self.cert_path.clone(), + key_path: self.key_path.clone(), + })) } } -impl Builder { - pub async fn build(&mut self) -> Result { - let endpoints = self - .endpoints - .clone() - .unwrap_or_else(|| vec![DEFAULT_TIKV_ENDPOINT.to_string()]); +/// Backend for TiKV service +pub type Backend = kv::Backend; - let r = self - .root - .clone() - .unwrap_or_else(|| "/".to_string()) - .as_str(); - let root = normalize_root(r); +#[derive(Clone)] +pub struct Adapter { + client: Option, + endpoints: Vec, + insecure: bool, + ca_path: Option, + cert_path: Option, + key_path: Option, +} + +impl Debug for Adapter { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut ds = f.debug_struct("Adapter"); + ds.field("endpoints", &self.endpoints); + ds.field("insecure", &self.insecure); + ds.field("ca_path", &self.ca_path); + ds.field("cert_path", &self.cert_path); + ds.field("key_path", &self.key_path); + ds.finish() + } +} - let mut ctx = Hashmap::from([("endpoints".to_string(), format!("{:?}", endpoint.clone()))]); +impl Adapter { + async fn get_connection(&self) -> Result { + if let Some(client) = self.client.clone() { + return Ok(client); + } let client = if self.insecure { - RawClient::new(endpoints).await.map_err(|err| { - new_other_backend_error(ctx.clone(), anyhow::anyhow!("invalid configuration", err)) - })? + RawClient::new(self.endpoints.clone()) + .await + .map_err(|err| { + Error::new(ErrorKind::ConfigInvalid, "invalid configuration") + .with_context("service", Scheme::TiKV) + .with_context("endpoints", format!("{:?}", self.endpoints)) + .set_source(err) + })? } else if self.ca_path.is_some() && self.key_path.is_some() && self.cert_path.is_some() { let (ca_path, key_path, cert_path) = ( self.ca_path.clone().unwrap(), self.key_path.clone().unwrap(), self.cert_path.clone().unwrap(), ); - ctx.extend([ - ("ca_path".to_string(), ca_path.clone()), - ("key_path".to_string(), key_path.clone()), - ("cert_path".to_string(), cert_path.clone()), - ]); let config = Config::default().with_security(ca_path, cert_path, key_path); - RawClient::new_with_config(endpoints, config) + RawClient::new_with_config(self.endpoints.clone(), config) .await .map_err(|err| { - new_other_backend_error( - ctx.clone(), - anyhow::anyhow!("invalid configuration", err), - ) + Error::new(ErrorKind::ConfigInvalid, "invalid configuration") + .with_context("service", Scheme::TiKV) + .with_context("endpoints", format!("{:?}", self.endpoints)) + .set_source(err) })? } else { - return Err(new_other_backend_error( - ctx.clone(), - anyhow::anyhow!("invalid configuration: no enough certifications"), - )); + return Err( + Error::new(ErrorKind::ConfigInvalid, "invalid configuration") + .with_context("service", Scheme::TiKV) + .with_context("endpoints", format!("{:?}", self.endpoints)), + ); }; - - debug!("backend build finished: {:?}", &self); - Ok(Backend::new(Adapter { - client, - next_id: Arc::new(AtomicU64::new(0)), - })) + Ok(client) } } -/// Backend for TiKV service -pub type Backend = kv::Backend; - -#[derive(Clone)] -pub struct Adapter { - client: TransactionClient, - next_id: Arc, -} - -#[async_trait::async_trait] +#[async_trait] impl kv::Adapter for Adapter { fn metadata(&self) -> kv::Metadata { kv::Metadata::new( Scheme::TiKV, "TiKV", - AccessorCapability::Read | AccessorCapability::Write, + Capability { + read: true, + write: true, + blocking: false, + ..Default::default() + }, ) } - async fn next_id(&self) -> Result { - Ok(self.next_id.fetch_add(1, Ordering::Relaxed)) - } - - async fn set(&self, key: &[u8], value: &[u8]) -> Result<()> { - self.client - .put(key, value) + async fn get(&self, path: &str) -> Result>> { + self.get_connection() + .await? + .get(path.to_owned()) .await - .map_err(|e| Error::new(ErrorKind::Other, anyhow!("tikv: {:?}", e))) + .map_err(parse_tikv_error) } - async fn get(&self, key: &[u8]) -> Result>> { - self.client - .get(key) + async fn set(&self, path: &str, value: &[u8]) -> Result<()> { + self.get_connection() + .await? + .put(path.to_owned(), value.to_vec()) .await - .map_err(|e| Error::new(ErrorKind::Other, anyhow!("tikv: {:?}", e))) + .map_err(parse_tikv_error) } - async fn delete(&self, key: &[u8]) -> Result<()> { - self.client - .delete(key) + async fn delete(&self, path: &str) -> Result<()> { + self.get_connection() + .await? + .delete(path.to_owned()) .await - .map_err(|e| Error::new(ErrorKind::Other, anyhow!("tikv: {:?}", e))) - } - - async fn scan(&self, prefix: &[u8]) -> Result { - Ok(kv::KeyStreamer::new(self.client.clone(), prefix)) + .map_err(parse_tikv_error) } } -#[pin_project] -struct KeyStream { - client: RawClient, - bound: BoundRange, - end: Vec, - keys: IntoIter>, - - fut: Option>>>>, - - cursor: &[u8], - done: bool, -} - -impl KeyStream { - fn new(client: RawClient, prefix: &[u8]) -> Self { - let end = prefix.to_vec().extend_one(b"\0"); - let bound = BoundRange::new(prefix, &end); - - Self { - client, - bound, - end, - - keys: vec![].into_iter(), - fut: None, - done: false, - cursor: prefix, - } - } +fn parse_tikv_error(e: tikv_client::Error) -> Error { + Error::new(ErrorKind::Unexpected, "error from tikv").set_source(e) } - -impl Stream for KeyStream { - type Item = Result>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - - loop { - if let Some(key) = this.keys.next() { - debug_assert!( - this.bound.contains(key), - "prefix is not match: expect {:x?}, got {:x?}", - *this.arg, - key - ); - return Poll::Ready(Some(Ok(key))); - } - - match this.fut { - None => { - if *this.done { - return Poll::Ready(None); - } - - let arg = this.arg.to_vec(); - let cursor = *this.cursor; - let mut client = this.client.clone(); - let bound = BoundRange::new(cursor, &self.end); - let fut = async move { - let keys = client - .scan_keys(bound, 100) - .await - .map_err(|e| Error::new(ErrorKind::Other, anyhow!("tikv: {:?}", e)))?; - cursor = keys.last(); - Ok((cursor, keys)) - }; - *this.fut = Some(Box::pin(fut)); - continue; - } - Some(fut) => { - let (cursor, keys) = ready!(Pin::new(fut).poll(cx))?; - - *this.fut = None; - - if let Some(cursor) = cursor { - *this.cursor = cursor; - } else { - *this.done = true; - } - *this.keys = keys.into_iter(); - continue; - } - } - } - } -} \ No newline at end of file diff --git a/core/src/services/tikv/docs.md b/core/src/services/tikv/docs.md new file mode 100644 index 000000000000..8de3dbeff454 --- /dev/null +++ b/core/src/services/tikv/docs.md @@ -0,0 +1,44 @@ +## Capabilities + +This service can be used to: + +- [x] stat +- [x] read +- [x] write +- [x] create_dir +- [x] delete +- [x] copy +- [x] rename +- [ ] ~~list~~ +- [ ] scan +- [ ] ~~presign~~ +- [ ] ~~blocking~~ + +## Configuration + +- `endpoints`: Set the endpoints to the tikv cluster +- `insecure`: Set the insecure flag to the tikv cluster +- `ca_path`: Set the ca path to the tikv connection +- `cert_path`: Set the cert path to the tikv connection +- `key_path`: Set the key path to the tikv connection + +You can refer to [`TiKVBuilder`]'s docs for more information + +## Example + +### Via Builder + +```rust +use anyhow::Result; +use opendal::services::Tikv; +use opendal::Operator; + +#[tokio::main] +async fn main() -> Result<()> { + let mut builder = Tikv::default(); + builder.endpoints("127.0.0.1:2379"); + + let op: Operator = Operator::new(builder)?.finish(); + Ok(()) +} +``` diff --git a/core/src/services/tikv/mod.rs b/core/src/services/tikv/mod.rs index 954d06051dc1..8f8c0af39d1d 100644 --- a/core/src/services/tikv/mod.rs +++ b/core/src/services/tikv/mod.rs @@ -13,4 +13,5 @@ // limitations under the License. mod backend; -pub use backend::Backend; \ No newline at end of file + +pub use backend::TiKVBuilder as TiKV; diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index 3b29260c9005..838bf4d6c79a 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -102,6 +102,8 @@ pub enum Scheme { Webhdfs, /// [redb][crate::services::Redb]: Redb Services Redb, + /// [tikv][crate::services::tikv]: TiKV Services + TiKV, /// Custom that allow users to implement services outside of OpenDAL. /// /// # NOTE @@ -209,6 +211,7 @@ impl From for &'static str { Scheme::Webdav => "webdav", Scheme::Webhdfs => "webhdfs", Scheme::Redb => "redb", + Scheme::TiKV => "tikv", Scheme::Custom(v) => v, } } From 05eacb2f898515d2c6883149cc43fd2021a9e7d8 Mon Sep 17 00:00:00 2001 From: owl Date: Fri, 14 Jul 2023 01:24:04 +0800 Subject: [PATCH 03/12] feat(services/tikv): fix code Signed-off-by: owl --- core/src/services/tikv/mod.rs | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/core/src/services/tikv/mod.rs b/core/src/services/tikv/mod.rs index 8f8c0af39d1d..75a3eb5d3209 100644 --- a/core/src/services/tikv/mod.rs +++ b/core/src/services/tikv/mod.rs @@ -1,16 +1,20 @@ -// Copyright 2022 Datafuse Labs. +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at // -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 // -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + mod backend; From 07c89a665f76de71e21b9f3d7452b802a63afd37 Mon Sep 17 00:00:00 2001 From: owl Date: Fri, 14 Jul 2023 23:43:43 +0800 Subject: [PATCH 04/12] feat(services/tikv): fix code Signed-off-by: owl --- core/src/services/mod.rs | 2 +- core/src/services/tikv/backend.rs | 30 +++++++++++++----------------- core/src/services/tikv/mod.rs | 2 +- core/src/types/scheme.rs | 6 +++--- 4 files changed, 18 insertions(+), 22 deletions(-) diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index b6ff23f3ef38..154e7f0291e1 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -197,4 +197,4 @@ pub use self::redb::Redb; #[cfg(feature = "services-tikv")] mod tikv; #[cfg(feature = "services-tikv")] -pub use self::tikv::TiKV; +pub use self::tikv::Tikv; diff --git a/core/src/services/tikv/backend.rs b/core/src/services/tikv/backend.rs index 67091d4154b5..fcee836cd02a 100644 --- a/core/src/services/tikv/backend.rs +++ b/core/src/services/tikv/backend.rs @@ -34,7 +34,7 @@ use std::fmt::Formatter; /// TiKV backend builder #[derive(Clone, Default)] -pub struct TiKVBuilder { +pub struct TikvBuilder { /// network address of the TiKV service. /// /// default is "127.0.0.1:2379" @@ -49,10 +49,10 @@ pub struct TiKVBuilder { key_path: Option, } -impl TiKVBuilder { +impl TikvBuilder { /// Set the network address of the TiKV service. - pub fn endpoints(&mut self, endpoints: impl Into>) -> &mut Self { - let ep: Vec = endpoints.into().into_iter().collect(); + pub fn endpoints(&mut self, endpoints: Vec) -> &mut Self { + let ep: Vec = endpoints.into_iter().collect(); if !ep.is_empty() { self.endpoints = Some(ep) } @@ -90,12 +90,12 @@ impl TiKVBuilder { } } -impl Builder for TiKVBuilder { +impl Builder for TikvBuilder { const SCHEME: Scheme = Scheme::Redb; type Accessor = Backend; fn from_map(map: HashMap) -> Self { - let mut builder = TiKVBuilder::default(); + let mut builder = TikvBuilder::default(); map.get("endpoints") .map(|v| v.split(',').map(|s| s.to_owned()).collect::>()) @@ -116,21 +116,21 @@ impl Builder for TiKVBuilder { ErrorKind::ConfigInvalid, "endpoints is required but not set", ) - .with_context("service", Scheme::TiKV) + .with_context("service", Scheme::Tikv) })?; if self.insecure { if self.ca_path.is_some() || self.key_path.is_some() || self.cert_path.is_some() { return Err( Error::new(ErrorKind::ConfigInvalid, "invalid tls configuration") - .with_context("service", Scheme::TiKV) + .with_context("service", Scheme::Tikv) .with_context("endpoints", format!("{:?}", endpoints)), )?; } } else if !(self.ca_path.is_some() && self.key_path.is_some() && self.cert_path.is_some()) { return Err( Error::new(ErrorKind::ConfigInvalid, "invalid tls configuration") - .with_context("service", Scheme::TiKV) + .with_context("service", Scheme::Tikv) .with_context("endpoints", format!("{:?}", endpoints)), )?; } @@ -163,10 +163,6 @@ impl Debug for Adapter { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let mut ds = f.debug_struct("Adapter"); ds.field("endpoints", &self.endpoints); - ds.field("insecure", &self.insecure); - ds.field("ca_path", &self.ca_path); - ds.field("cert_path", &self.cert_path); - ds.field("key_path", &self.key_path); ds.finish() } } @@ -182,7 +178,7 @@ impl Adapter { .await .map_err(|err| { Error::new(ErrorKind::ConfigInvalid, "invalid configuration") - .with_context("service", Scheme::TiKV) + .with_context("service", Scheme::Tikv) .with_context("endpoints", format!("{:?}", self.endpoints)) .set_source(err) })? @@ -197,14 +193,14 @@ impl Adapter { .await .map_err(|err| { Error::new(ErrorKind::ConfigInvalid, "invalid configuration") - .with_context("service", Scheme::TiKV) + .with_context("service", Scheme::Tikv) .with_context("endpoints", format!("{:?}", self.endpoints)) .set_source(err) })? } else { return Err( Error::new(ErrorKind::ConfigInvalid, "invalid configuration") - .with_context("service", Scheme::TiKV) + .with_context("service", Scheme::Tikv) .with_context("endpoints", format!("{:?}", self.endpoints)), ); }; @@ -216,7 +212,7 @@ impl Adapter { impl kv::Adapter for Adapter { fn metadata(&self) -> kv::Metadata { kv::Metadata::new( - Scheme::TiKV, + Scheme::Tikv, "TiKV", Capability { read: true, diff --git a/core/src/services/tikv/mod.rs b/core/src/services/tikv/mod.rs index 75a3eb5d3209..a79f3d61d432 100644 --- a/core/src/services/tikv/mod.rs +++ b/core/src/services/tikv/mod.rs @@ -18,4 +18,4 @@ mod backend; -pub use backend::TiKVBuilder as TiKV; +pub use backend::TikvBuilder as Tikv; diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index 838bf4d6c79a..02dc46c62fd7 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -102,8 +102,8 @@ pub enum Scheme { Webhdfs, /// [redb][crate::services::Redb]: Redb Services Redb, - /// [tikv][crate::services::tikv]: TiKV Services - TiKV, + /// [tikv][crate::services::tikv]: Tikv Services + Tikv, /// Custom that allow users to implement services outside of OpenDAL. /// /// # NOTE @@ -211,7 +211,7 @@ impl From for &'static str { Scheme::Webdav => "webdav", Scheme::Webhdfs => "webhdfs", Scheme::Redb => "redb", - Scheme::TiKV => "tikv", + Scheme::Tikv => "tikv", Scheme::Custom(v) => v, } } From 2d6e2820c33a36dad69de1d93d6547a1225b71a9 Mon Sep 17 00:00:00 2001 From: owl Date: Sun, 23 Jul 2023 15:21:38 +0800 Subject: [PATCH 05/12] feat(services/tikv): fix code Signed-off-by: owl --- core/src/services/tikv/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/services/tikv/mod.rs b/core/src/services/tikv/mod.rs index a79f3d61d432..c3310120641c 100644 --- a/core/src/services/tikv/mod.rs +++ b/core/src/services/tikv/mod.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. - mod backend; pub use backend::TikvBuilder as Tikv; From edcee3ec97177e06ff20f3edee64d46535fde10b Mon Sep 17 00:00:00 2001 From: owl Date: Sun, 23 Jul 2023 15:25:37 +0800 Subject: [PATCH 06/12] feat(services/tikv): fix code Signed-off-by: owl --- core/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index 75bd373f776c..08ba3a91fcb4 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -50,7 +50,6 @@ default = [ "services-s3", "services-webdav", "services-webhdfs", - "services-tikv", ] # Build docs or not. From 35dd1db1935f7d80b27d003587ea1605f62c0644 Mon Sep 17 00:00:00 2001 From: owl Date: Sun, 23 Jul 2023 15:48:29 +0800 Subject: [PATCH 07/12] feat(services/tikv): fix code Signed-off-by: owl --- core/src/services/tikv/backend.rs | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/core/src/services/tikv/backend.rs b/core/src/services/tikv/backend.rs index fcee836cd02a..861087f385b8 100644 --- a/core/src/services/tikv/backend.rs +++ b/core/src/services/tikv/backend.rs @@ -119,21 +119,15 @@ impl Builder for TikvBuilder { .with_context("service", Scheme::Tikv) })?; - if self.insecure { - if self.ca_path.is_some() || self.key_path.is_some() || self.cert_path.is_some() { + if self.insecure && + (self.ca_path.is_some() || self.key_path.is_some() || self.cert_path.is_some()) { return Err( Error::new(ErrorKind::ConfigInvalid, "invalid tls configuration") .with_context("service", Scheme::Tikv) .with_context("endpoints", format!("{:?}", endpoints)), )?; } - } else if !(self.ca_path.is_some() && self.key_path.is_some() && self.cert_path.is_some()) { - return Err( - Error::new(ErrorKind::ConfigInvalid, "invalid tls configuration") - .with_context("service", Scheme::Tikv) - .with_context("endpoints", format!("{:?}", endpoints)), - )?; - } + Ok(Backend::new(Adapter { client: None, From 747ffb89d12625ca3f195937d4027d87b3ed32e2 Mon Sep 17 00:00:00 2001 From: owl Date: Sun, 23 Jul 2023 15:50:37 +0800 Subject: [PATCH 08/12] feat(services/tikv): fix code Signed-off-by: owl --- core/src/services/tikv/backend.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/services/tikv/backend.rs b/core/src/services/tikv/backend.rs index 861087f385b8..4496bbf837fe 100644 --- a/core/src/services/tikv/backend.rs +++ b/core/src/services/tikv/backend.rs @@ -119,15 +119,15 @@ impl Builder for TikvBuilder { .with_context("service", Scheme::Tikv) })?; - if self.insecure && - (self.ca_path.is_some() || self.key_path.is_some() || self.cert_path.is_some()) { - return Err( - Error::new(ErrorKind::ConfigInvalid, "invalid tls configuration") - .with_context("service", Scheme::Tikv) - .with_context("endpoints", format!("{:?}", endpoints)), - )?; - } - + if self.insecure + && (self.ca_path.is_some() || self.key_path.is_some() || self.cert_path.is_some()) + { + return Err( + Error::new(ErrorKind::ConfigInvalid, "invalid tls configuration") + .with_context("service", Scheme::Tikv) + .with_context("endpoints", format!("{:?}", endpoints)), + )?; + } Ok(Backend::new(Adapter { client: None, From 98c7c33caea934ace699405d5cd9c0d31e212aa9 Mon Sep 17 00:00:00 2001 From: owl Date: Sun, 23 Jul 2023 16:46:25 +0800 Subject: [PATCH 09/12] feat(services/tikv): fix code Signed-off-by: owl --- core/src/services/tikv/backend.rs | 33 +++++++++++++++---------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/core/src/services/tikv/backend.rs b/core/src/services/tikv/backend.rs index 4496bbf837fe..58d8e941280c 100644 --- a/core/src/services/tikv/backend.rs +++ b/core/src/services/tikv/backend.rs @@ -19,6 +19,9 @@ use std::collections::HashMap; use tikv_client::Config; use tikv_client::RawClient; +use tokio::sync::OnceCell; + +use crate::raw::adapters; use crate::raw::adapters::kv; use crate::Capability; use crate::Scheme; @@ -130,7 +133,7 @@ impl Builder for TikvBuilder { } Ok(Backend::new(Adapter { - client: None, + client: OnceCell::new(), endpoints, insecure: self.insecure, ca_path: self.ca_path.clone(), @@ -145,7 +148,7 @@ pub type Backend = kv::Backend; #[derive(Clone)] pub struct Adapter { - client: Option, + client: OnceCell, endpoints: Vec, insecure: bool, ca_path: Option, @@ -163,19 +166,13 @@ impl Debug for Adapter { impl Adapter { async fn get_connection(&self) -> Result { - if let Some(client) = self.client.clone() { - return Ok(client); + if let Some(client) = self.client.get() { + return Ok(client.clone()); } - let client = if self.insecure { RawClient::new(self.endpoints.clone()) .await - .map_err(|err| { - Error::new(ErrorKind::ConfigInvalid, "invalid configuration") - .with_context("service", Scheme::Tikv) - .with_context("endpoints", format!("{:?}", self.endpoints)) - .set_source(err) - })? + .map_err(parse_tikv_config_error)? } else if self.ca_path.is_some() && self.key_path.is_some() && self.cert_path.is_some() { let (ca_path, key_path, cert_path) = ( self.ca_path.clone().unwrap(), @@ -185,12 +182,7 @@ impl Adapter { let config = Config::default().with_security(ca_path, cert_path, key_path); RawClient::new_with_config(self.endpoints.clone(), config) .await - .map_err(|err| { - Error::new(ErrorKind::ConfigInvalid, "invalid configuration") - .with_context("service", Scheme::Tikv) - .with_context("endpoints", format!("{:?}", self.endpoints)) - .set_source(err) - })? + .map_err(parse_tikv_config_error)? } else { return Err( Error::new(ErrorKind::ConfigInvalid, "invalid configuration") @@ -198,6 +190,7 @@ impl Adapter { .with_context("endpoints", format!("{:?}", self.endpoints)), ); }; + self.client.set(client.clone()).ok(); Ok(client) } } @@ -245,3 +238,9 @@ impl kv::Adapter for Adapter { fn parse_tikv_error(e: tikv_client::Error) -> Error { Error::new(ErrorKind::Unexpected, "error from tikv").set_source(e) } + +fn parse_tikv_config_error(e: tikv_client::Error) -> Error { + Error::new(ErrorKind::ConfigInvalid, "invalid configuration") + .with_context("service", Scheme::Tikv) + .set_source(e) +} \ No newline at end of file From 6ceab0d559f7f33cd96bff14cc98dc8332572810 Mon Sep 17 00:00:00 2001 From: owl Date: Sun, 23 Jul 2023 16:48:26 +0800 Subject: [PATCH 10/12] feat(services/tikv): fix code Signed-off-by: owl --- core/src/services/tikv/backend.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/services/tikv/backend.rs b/core/src/services/tikv/backend.rs index 58d8e941280c..3733bc7c434f 100644 --- a/core/src/services/tikv/backend.rs +++ b/core/src/services/tikv/backend.rs @@ -20,8 +20,6 @@ use tikv_client::Config; use tikv_client::RawClient; use tokio::sync::OnceCell; - -use crate::raw::adapters; use crate::raw::adapters::kv; use crate::Capability; use crate::Scheme; @@ -243,4 +241,4 @@ fn parse_tikv_config_error(e: tikv_client::Error) -> Error { Error::new(ErrorKind::ConfigInvalid, "invalid configuration") .with_context("service", Scheme::Tikv) .set_source(e) -} \ No newline at end of file +} From 1fdc0a8640ff1c2cd7af7654e468cde880bd789c Mon Sep 17 00:00:00 2001 From: owl Date: Sun, 23 Jul 2023 17:13:08 +0800 Subject: [PATCH 11/12] feat(services/tikv): fix code Signed-off-by: owl --- core/src/services/tikv/backend.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/services/tikv/backend.rs b/core/src/services/tikv/backend.rs index 3733bc7c434f..9f13ba2d2d8a 100644 --- a/core/src/services/tikv/backend.rs +++ b/core/src/services/tikv/backend.rs @@ -19,11 +19,11 @@ use std::collections::HashMap; use tikv_client::Config; use tikv_client::RawClient; -use tokio::sync::OnceCell; use crate::raw::adapters::kv; use crate::Capability; use crate::Scheme; use async_trait::async_trait; +use tokio::sync::OnceCell; use crate::Builder; use crate::Error; From cfaadf76c3704084506c1e9b6afd44b280065bc8 Mon Sep 17 00:00:00 2001 From: owl Date: Sun, 23 Jul 2023 18:23:10 +0800 Subject: [PATCH 12/12] feat(services/tikv): fix code Signed-off-by: owl --- core/src/services/tikv/backend.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/services/tikv/backend.rs b/core/src/services/tikv/backend.rs index 9f13ba2d2d8a..c62d45ffa888 100644 --- a/core/src/services/tikv/backend.rs +++ b/core/src/services/tikv/backend.rs @@ -53,9 +53,8 @@ pub struct TikvBuilder { impl TikvBuilder { /// Set the network address of the TiKV service. pub fn endpoints(&mut self, endpoints: Vec) -> &mut Self { - let ep: Vec = endpoints.into_iter().collect(); - if !ep.is_empty() { - self.endpoints = Some(ep) + if !endpoints.is_empty() { + self.endpoints = Some(endpoints) } self } @@ -92,7 +91,7 @@ impl TikvBuilder { } impl Builder for TikvBuilder { - const SCHEME: Scheme = Scheme::Redb; + const SCHEME: Scheme = Scheme::Tikv; type Accessor = Backend; fn from_map(map: HashMap) -> Self {