Skip to content

Commit

Permalink
Make grpc server reusable by other metadata store implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Nov 27, 2024
1 parent 49dec91 commit 0dd0374
Show file tree
Hide file tree
Showing 10 changed files with 279 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ use restate_core::network::net_util::CommonClientConnectionOptions;
use restate_types::net::AdvertisedAddress;
use restate_types::Version;

use crate::grpc::pb_conversions::ConversionError;
use crate::grpc_svc::metadata_store_svc_client::MetadataStoreSvcClient;
use crate::grpc_svc::{DeleteRequest, GetRequest, PutRequest};
use crate::local::grpc::pb_conversions::ConversionError;

/// Client end to interact with the [`LocalMetadataStore`].
/// Client end to interact with the metadata store.
#[derive(Debug, Clone)]
pub struct LocalMetadataStoreClient {
pub struct GrpcMetadataStoreClient {
svc_client: MetadataStoreSvcClient<Channel>,
}

impl LocalMetadataStoreClient {
impl GrpcMetadataStoreClient {
pub fn new<T: CommonClientConnectionOptions>(
metadata_store_address: AdvertisedAddress,
options: &T,
Expand All @@ -45,7 +45,7 @@ impl LocalMetadataStoreClient {
}

#[async_trait]
impl MetadataStore for LocalMetadataStoreClient {
impl MetadataStore for GrpcMetadataStoreClient {
async fn get(&self, key: ByteString) -> Result<Option<VersionedValue>, ReadError> {
let response = self
.svc_client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,28 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::grpc::pb_conversions::ConversionError;
use crate::grpc_svc::metadata_store_svc_server::MetadataStoreSvc;
use crate::grpc_svc::{DeleteRequest, GetRequest, GetResponse, GetVersionResponse, PutRequest};
use crate::local::grpc::pb_conversions::ConversionError;
use crate::local::store::{Error, MetadataStoreRequest, RequestSender};
use crate::{Error, MetadataStoreRequest, RequestSender};
use async_trait::async_trait;
use tokio::sync::oneshot;
use tonic::{Request, Response, Status};

/// Grpc svc handler for the [`LocalMetadataStore`].
/// Grpc svc handler for the metadata store.
#[derive(Debug)]
pub struct LocalMetadataStoreHandler {
pub struct MetadataStoreHandler {
request_tx: RequestSender,
}

impl LocalMetadataStoreHandler {
impl MetadataStoreHandler {
pub fn new(request_tx: RequestSender) -> Self {
Self { request_tx }
}
}

#[async_trait]
impl MetadataStoreSvc for LocalMetadataStoreHandler {
impl MetadataStoreSvc for MetadataStoreHandler {
async fn get(&self, request: Request<GetRequest>) -> Result<Response<GetResponse>, Status> {
let (result_tx, result_rx) = oneshot::channel();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

pub mod client;
pub mod handler;
pub mod server;
pub mod service_builder;

pub mod pb_conversions {
use crate::grpc_svc;
Expand Down
76 changes: 76 additions & 0 deletions crates/metadata-store/src/grpc/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use http::Request;
use hyper::body::Incoming;
use hyper_util::service::TowerToHyperService;
use restate_core::network::net_util;
use restate_core::ShutdownError;
use restate_types::health::HealthStatus;
use restate_types::net::BindAddress;
use restate_types::protobuf::common::MetadataServerStatus;
use tonic::body::boxed;
use tonic::service::Routes;
use tower::ServiceExt;
use tower_http::classify::{GrpcCode, GrpcErrorsAsFailures, SharedClassifier};

pub struct GrpcServer {
bind_address: BindAddress,
routes: Routes,
}

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("failed running grpc server: {0}")]
GrpcServer(#[from] net_util::Error),
#[error("system is shutting down")]
Shutdown(#[from] ShutdownError),
}

impl GrpcServer {
pub fn new(bind_address: BindAddress, routes: Routes) -> Self {
Self {
bind_address,
routes,
}
}

pub async fn run(self, health_status: HealthStatus<MetadataServerStatus>) -> Result<(), Error> {
let span_factory = tower_http::trace::DefaultMakeSpan::new()
.include_headers(true)
.level(tracing::Level::ERROR);

let trace_layer = tower_http::trace::TraceLayer::new(SharedClassifier::new(
GrpcErrorsAsFailures::new().with_success(GrpcCode::FailedPrecondition),
))
.make_span_with(span_factory);

let server_builder = tonic::transport::Server::builder()
.layer(trace_layer)
.add_routes(self.routes);

let service = TowerToHyperService::new(
server_builder
.into_service()
.map_request(|req: Request<Incoming>| req.map(boxed)),
);

net_util::run_hyper_server(
&self.bind_address,
service,
"metadata-store-grpc",
|| health_status.update(MetadataServerStatus::Ready),
|| health_status.update(MetadataServerStatus::Unknown),
)
.await?;

Ok(())
}
}
81 changes: 81 additions & 0 deletions crates/metadata-store/src/grpc/service_builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use http::{Request, Response};
use std::convert::Infallible;
use tonic::body::BoxBody;
use tonic::server::NamedService;
use tonic::service::{Routes, RoutesBuilder};
use tonic_health::ServingStatus;
use tower::Service;

#[derive(Debug)]
pub struct GrpcServiceBuilder<'a> {
reflection_service_builder: Option<tonic_reflection::server::Builder<'a>>,
routes_builder: RoutesBuilder,
svc_names: Vec<&'static str>,
}

impl<'a> Default for GrpcServiceBuilder<'a> {
fn default() -> Self {
let routes_builder = RoutesBuilder::default();

Self {
reflection_service_builder: Some(tonic_reflection::server::Builder::configure()),
routes_builder,
svc_names: Vec::default(),
}
}
}

impl<'a> GrpcServiceBuilder<'a> {
pub fn add_service<S>(&mut self, svc: S)
where
S: Service<Request<BoxBody>, Response = Response<BoxBody>, Error = Infallible>
+ NamedService
+ Clone
+ Send
+ 'static,
S::Future: Send + 'static,
{
self.svc_names.push(S::NAME);
self.routes_builder.add_service(svc);
}

pub fn register_file_descriptor_set_for_reflection<'b: 'a>(
&mut self,
encoded_file_descriptor_set: &'b [u8],
) {
self.reflection_service_builder = Some(
self.reflection_service_builder
.take()
.expect("be present")
.register_encoded_file_descriptor_set(encoded_file_descriptor_set),
);
}

pub async fn build(mut self) -> Result<Routes, tonic_reflection::server::Error> {
let (mut health_reporter, health_service) = tonic_health::server::health_reporter();

for svc_name in self.svc_names {
health_reporter
.set_service_status(svc_name, ServingStatus::Serving)
.await;
}

self.routes_builder.add_service(health_service);
self.routes_builder.add_service(
self.reflection_service_builder
.expect("be present")
.build_v1()?,
);
Ok(self.routes_builder.routes())
}
}
62 changes: 62 additions & 0 deletions crates/metadata-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,71 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod grpc;
mod grpc_svc;
pub mod local;

use bytestring::ByteString;
use restate_core::metadata_store::VersionedValue;
pub use restate_core::metadata_store::{
MetadataStoreClient, Precondition, ReadError, ReadModifyWriteError, WriteError,
};
use restate_types::errors::GenericError;
use restate_types::storage::{StorageDecodeError, StorageEncodeError};
use restate_types::Version;
use tokio::sync::{mpsc, oneshot};

pub type RequestSender = mpsc::Sender<MetadataStoreRequest>;
pub type RequestReceiver = mpsc::Receiver<MetadataStoreRequest>;

type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(Debug)]
pub enum MetadataStoreRequest {
Get {
key: ByteString,
result_tx: oneshot::Sender<Result<Option<VersionedValue>>>,
},
GetVersion {
key: ByteString,
result_tx: oneshot::Sender<Result<Option<Version>>>,
},
Put {
key: ByteString,
value: VersionedValue,
precondition: Precondition,
result_tx: oneshot::Sender<Result<()>>,
},
Delete {
key: ByteString,
precondition: Precondition,
result_tx: oneshot::Sender<Result<()>>,
},
}

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("storage error: {0}")]
Storage(#[from] GenericError),
#[error("failed precondition: {0}")]
FailedPrecondition(String),
#[error("invalid argument: {0}")]
InvalidArgument(String),
#[error("encode error: {0}")]
Encode(#[from] StorageEncodeError),
#[error("decode error: {0}")]
Decode(#[from] StorageDecodeError),
}

impl Error {
fn kv_pair_exists() -> Self {
Error::FailedPrecondition("key-value pair already exists".to_owned())
}

fn version_mismatch(expected: Version, actual: Option<Version>) -> Self {
Error::FailedPrecondition(format!(
"Expected version '{}' but found version '{:?}'",
expected, actual
))
}
}
11 changes: 6 additions & 5 deletions crates/metadata-store/src/local/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod grpc;
mod store;

mod service;
Expand All @@ -19,10 +18,11 @@ use restate_types::{
errors::GenericError,
};

use crate::local::grpc::client::LocalMetadataStoreClient;
pub use service::{Error, LocalMetadataStoreService};

/// Creates a [`MetadataStoreClient`].
use crate::grpc::client::GrpcMetadataStoreClient;

/// Creates a [`MetadataStoreClient`] for the [`GrpcMetadataStoreClient`].
pub async fn create_client(
metadata_store_client_options: MetadataStoreClientOptions,
) -> Result<MetadataStoreClient, GenericError> {
Expand All @@ -34,8 +34,9 @@ pub async fn create_client(

let client = match metadata_store_client_options.metadata_store_client.clone() {
MetadataStoreClientConfig::Embedded { address } => {
let store = LocalMetadataStoreClient::new(address, &metadata_store_client_options);
MetadataStoreClient::new(store, backoff_policy)
let inner_client =
GrpcMetadataStoreClient::new(address, &metadata_store_client_options);
MetadataStoreClient::new(inner_client, backoff_policy)
}
MetadataStoreClientConfig::Etcd { addresses } => {
let store = EtcdMetadataStore::new(addresses, &metadata_store_client_options).await?;
Expand Down
Loading

0 comments on commit 0dd0374

Please sign in to comment.