Skip to content

Commit

Permalink
Make it configurable which metadata store to run
Browse files Browse the repository at this point in the history
This commit makes it configurable which metadata will be run
by the Node when starting the Restate server.
  • Loading branch information
tillrohrmann committed Nov 27, 2024
1 parent 0dd0374 commit 444cd75
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 72 deletions.
8 changes: 4 additions & 4 deletions crates/metadata-store/src/grpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
use crate::grpc::pb_conversions::ConversionError;
use crate::grpc_svc::metadata_store_svc_server::MetadataStoreSvc;
use crate::grpc_svc::{DeleteRequest, GetRequest, GetResponse, GetVersionResponse, PutRequest};
use crate::{Error, MetadataStoreRequest, RequestSender};
use crate::{MetadataStoreRequest, RequestError, RequestSender};
use async_trait::async_trait;
use tokio::sync::oneshot;
use tonic::{Request, Response, Status};
Expand Down Expand Up @@ -129,10 +129,10 @@ impl MetadataStoreSvc for MetadataStoreHandler {
}
}

impl From<Error> for Status {
fn from(err: Error) -> Self {
impl From<RequestError> for Status {
fn from(err: RequestError) -> Self {
match err {
Error::FailedPrecondition(msg) => Status::failed_precondition(msg),
RequestError::FailedPrecondition(msg) => Status::failed_precondition(msg),
err => Status::internal(err.to_string()),
}
}
Expand Down
86 changes: 64 additions & 22 deletions crates/metadata-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,60 +17,102 @@ use restate_core::metadata_store::VersionedValue;
pub use restate_core::metadata_store::{
MetadataStoreClient, Precondition, ReadError, ReadModifyWriteError, WriteError,
};
use restate_core::ShutdownError;
use restate_types::errors::GenericError;
use restate_types::storage::{StorageDecodeError, StorageEncodeError};
use restate_types::Version;
use tokio::sync::{mpsc, oneshot};

pub type BoxedMetadataStoreService = Box<dyn MetadataStoreService>;

pub type RequestSender = mpsc::Sender<MetadataStoreRequest>;
pub type RequestReceiver = mpsc::Receiver<MetadataStoreRequest>;

type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, thiserror::Error)]
pub enum RequestError {
#[error("storage error: {0}")]
Storage(#[from] GenericError),
#[error("failed precondition: {0}")]
FailedPrecondition(String),
#[error("invalid argument: {0}")]
InvalidArgument(String),
#[error("encode error: {0}")]
Encode(#[from] StorageEncodeError),
#[error("decode error: {0}")]
Decode(#[from] StorageDecodeError),
}

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("error while running server grpc reflection service: {0}")]
GrpcReflection(#[from] tonic_reflection::server::Error),
#[error("system is shutting down")]
Shutdown(#[from] ShutdownError),
#[error(transparent)]
Generic(#[from] GenericError),
}

#[async_trait::async_trait]
pub trait MetadataStoreServiceBoxed {
async fn run_boxed(self: Box<Self>) -> Result<(), Error>;
}

#[async_trait::async_trait]
impl<T: MetadataStoreService> MetadataStoreServiceBoxed for T {
async fn run_boxed(self: Box<Self>) -> Result<(), Error> {
(*self).run().await
}
}

#[async_trait::async_trait]
pub trait MetadataStoreService: MetadataStoreServiceBoxed + Send {
async fn run(self) -> Result<(), Error>;

fn boxed(self) -> BoxedMetadataStoreService
where
Self: Sized + 'static,
{
Box::new(self)
}
}

#[async_trait::async_trait]
impl<T: MetadataStoreService + ?Sized> MetadataStoreService for Box<T> {
async fn run(self) -> Result<(), Error> {
self.run_boxed().await
}
}

#[derive(Debug)]
pub enum MetadataStoreRequest {
Get {
key: ByteString,
result_tx: oneshot::Sender<Result<Option<VersionedValue>>>,
result_tx: oneshot::Sender<Result<Option<VersionedValue>, RequestError>>,
},
GetVersion {
key: ByteString,
result_tx: oneshot::Sender<Result<Option<Version>>>,
result_tx: oneshot::Sender<Result<Option<Version>, RequestError>>,
},
Put {
key: ByteString,
value: VersionedValue,
precondition: Precondition,
result_tx: oneshot::Sender<Result<()>>,
result_tx: oneshot::Sender<Result<(), RequestError>>,
},
Delete {
key: ByteString,
precondition: Precondition,
result_tx: oneshot::Sender<Result<()>>,
result_tx: oneshot::Sender<Result<(), RequestError>>,
},
}

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("storage error: {0}")]
Storage(#[from] GenericError),
#[error("failed precondition: {0}")]
FailedPrecondition(String),
#[error("invalid argument: {0}")]
InvalidArgument(String),
#[error("encode error: {0}")]
Encode(#[from] StorageEncodeError),
#[error("decode error: {0}")]
Decode(#[from] StorageDecodeError),
}

impl Error {
impl RequestError {
fn kv_pair_exists() -> Self {
Error::FailedPrecondition("key-value pair already exists".to_owned())
RequestError::FailedPrecondition("key-value pair already exists".to_owned())
}

fn version_mismatch(expected: Version, actual: Option<Version>) -> Self {
Error::FailedPrecondition(format!(
RequestError::FailedPrecondition(format!(
"Expected version '{}' but found version '{:?}'",
expected, actual
))
Expand Down
4 changes: 2 additions & 2 deletions crates/metadata-store/src/local/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ use restate_types::{
errors::GenericError,
};

pub use service::{Error, LocalMetadataStoreService};

use crate::grpc::client::GrpcMetadataStoreClient;

pub use service::LocalMetadataStoreService;

/// Creates a [`MetadataStoreClient`] for the [`GrpcMetadataStoreClient`].
pub async fn create_client(
metadata_store_client_options: MetadataStoreClientOptions,
Expand Down
24 changes: 9 additions & 15 deletions crates/metadata-store/src/local/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@
use crate::grpc::handler::MetadataStoreHandler;
use crate::grpc::server::GrpcServer;
use crate::grpc::service_builder::GrpcServiceBuilder;
use crate::grpc_svc;
use crate::grpc_svc::metadata_store_svc_server::MetadataStoreSvcServer;
use crate::local::store::LocalMetadataStore;
use crate::{grpc_svc, Error, MetadataStoreService};
use futures::TryFutureExt;
use restate_core::{ShutdownError, TaskCenter, TaskKind};
use restate_rocksdb::RocksError;
use restate_core::{TaskCenter, TaskKind};
use restate_types::config::{MetadataStoreOptions, RocksDbOptions};
use restate_types::health::HealthStatus;
use restate_types::live::BoxedLiveLoad;
Expand All @@ -28,16 +27,6 @@ pub struct LocalMetadataStoreService {
rocksdb_options: BoxedLiveLoad<RocksDbOptions>,
}

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("error while running server grpc reflection service: {0}")]
GrpcReflection(#[from] tonic_reflection::server::Error),
#[error("system is shutting down")]
Shutdown(#[from] ShutdownError),
#[error("rocksdb error: {0}")]
RocksDB(#[from] RocksError),
}

impl LocalMetadataStoreService {
pub fn from_options(
health_status: HealthStatus<MetadataServerStatus>,
Expand All @@ -51,16 +40,21 @@ impl LocalMetadataStoreService {
rocksdb_options,
}
}
}

pub async fn run(self) -> Result<(), Error> {
#[async_trait::async_trait]
impl MetadataStoreService for LocalMetadataStoreService {
async fn run(self) -> Result<(), Error> {
let LocalMetadataStoreService {
health_status,
mut opts,
rocksdb_options,
} = self;
let options = opts.live_load();
let bind_address = options.bind_address.clone();
let store = LocalMetadataStore::create(options, rocksdb_options).await?;
let store = LocalMetadataStore::create(options, rocksdb_options)
.await
.map_err(|err| Error::Generic(err.into()))?;
let mut builder = GrpcServiceBuilder::default();

builder.register_file_descriptor_set_for_reflection(grpc_svc::FILE_DESCRIPTOR_SET);
Expand Down
36 changes: 18 additions & 18 deletions crates/metadata-store/src/local/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::{Error, MetadataStoreRequest, RequestReceiver, RequestSender, Result};
use crate::{MetadataStoreRequest, RequestError, RequestReceiver, RequestSender};
use bytes::BytesMut;
use bytestring::ByteString;
use restate_core::cancellation_watcher;
Expand Down Expand Up @@ -158,12 +158,12 @@ impl LocalMetadataStore {
};
}

fn get(&self, key: &ByteString) -> Result<Option<VersionedValue>> {
fn get(&self, key: &ByteString) -> Result<Option<VersionedValue>, RequestError> {
let cf_handle = self.kv_cf_handle();
let slice = self
.db
.get_pinned_cf(&cf_handle, key)
.map_err(|err| Error::Storage(err.into()))?;
.map_err(|err| RequestError::Storage(err.into()))?;

if let Some(bytes) = slice {
Ok(Some(Self::decode(bytes)?))
Expand All @@ -172,12 +172,12 @@ impl LocalMetadataStore {
}
}

fn get_version(&self, key: &ByteString) -> Result<Option<Version>> {
fn get_version(&self, key: &ByteString) -> Result<Option<Version>, RequestError> {
let cf_handle = self.kv_cf_handle();
let slice = self
.db
.get_pinned_cf(&cf_handle, key)
.map_err(|err| Error::Storage(err.into()))?;
.map_err(|err| RequestError::Storage(err.into()))?;

if let Some(bytes) = slice {
// todo only deserialize the version part
Expand All @@ -193,23 +193,23 @@ impl LocalMetadataStore {
key: &ByteString,
value: &VersionedValue,
precondition: Precondition,
) -> Result<()> {
) -> Result<(), RequestError> {
match precondition {
Precondition::None => Ok(self.write_versioned_kv_pair(key, value).await?),
Precondition::DoesNotExist => {
let current_version = self.get_version(key)?;
if current_version.is_none() {
Ok(self.write_versioned_kv_pair(key, value).await?)
} else {
Err(Error::kv_pair_exists())
Err(RequestError::kv_pair_exists())
}
}
Precondition::MatchesVersion(version) => {
let current_version = self.get_version(key)?;
if current_version == Some(version) {
Ok(self.write_versioned_kv_pair(key, value).await?)
} else {
Err(Error::version_mismatch(version, current_version))
Err(RequestError::version_mismatch(version, current_version))
}
}
}
Expand All @@ -219,7 +219,7 @@ impl LocalMetadataStore {
&mut self,
key: &ByteString,
value: &VersionedValue,
) -> Result<()> {
) -> Result<(), RequestError> {
self.buffer.clear();
Self::encode(value, &mut self.buffer)?;

Expand All @@ -236,10 +236,10 @@ impl LocalMetadataStore {
wb,
)
.await
.map_err(|err| Error::Storage(err.into()))
.map_err(|err| RequestError::Storage(err.into()))
}

fn delete(&mut self, key: &ByteString, precondition: Precondition) -> Result<()> {
fn delete(&mut self, key: &ByteString, precondition: Precondition) -> Result<(), RequestError> {
match precondition {
Precondition::None => self.delete_kv_pair(key),
// this condition does not really make sense for the delete operation
Expand All @@ -250,7 +250,7 @@ impl LocalMetadataStore {
// nothing to do
Ok(())
} else {
Err(Error::kv_pair_exists())
Err(RequestError::kv_pair_exists())
}
}
Precondition::MatchesVersion(version) => {
Expand All @@ -259,30 +259,30 @@ impl LocalMetadataStore {
if current_version == Some(version) {
self.delete_kv_pair(key)
} else {
Err(Error::version_mismatch(version, current_version))
Err(RequestError::version_mismatch(version, current_version))
}
}
}
}

fn delete_kv_pair(&mut self, key: &ByteString) -> Result<()> {
fn delete_kv_pair(&mut self, key: &ByteString) -> Result<(), RequestError> {
let write_options = self.write_options();
self.db
.delete_cf_opt(&self.kv_cf_handle(), key, &write_options)
.map_err(|err| Error::Storage(err.into()))
.map_err(|err| RequestError::Storage(err.into()))
}

fn encode<T: StorageEncode>(value: &T, buf: &mut BytesMut) -> Result<()> {
fn encode<T: StorageEncode>(value: &T, buf: &mut BytesMut) -> Result<(), RequestError> {
StorageCodec::encode(value, buf)?;
Ok(())
}

fn decode<T: StorageDecode>(buf: impl AsRef<[u8]>) -> Result<T> {
fn decode<T: StorageDecode>(buf: impl AsRef<[u8]>) -> Result<T, RequestError> {
let value = StorageCodec::decode(&mut buf.as_ref())?;
Ok(value)
}

fn log_error<T>(result: &Result<T>, request: &str) {
fn log_error<T>(result: &Result<T, RequestError>, request: &str) {
if let Err(err) = &result {
debug!("failed to process request '{}': '{}'", request, err)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/metadata-store/src/local/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use restate_types::{flexbuffers_storage_encode_decode, Version, Versioned};

use crate::grpc::client::GrpcMetadataStoreClient;
use crate::local::service::LocalMetadataStoreService;
use crate::{MetadataStoreClient, Precondition, WriteError};
use crate::{MetadataStoreClient, MetadataStoreService, Precondition, WriteError};

#[derive(Debug, Clone, PartialOrd, PartialEq, Serialize, Deserialize)]
struct Value {
Expand Down
Loading

0 comments on commit 444cd75

Please sign in to comment.