From ddae931ee2f963ed20afe64c5a1959cb1edee0a9 Mon Sep 17 00:00:00 2001 From: Floris Smit Date: Tue, 15 Nov 2022 18:23:56 +0100 Subject: [PATCH] feat: health check RPC for compute node (#6334) * Add health service to compute-node * Add correct imports * Ignore new_without_default clippy warning * Fix imports after merge Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> Co-authored-by: Liang <44948473+soundOfDestiny@users.noreply.github.com> --- src/compute/src/rpc/service/health_service.rs | 40 +++++++++++++++++++ src/compute/src/rpc/service/mod.rs | 1 + src/compute/src/server.rs | 4 ++ 3 files changed, 45 insertions(+) create mode 100644 src/compute/src/rpc/service/health_service.rs diff --git a/src/compute/src/rpc/service/health_service.rs b/src/compute/src/rpc/service/health_service.rs new file mode 100644 index 0000000000000..6ac225e6ffeda --- /dev/null +++ b/src/compute/src/rpc/service/health_service.rs @@ -0,0 +1,40 @@ +// Copyright 2022 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_pb::health::health_check_response::ServingStatus; +use risingwave_pb::health::health_server::Health; +use risingwave_pb::health::{HealthCheckRequest, HealthCheckResponse}; +use tonic::{Request, Response, Status}; + +pub struct HealthServiceImpl {} + +impl HealthServiceImpl { + #[allow(clippy::new_without_default)] + pub fn new() -> Self { + Self {} + } +} + +#[async_trait::async_trait] +impl Health for HealthServiceImpl { + async fn check( + &self, + _request: Request, + ) -> Result, Status> { + // Reply serving as long as tonic service is started + Ok(Response::new(HealthCheckResponse { + status: ServingStatus::Serving as i32, + })) + } +} diff --git a/src/compute/src/rpc/service/mod.rs b/src/compute/src/rpc/service/mod.rs index 3d3c7d48ac59a..cd27859164e22 100644 --- a/src/compute/src/rpc/service/mod.rs +++ b/src/compute/src/rpc/service/mod.rs @@ -15,5 +15,6 @@ pub mod config_service; pub mod exchange_metrics; pub mod exchange_service; +pub mod health_service; pub mod monitor_service; pub mod stream_service; diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index f9cee17d98284..19c479f2cacde 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -26,6 +26,7 @@ use risingwave_common_service::metrics_manager::MetricsManager; use risingwave_hummock_sdk::compact::CompactorRuntimeConfig; use risingwave_pb::common::WorkerType; use risingwave_pb::compute::config_service_server::ConfigServiceServer; +use risingwave_pb::health::health_server::HealthServer; use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer; use risingwave_pb::stream_service::stream_service_server::StreamServiceServer; use risingwave_pb::task_service::exchange_service_server::ExchangeServiceServer; @@ -52,6 +53,7 @@ use tokio::task::JoinHandle; use crate::rpc::service::config_service::ConfigServiceImpl; use crate::rpc::service::exchange_metrics::ExchangeServiceMetrics; use crate::rpc::service::exchange_service::ExchangeServiceImpl; +use crate::rpc::service::health_service::HealthServiceImpl; use crate::rpc::service::monitor_service::{ GrpcStackTraceManagerRef, MonitorServiceImpl, StackTraceMiddlewareLayer, }; @@ -244,6 +246,7 @@ pub async fn compute_node_serve( let stream_srv = StreamServiceImpl::new(stream_mgr.clone(), stream_env.clone()); let monitor_srv = MonitorServiceImpl::new(stream_mgr.clone(), grpc_stack_trace_mgr.clone()); let config_srv = ConfigServiceImpl::new(batch_mgr, stream_mgr); + let health_srv = HealthServiceImpl::new(); let (shutdown_send, mut shutdown_recv) = tokio::sync::oneshot::channel::<()>(); let join_handle = tokio::spawn(async move { @@ -259,6 +262,7 @@ pub async fn compute_node_serve( .add_service(StreamServiceServer::new(stream_srv)) .add_service(MonitorServiceServer::new(monitor_srv)) .add_service(ConfigServiceServer::new(config_srv)) + .add_service(HealthServer::new(health_srv)) .serve_with_shutdown(listen_addr, async move { tokio::select! { _ = tokio::signal::ctrl_c() => {},