diff --git a/Cargo.lock b/Cargo.lock index 95a2dea104..269b8757cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2544,6 +2544,16 @@ dependencies = [ "dirs-sys", ] +[[package]] +name = "dirs-next" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b98cf8ebf19c3d1b223e151f99a4f9f0690dca41414773390fc824184ac833e1" +dependencies = [ + "cfg-if", + "dirs-sys-next", +] + [[package]] name = "dirs-sys" version = "0.4.1" @@ -2556,6 +2566,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "dirs-sys-next" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d" +dependencies = [ + "libc", + "redox_users", + "winapi", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -3001,6 +3022,15 @@ dependencies = [ "slab", ] +[[package]] +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +dependencies = [ + "byteorder", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -3024,6 +3054,18 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "getset" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f636605b743120a8d32ed92fc27b6cde1a769f8f936c065151eb66f88ded513c" +dependencies = [ + "proc-macro-error2", + "proc-macro2", + "quote", + "syn 2.0.85", +] + [[package]] name = "gimli" version = "0.28.1" @@ -5437,6 +5479,36 @@ dependencies = [ "prost", ] +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" +dependencies = [ + "bytes", +] + +[[package]] +name = "protobuf-build" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2df9942df2981178a930a72d442de47e2f0df18ad68e50a30f816f1848215ad0" +dependencies = [ + "bitflags 1.3.2", + "protobuf", + "protobuf-codegen", + "regex", +] + +[[package]] +name = "protobuf-codegen" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "033460afb75cf755fcfc16dfaed20b86468082a2ea24e05ac35ab4a099a017d6" +dependencies = [ + "protobuf", +] + [[package]] name = "ptr_meta" version = "0.1.4" @@ -5554,6 +5626,36 @@ dependencies = [ "nibble_vec", ] +[[package]] +name = "raft" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f12688b23a649902762d4c11d854d73c49c9b93138f2de16403ef9f571ad5bae" +dependencies = [ + "bytes", + "fxhash", + "getset", + "protobuf", + "raft-proto", + "rand", + "slog", + "slog-envlogger", + "slog-stdlog", + "slog-term", + "thiserror", +] + +[[package]] +name = "raft-proto" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb6884896294f553e8d5cfbdb55080b9f5f2f43394afff59c9f077e0f4b46d6b" +dependencies = [ + "bytes", + "protobuf", + "protobuf-build", +] + [[package]] name = "rand" version = "0.8.5" @@ -6407,12 +6509,15 @@ dependencies = [ "hyper-util", "prost", "prost-types", + "protobuf", + "raft", "restate-core", "restate-rocksdb", "restate-types", "rust-rocksdb", "schemars", "serde", + "slog", "static_assertions", "tempfile", "test-log", @@ -6427,7 +6532,9 @@ dependencies = [ "tower", "tower-http 0.5.2", "tracing", + "tracing-slog", "tracing-subscriber", + "ulid", ] [[package]] @@ -7755,6 +7862,74 @@ dependencies = [ "autocfg", ] +[[package]] +name = "slog" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8347046d4ebd943127157b94d63abb990fcf729dc4e9978927fdf4ac3c998d06" + +[[package]] +name = "slog-async" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72c8038f898a2c79507940990f05386455b3a317d8f18d4caea7cbc3d5096b84" +dependencies = [ + "crossbeam-channel", + "slog", + "take_mut", + "thread_local", +] + +[[package]] +name = "slog-envlogger" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "906a1a0bc43fed692df4b82a5e2fbfc3733db8dad8bb514ab27a4f23ad04f5c0" +dependencies = [ + "log", + "regex", + "slog", + "slog-async", + "slog-scope", + "slog-stdlog", + "slog-term", +] + +[[package]] +name = "slog-scope" +version = "4.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f95a4b4c3274cd2869549da82b57ccc930859bdbf5bcea0424bc5f140b3c786" +dependencies = [ + "arc-swap", + "lazy_static", + "slog", +] + +[[package]] +name = "slog-stdlog" +version = "4.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6706b2ace5bbae7291d3f8d2473e2bfab073ccd7d03670946197aec98471fa3e" +dependencies = [ + "log", + "slog", + "slog-scope", +] + +[[package]] +name = "slog-term" +version = "2.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6e022d0b998abfe5c3782c1f03551a596269450ccd677ea51c56f8b214610e8" +dependencies = [ + "is-terminal", + "slog", + "term", + "thread_local", + "time", +] + [[package]] name = "smallvec" version = "1.13.2" @@ -8051,6 +8226,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" +[[package]] +name = "take_mut" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60" + [[package]] name = "tap" version = "1.0.1" @@ -8069,6 +8250,17 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "term" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c59df8ac95d96ff9bede18eb7300b0fda5e5d8d90960e76f8e14ae765eedbf1f" +dependencies = [ + "dirs-next", + "rustversion", + "winapi", +] + [[package]] name = "termcolor" version = "1.4.1" @@ -8636,6 +8828,17 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-slog" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9306d2ca06aa9dfc8aa729ff884e9dca181f588a298cc5c59d4fdd91372570bf" +dependencies = [ + "once_cell", + "slog", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.18" @@ -8782,6 +8985,7 @@ checksum = "34778c17965aa2a08913b57e1f34db9b4a63f5de31768b55bf20d2795f921259" dependencies = [ "getrandom", "rand", + "serde", "web-time", ] diff --git a/crates/metadata-store/Cargo.toml b/crates/metadata-store/Cargo.toml index b04d107c80..b1bf2753b8 100644 --- a/crates/metadata-store/Cargo.toml +++ b/crates/metadata-store/Cargo.toml @@ -18,6 +18,7 @@ restate-rocksdb = { workspace = true } restate-types = { workspace = true } anyhow = { workspace = true } +assert2 = { workspace = true } async-trait = { workspace = true } bytes = { workspace = true } bytestring = { workspace = true } @@ -29,9 +30,12 @@ hyper = { workspace = true } hyper-util = { workspace = true } prost = { workspace = true } prost-types = { workspace = true } +protobuf = "2.28.0" +raft = { version = "0.7.0" } rocksdb = { workspace = true } schemars = { workspace = true, optional = true } serde = { workspace = true } +slog = { version = "2.7.0" } static_assertions = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } @@ -43,6 +47,8 @@ tonic-health = { workspace = true } tower = { workspace = true } tower-http = { workspace = true, features = ["trace"] } tracing = { workspace = true } +tracing-slog = { version = "0.3.0" } +ulid = { workspace = true, features = ["serde"] } [dev-dependencies] restate-core = { workspace = true, features = ["test-util"] } diff --git a/crates/metadata-store/src/grpc/handler.rs b/crates/metadata-store/src/grpc/handler.rs index 4ee984aaa2..601ecbbce1 100644 --- a/crates/metadata-store/src/grpc/handler.rs +++ b/crates/metadata-store/src/grpc/handler.rs @@ -132,7 +132,7 @@ impl MetadataStoreSvc for MetadataStoreHandler { impl From for Status { fn from(err: RequestError) -> Self { match err { - RequestError::FailedPrecondition(msg) => Status::failed_precondition(msg), + RequestError::FailedPrecondition(msg) => Status::failed_precondition(msg.to_string()), err => Status::internal(err.to_string()), } } diff --git a/crates/metadata-store/src/lib.rs b/crates/metadata-store/src/lib.rs index 018be71a30..8524a1beff 100644 --- a/crates/metadata-store/src/lib.rs +++ b/crates/metadata-store/src/lib.rs @@ -11,6 +11,7 @@ mod grpc; mod grpc_svc; pub mod local; +pub mod raft; use bytestring::ByteString; use restate_core::metadata_store::VersionedValue; @@ -30,10 +31,10 @@ pub type RequestReceiver = mpsc::Receiver; #[derive(Debug, thiserror::Error)] pub enum RequestError { - #[error("storage error: {0}")] - Storage(#[from] GenericError), + #[error("internal error: {0}")] + Internal(#[from] GenericError), #[error("failed precondition: {0}")] - FailedPrecondition(String), + FailedPrecondition(#[from] PreconditionViolation), #[error("invalid argument: {0}")] InvalidArgument(String), #[error("encode error: {0}")] @@ -42,6 +43,27 @@ pub enum RequestError { Decode(#[from] StorageDecodeError), } +#[derive(Debug, thiserror::Error)] +pub enum PreconditionViolation { + #[error("key-value pair already exists")] + Exists, + #[error("expected version '{expected}' but found version '{actual:?}'")] + VersionMismatch { + expected: Version, + actual: Option, + }, +} + +impl PreconditionViolation { + fn kv_pair_exists() -> Self { + PreconditionViolation::Exists + } + + fn version_mismatch(expected: Version, actual: Option) -> Self { + PreconditionViolation::VersionMismatch { expected, actual } + } +} + #[derive(Debug, thiserror::Error)] pub enum Error { #[error("error while running server grpc reflection service: {0}")] @@ -52,6 +74,12 @@ pub enum Error { Generic(#[from] GenericError), } +impl Error { + pub fn generic(err: impl Into) -> Error { + Error::Generic(err.into()) + } +} + #[async_trait::async_trait] pub trait MetadataStoreServiceBoxed { async fn run_boxed(self: Box) -> Result<(), Error>; @@ -105,15 +133,3 @@ pub enum MetadataStoreRequest { result_tx: oneshot::Sender>, }, } - -impl RequestError { - fn kv_pair_exists() -> Self { - RequestError::FailedPrecondition("key-value pair already exists".to_owned()) - } - - fn version_mismatch(expected: Version, actual: Option) -> Self { - RequestError::FailedPrecondition(format!( - "Expected version '{expected}' but found version '{actual:?}'" - )) - } -} diff --git a/crates/metadata-store/src/local/store.rs b/crates/metadata-store/src/local/store.rs index 26223a9117..5e812cd807 100644 --- a/crates/metadata-store/src/local/store.rs +++ b/crates/metadata-store/src/local/store.rs @@ -8,7 +8,9 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::{MetadataStoreRequest, RequestError, RequestReceiver, RequestSender}; +use crate::{ + MetadataStoreRequest, PreconditionViolation, RequestError, RequestReceiver, RequestSender, +}; use bytes::BytesMut; use bytestring::ByteString; use restate_core::cancellation_watcher; @@ -163,7 +165,7 @@ impl LocalMetadataStore { let slice = self .db .get_pinned_cf(&cf_handle, key) - .map_err(|err| RequestError::Storage(err.into()))?; + .map_err(|err| RequestError::Internal(err.into()))?; if let Some(bytes) = slice { Ok(Some(Self::decode(bytes)?)) @@ -177,7 +179,7 @@ impl LocalMetadataStore { let slice = self .db .get_pinned_cf(&cf_handle, key) - .map_err(|err| RequestError::Storage(err.into()))?; + .map_err(|err| RequestError::Internal(err.into()))?; if let Some(bytes) = slice { // todo only deserialize the version part @@ -201,7 +203,7 @@ impl LocalMetadataStore { if current_version.is_none() { Ok(self.write_versioned_kv_pair(key, value).await?) } else { - Err(RequestError::kv_pair_exists()) + Err(PreconditionViolation::kv_pair_exists())? } } Precondition::MatchesVersion(version) => { @@ -209,7 +211,10 @@ impl LocalMetadataStore { if current_version == Some(version) { Ok(self.write_versioned_kv_pair(key, value).await?) } else { - Err(RequestError::version_mismatch(version, current_version)) + Err(PreconditionViolation::version_mismatch( + version, + current_version, + ))? } } } @@ -236,7 +241,7 @@ impl LocalMetadataStore { wb, ) .await - .map_err(|err| RequestError::Storage(err.into())) + .map_err(|err| RequestError::Internal(err.into())) } fn delete(&mut self, key: &ByteString, precondition: Precondition) -> Result<(), RequestError> { @@ -250,7 +255,7 @@ impl LocalMetadataStore { // nothing to do Ok(()) } else { - Err(RequestError::kv_pair_exists()) + Err(PreconditionViolation::kv_pair_exists())? } } Precondition::MatchesVersion(version) => { @@ -259,7 +264,10 @@ impl LocalMetadataStore { if current_version == Some(version) { self.delete_kv_pair(key) } else { - Err(RequestError::version_mismatch(version, current_version)) + Err(PreconditionViolation::version_mismatch( + version, + current_version, + ))? } } } @@ -269,7 +277,7 @@ impl LocalMetadataStore { let write_options = self.write_options(); self.db .delete_cf_opt(&self.kv_cf_handle(), key, &write_options) - .map_err(|err| RequestError::Storage(err.into())) + .map_err(|err| RequestError::Internal(err.into())) } fn encode(value: &T, buf: &mut BytesMut) -> Result<(), RequestError> { diff --git a/crates/metadata-store/src/raft/mod.rs b/crates/metadata-store/src/raft/mod.rs new file mode 100644 index 0000000000..7079972204 --- /dev/null +++ b/crates/metadata-store/src/raft/mod.rs @@ -0,0 +1,12 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +pub mod service; +mod store; diff --git a/crates/metadata-store/src/raft/service.rs b/crates/metadata-store/src/raft/service.rs new file mode 100644 index 0000000000..1df3a02048 --- /dev/null +++ b/crates/metadata-store/src/raft/service.rs @@ -0,0 +1,67 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::grpc::handler::MetadataStoreHandler; +use crate::grpc::server::GrpcServer; +use crate::grpc::service_builder::GrpcServiceBuilder; +use crate::grpc_svc::metadata_store_svc_server::MetadataStoreSvcServer; +use crate::raft::store::RaftMetadataStore; +use crate::{grpc_svc, Error, MetadataStoreService}; +use futures::TryFutureExt; +use restate_core::{TaskCenter, TaskKind}; +use restate_types::config::MetadataStoreOptions; +use restate_types::health::HealthStatus; +use restate_types::live::BoxedLiveLoad; +use restate_types::protobuf::common::MetadataServerStatus; + +pub struct RaftMetadataStoreService { + health_status: HealthStatus, + options: BoxedLiveLoad, +} + +impl RaftMetadataStoreService { + pub fn new( + health_status: HealthStatus, + options: BoxedLiveLoad, + ) -> Self { + Self { + options, + health_status, + } + } +} + +#[async_trait::async_trait] +impl MetadataStoreService for RaftMetadataStoreService { + async fn run(mut self) -> Result<(), Error> { + let store_options = self.options.live_load(); + let store = RaftMetadataStore::new().map_err(Error::generic)?; + + let mut builder = GrpcServiceBuilder::default(); + + builder.register_file_descriptor_set_for_reflection(grpc_svc::FILE_DESCRIPTOR_SET); + builder.add_service(MetadataStoreSvcServer::new(MetadataStoreHandler::new( + store.request_sender(), + ))); + + let grpc_server = + GrpcServer::new(store_options.bind_address.clone(), builder.build().await?); + + TaskCenter::spawn_child( + TaskKind::RpcServer, + "metadata-store-grpc", + grpc_server.run(self.health_status).map_err(Into::into), + )?; + + store.run().await.map_err(Error::generic)?; + + Ok(()) + } +} diff --git a/crates/metadata-store/src/raft/store.rs b/crates/metadata-store/src/raft/store.rs new file mode 100644 index 0000000000..94292a2d34 --- /dev/null +++ b/crates/metadata-store/src/raft/store.rs @@ -0,0 +1,530 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::{ + MetadataStoreRequest, PreconditionViolation, RequestError, RequestReceiver, RequestSender, +}; +use assert2::let_assert; +use bytes::{Bytes, BytesMut}; +use bytestring::ByteString; +use protobuf::{Message as ProtobufMessage, ProtobufError}; +use raft::prelude::{ConfChange, ConfChangeV2, ConfState, Entry, EntryType, Message}; +use raft::storage::MemStorage; +use raft::{Config, RawNode}; +use restate_core::cancellation_watcher; +use restate_core::metadata_store::{Precondition, VersionedValue}; +use restate_types::storage::{StorageCodec, StorageDecodeError, StorageEncodeError}; +use restate_types::{flexbuffers_storage_encode_decode, Version}; +use slog::o; +use std::collections::HashMap; +use std::time::Duration; +use tokio::sync::{mpsc, oneshot}; +use tokio::time; +use tokio::time::MissedTickBehavior; +use tracing::{debug, info, warn}; +use tracing_slog::TracingSlogDrain; +use ulid::Ulid; + +#[derive(Debug, thiserror::Error)] +#[error("failed creating raft node: {0}")] +pub struct BuildError(#[from] raft::Error); + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("failed appending entries: {0}")] + Append(#[from] raft::Error), + #[error("failed deserializing raft serialized requests: {0}")] + DecodeRequest(StorageDecodeError), + #[error("failed deserializing conf change: {0}")] + DecodeConf(ProtobufError), + #[error("failed applying conf change: {0}")] + ApplyConfChange(raft::Error), +} + +pub struct RaftMetadataStore { + _logger: slog::Logger, + raw_node: RawNode, + tick_interval: time::Interval, + + callbacks: HashMap, + kv_entries: HashMap, + + request_tx: RequestSender, + request_rx: RequestReceiver, +} + +impl RaftMetadataStore { + pub fn new() -> Result { + let (request_tx, request_rx) = mpsc::channel(2); + + let config = Config { + id: 1, + ..Default::default() + }; + + let store = MemStorage::new_with_conf_state(ConfState::from((vec![1], vec![]))); + let drain = TracingSlogDrain; + let logger = slog::Logger::root(drain, o!()); + + let raw_node = RawNode::new(&config, store, &logger)?; + + let mut tick_interval = time::interval(Duration::from_millis(100)); + tick_interval.set_missed_tick_behavior(MissedTickBehavior::Burst); + + Ok(Self { + // we only need to keep it alive + _logger: logger, + raw_node, + tick_interval, + callbacks: HashMap::default(), + kv_entries: HashMap::default(), + request_rx, + request_tx, + }) + } + + pub fn request_sender(&self) -> RequestSender { + self.request_tx.clone() + } + + pub async fn run(mut self) -> Result<(), Error> { + let mut cancellation = std::pin::pin!(cancellation_watcher()); + + loop { + tokio::select! { + _ = &mut cancellation => { + break; + } + Some(request) = self.request_rx.recv() => { + // todo: Unclear whether every replica should be allowed to propose. Maybe + // only the leader should propose and respond to clients. + let (callback, request) = Self::split_request(request); + + if let Err(err) = request + .encode_to_vec() + .map_err(Into::into) + .and_then(|request| self.raw_node + .propose(vec![], request) + .map_err(|err| RequestError::Internal(err.into()))) { + info!("Failed processing request: {err}"); + callback.fail(err); + continue; + } + + self.register_callback(callback); + } + _ = self.tick_interval.tick() => { + self.raw_node.tick(); + } + } + + self.on_ready()?; + } + + debug!("Stop running RaftMetadataStore."); + + Ok(()) + } + + fn on_ready(&mut self) -> Result<(), Error> { + if !self.raw_node.has_ready() { + return Ok(()); + } + + let mut ready = self.raw_node.ready(); + + // first need to send outgoing messages + if !ready.messages().is_empty() { + self.send_messages(ready.take_messages()); + } + + // apply snapshot if one was sent + if !ready.snapshot().is_empty() { + if let Err(err) = self + .raw_node + .store() + .wl() + .apply_snapshot(ready.snapshot().clone()) + { + warn!("failed applying snapshot: {err}"); + } + } + + // then handle committed entries + self.handle_committed_entries(ready.take_committed_entries())?; + + // append new Raft entries to storage + self.raw_node.store().wl().append(ready.entries())?; + + // update the hard state if an update was produced (e.g. vote has happened) + if let Some(hs) = ready.hs() { + self.raw_node.store().wl().set_hardstate(hs.clone()); + } + + // send persisted messages (after entries were appended and hard state was updated) + if !ready.persisted_messages().is_empty() { + self.send_messages(ready.take_persisted_messages()); + } + + // advance the raft node + let mut light_ready = self.raw_node.advance(ready); + + // update the commit index if it changed + if let Some(commit) = light_ready.commit_index() { + self.raw_node + .store() + .wl() + .mut_hard_state() + .set_commit(commit); + } + + // send outgoing messages + if !light_ready.messages().is_empty() { + self.send_messages(light_ready.take_messages()); + } + + // handle committed entries + if !light_ready.committed_entries().is_empty() { + self.handle_committed_entries(light_ready.take_committed_entries())?; + } + + self.raw_node.advance_apply(); + + Ok(()) + } + + fn register_callback(&mut self, callback: Callback) { + self.callbacks.insert(callback.request_id, callback); + } + + fn send_messages(&self, _messages: Vec) { + // todo: Send messages to other peers + } + + fn handle_committed_entries(&mut self, committed_entries: Vec) -> Result<(), Error> { + for entry in committed_entries { + if entry.data.is_empty() { + // new leader was elected + continue; + } + + match entry.get_entry_type() { + EntryType::EntryNormal => self.handle_normal_entry(entry)?, + EntryType::EntryConfChange => self.handle_conf_change(entry)?, + EntryType::EntryConfChangeV2 => self.handle_conf_change_v2(entry)?, + } + } + + Ok(()) + } + + fn handle_normal_entry(&mut self, entry: Entry) -> Result<(), Error> { + let request = Request::decode_from_bytes(entry.data).map_err(Error::DecodeRequest)?; + self.handle_request(request); + + Ok(()) + } + + fn handle_request(&mut self, request: Request) { + match request.kind { + RequestKind::Get { key } => { + let result = self.get(key); + if let Some(callback) = self.callbacks.remove(&request.request_id) { + callback.complete_get(result); + } + } + RequestKind::GetVersion { key } => { + let result = self.get_version(key); + if let Some(callback) = self.callbacks.remove(&request.request_id) { + callback.complete_get_version(result); + } + } + RequestKind::Put { + key, + value, + precondition, + } => { + let result = self.put(key, value, precondition); + if let Some(callback) = self.callbacks.remove(&request.request_id) { + callback.complete_put(result.map_err(Into::into)); + } + } + RequestKind::Delete { key, precondition } => { + let result = self.delete(key, precondition); + if let Some(callback) = self.callbacks.remove(&request.request_id) { + callback.complete_delete(result.map_err(Into::into)); + } + } + } + } + + fn get(&self, key: ByteString) -> Option { + self.kv_entries.get(&key).cloned() + } + + fn get_version(&self, key: ByteString) -> Option { + self.kv_entries.get(&key).map(|entry| entry.version) + } + + fn put( + &mut self, + key: ByteString, + value: VersionedValue, + precondition: Precondition, + ) -> Result<(), PreconditionViolation> { + match precondition { + Precondition::None => { + self.kv_entries.insert(key, value); + } + Precondition::DoesNotExist => { + if self.kv_entries.contains_key(&key) { + return Err(PreconditionViolation::kv_pair_exists()); + } + + self.kv_entries.insert(key, value); + } + Precondition::MatchesVersion(expected_version) => { + let actual_version = self.kv_entries.get(&key).map(|entry| entry.version); + + if actual_version == Some(expected_version) { + self.kv_entries.insert(key, value); + } else { + return Err(PreconditionViolation::version_mismatch( + expected_version, + actual_version, + )); + } + } + } + + Ok(()) + } + + fn delete( + &mut self, + key: ByteString, + precondition: Precondition, + ) -> Result<(), PreconditionViolation> { + match precondition { + Precondition::None => { + self.kv_entries.remove(&key); + } + Precondition::DoesNotExist => { + if self.kv_entries.contains_key(&key) { + return Err(PreconditionViolation::kv_pair_exists()); + } + } + Precondition::MatchesVersion(expected_version) => { + let actual_version = self.kv_entries.get(&key).map(|entry| entry.version); + + if actual_version == Some(expected_version) { + self.kv_entries.remove(&key); + } else { + return Err(PreconditionViolation::version_mismatch( + expected_version, + actual_version, + )); + } + } + } + + Ok(()) + } + + fn handle_conf_change(&mut self, entry: Entry) -> Result<(), Error> { + let mut cc = ConfChange::default(); + cc.merge_from_bytes(&entry.data) + .map_err(Error::DecodeConf)?; + let cs = self + .raw_node + .apply_conf_change(&cc) + .map_err(Error::ApplyConfChange)?; + self.raw_node.store().wl().set_conf_state(cs); + Ok(()) + } + + fn handle_conf_change_v2(&mut self, entry: Entry) -> Result<(), Error> { + let mut cc = ConfChangeV2::default(); + cc.merge_from_bytes(&entry.data) + .map_err(Error::DecodeConf)?; + let cs = self + .raw_node + .apply_conf_change(&cc) + .map_err(Error::ApplyConfChange)?; + self.raw_node.store().wl().set_conf_state(cs); + Ok(()) + } + + fn split_request(request: MetadataStoreRequest) -> (Callback, Request) { + let (request_kind, callback_kind) = match request { + MetadataStoreRequest::Get { key, result_tx } => { + (RequestKind::Get { key }, CallbackKind::Get { result_tx }) + } + MetadataStoreRequest::GetVersion { key, result_tx } => ( + RequestKind::GetVersion { key }, + CallbackKind::GetVersion { result_tx }, + ), + MetadataStoreRequest::Put { + key, + value, + precondition, + result_tx, + } => ( + RequestKind::Put { + key, + value, + precondition, + }, + CallbackKind::Put { result_tx }, + ), + MetadataStoreRequest::Delete { + key, + precondition, + result_tx, + } => ( + RequestKind::Delete { key, precondition }, + CallbackKind::Delete { result_tx }, + ), + }; + + let request_id = Ulid::new(); + + let callback = Callback { + request_id, + kind: callback_kind, + }; + + let request = Request { + request_id, + kind: request_kind, + }; + + (callback, request) + } +} + +struct Callback { + request_id: Ulid, + kind: CallbackKind, +} + +impl Callback { + fn fail(self, err: impl Into) { + match self.kind { + CallbackKind::Get { result_tx } => { + // err only if the oneshot receiver has gone away + let _ = result_tx.send(Err(err.into())); + } + CallbackKind::GetVersion { result_tx } => { + // err only if the oneshot receiver has gone away + let _ = result_tx.send(Err(err.into())); + } + CallbackKind::Put { result_tx } => { + // err only if the oneshot receiver has gone away + let _ = result_tx.send(Err(err.into())); + } + CallbackKind::Delete { result_tx } => { + // err only if the oneshot receiver has gone away + let _ = result_tx.send(Err(err.into())); + } + }; + } + + fn complete_get(self, result: Option) { + let_assert!( + CallbackKind::Get { result_tx } = self.kind, + "expected 'Get' callback" + ); + // err if caller has gone + let _ = result_tx.send(Ok(result)); + } + + fn complete_get_version(self, result: Option) { + let_assert!( + CallbackKind::GetVersion { result_tx } = self.kind, + "expected 'GetVersion' callback" + ); + // err if caller has gone + let _ = result_tx.send(Ok(result)); + } + + fn complete_put(self, result: Result<(), RequestError>) { + let_assert!( + CallbackKind::Put { result_tx } = self.kind, + "expected 'Put' callback" + ); + // err if caller has gone + let _ = result_tx.send(result); + } + + fn complete_delete(self, result: Result<(), RequestError>) { + let_assert!( + CallbackKind::Delete { result_tx } = self.kind, + "expected 'Delete' callback" + ); + // err if caller has gone + let _ = result_tx.send(result); + } +} + +enum CallbackKind { + Get { + result_tx: oneshot::Sender, RequestError>>, + }, + GetVersion { + result_tx: oneshot::Sender, RequestError>>, + }, + Put { + result_tx: oneshot::Sender>, + }, + Delete { + result_tx: oneshot::Sender>, + }, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +struct Request { + request_id: Ulid, + kind: RequestKind, +} + +flexbuffers_storage_encode_decode!(Request); + +impl Request { + fn encode_to_vec(&self) -> Result, StorageEncodeError> { + let mut buffer = BytesMut::new(); + // todo: Removing support for BufMut requires an extra copy from BytesMut to Vec :-( + StorageCodec::encode(self, &mut buffer)?; + Ok(buffer.to_vec()) + } + + fn decode_from_bytes(mut bytes: Bytes) -> Result { + StorageCodec::decode::(&mut bytes) + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +enum RequestKind { + Get { + key: ByteString, + }, + GetVersion { + key: ByteString, + }, + Put { + key: ByteString, + value: VersionedValue, + precondition: Precondition, + }, + Delete { + key: ByteString, + precondition: Precondition, + }, +} diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 7c12d31fe8..640d2fd065 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -31,6 +31,7 @@ use restate_core::{ #[cfg(feature = "replicated-loglet")] use restate_log_server::LogServerService; use restate_metadata_store::local::LocalMetadataStoreService; +use restate_metadata_store::raft::service::RaftMetadataStoreService; use restate_metadata_store::{ BoxedMetadataStoreService, MetadataStoreClient, MetadataStoreService, }; @@ -318,9 +319,11 @@ impl Node { .boxed(), ) .boxed(), - Kind::Raft => { - unimplemented!("not yet supported") - } + Kind::Raft => RaftMetadataStoreService::new( + health_status, + updateable_config.clone().map(|c| &c.metadata_store).boxed(), + ) + .boxed(), } }