Skip to content

Commit

Permalink
feat(ctl): list serving fragment mappings
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Jun 14, 2023
1 parent 8eb0e43 commit 14f3915
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 7 deletions.
10 changes: 10 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -411,3 +411,13 @@ service SystemParamsService {
rpc GetSystemParams(GetSystemParamsRequest) returns (GetSystemParamsResponse);
rpc SetSystemParam(SetSystemParamRequest) returns (SetSystemParamResponse);
}

message GetServingVnodeMappingsRequest {}

message GetServingVnodeMappingsResponse {
repeated FragmentParallelUnitMapping mappings = 1;
}

service ServingService {
rpc GetServingVnodeMappings(GetServingVnodeMappingsRequest) returns (GetServingVnodeMappingsResponse);
}
2 changes: 1 addition & 1 deletion src/meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@

pub mod backup_restore;
mod barrier;
pub(crate) mod batch;
#[cfg(not(madsim))] // no need in simulation test
mod dashboard;
mod error;
pub mod hummock;
pub mod manager;
mod model;
mod rpc;
pub(crate) mod serving;
pub mod storage;
mod stream;
pub(crate) mod telemetry;
Expand Down
12 changes: 8 additions & 4 deletions src/meta/src/rpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use risingwave_pb::meta::heartbeat_service_server::HeartbeatServiceServer;
use risingwave_pb::meta::meta_member_service_server::MetaMemberServiceServer;
use risingwave_pb::meta::notification_service_server::NotificationServiceServer;
use risingwave_pb::meta::scale_service_server::ScaleServiceServer;
use risingwave_pb::meta::serving_service_server::ServingServiceServer;
use risingwave_pb::meta::stream_manager_service_server::StreamManagerServiceServer;
use risingwave_pb::meta::system_params_service_server::SystemParamsServiceServer;
use risingwave_pb::meta::telemetry_info_service_server::TelemetryInfoServiceServer;
Expand All @@ -51,10 +52,10 @@ use super::intercept::MetricsMiddlewareLayer;
use super::service::health_service::HealthServiceImpl;
use super::service::notification_service::NotificationServiceImpl;
use super::service::scale_service::ScaleServiceImpl;
use super::service::serving_service::ServingServiceImpl;
use super::DdlServiceImpl;
use crate::backup_restore::BackupManager;
use crate::barrier::{BarrierScheduler, GlobalBarrierManager};
use crate::batch::ServingVnodeMapping;
use crate::hummock::{CompactionScheduler, HummockManager};
use crate::manager::{
CatalogManager, ClusterManager, FragmentManager, IdleManager, MetaOpts, MetaSrvEnv,
Expand All @@ -72,10 +73,11 @@ use crate::rpc::service::stream_service::StreamServiceImpl;
use crate::rpc::service::system_params_service::SystemParamsServiceImpl;
use crate::rpc::service::telemetry_service::TelemetryInfoServiceImpl;
use crate::rpc::service::user_service::UserServiceImpl;
use crate::serving::ServingVnodeMapping;
use crate::storage::{EtcdMetaStore, MemStore, MetaStore, WrappedEtcdClient as EtcdClient};
use crate::stream::{GlobalStreamManager, SourceManager};
use crate::telemetry::{MetaReportCreator, MetaTelemetryInfoFetcher};
use crate::{batch, hummock, MetaError, MetaResult};
use crate::{hummock, serving, MetaError, MetaResult};

#[derive(Debug)]
pub enum MetaStoreBackend {
Expand Down Expand Up @@ -358,7 +360,7 @@ pub async fn start_service_as_election_leader<S: MetaStore>(
.unwrap(),
);
let serving_vnode_mapping = Arc::new(ServingVnodeMapping::default());
batch::on_meta_start(
serving::on_meta_start(
env.notification_manager_ref(),
cluster_manager.clone(),
fragment_manager.clone(),
Expand Down Expand Up @@ -538,6 +540,7 @@ pub async fn start_service_as_election_leader<S: MetaStore>(
let backup_srv = BackupServiceImpl::new(backup_manager);
let telemetry_srv = TelemetryInfoServiceImpl::new(meta_store.clone());
let system_params_srv = SystemParamsServiceImpl::new(system_params_manager.clone());
let serving_srv = ServingServiceImpl::new(serving_vnode_mapping.clone());

if let Some(prometheus_addr) = address_info.prometheus_addr {
MetricsManager::boot_metrics_service(
Expand Down Expand Up @@ -580,7 +583,7 @@ pub async fn start_service_as_election_leader<S: MetaStore>(
sub_tasks.push(HummockManager::start_compaction_heartbeat(hummock_manager.clone()).await);
sub_tasks.push(HummockManager::start_lsm_stat_report(hummock_manager).await);
sub_tasks.push(
batch::start_serving_vnode_mapping_worker(
serving::start_serving_vnode_mapping_worker(
env.notification_manager_ref(),
cluster_manager.clone(),
fragment_manager.clone(),
Expand Down Expand Up @@ -687,6 +690,7 @@ pub async fn start_service_as_election_leader<S: MetaStore>(
.add_service(BackupServiceServer::new(backup_srv))
.add_service(SystemParamsServiceServer::new(system_params_srv))
.add_service(TelemetryInfoServiceServer::new(telemetry_srv))
.add_service(ServingServiceServer::new(serving_srv))
.serve_with_shutdown(address_info.listen_addr, async move {
tokio::select! {
res = svc_shutdown_rx.changed() => {
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/rpc/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub mod hummock_service;
pub mod meta_member_service;
pub mod notification_service;
pub mod scale_service;
pub mod serving_service;
pub mod stream_service;
pub mod system_params_service;
pub mod telemetry_service;
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/rpc/service/notification_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
use tonic::{Request, Response, Status};

use crate::backup_restore::BackupManagerRef;
use crate::batch::ServingVnodeMappingRef;
use crate::hummock::HummockManagerRef;
use crate::manager::{
Catalog, CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, MetaSrvEnv, Notification,
NotificationVersion, WorkerKey,
};
use crate::serving::ServingVnodeMappingRef;
use crate::storage::MetaStore;

pub struct NotificationServiceImpl<S: MetaStore> {
Expand Down
52 changes: 52 additions & 0 deletions src/meta/src/rpc/service/serving_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::meta::serving_service_server::ServingService;
use risingwave_pb::meta::{
FragmentParallelUnitMapping, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse,
};
use tonic::{Request, Response, Status};

use crate::serving::ServingVnodeMappingRef;

pub struct ServingServiceImpl {
serving_vnode_mapping: ServingVnodeMappingRef,
}

impl ServingServiceImpl {
pub fn new(serving_vnode_mapping: ServingVnodeMappingRef) -> Self {
Self {
serving_vnode_mapping,
}
}
}

#[async_trait::async_trait]
impl ServingService for ServingServiceImpl {
async fn get_serving_vnode_mappings(
&self,
_request: Request<GetServingVnodeMappingsRequest>,
) -> Result<Response<GetServingVnodeMappingsResponse>, Status> {
let mappings = self
.serving_vnode_mapping
.all()
.into_iter()
.map(|(fragment_id, mapping)| FragmentParallelUnitMapping {
fragment_id,
mapping: Some(mapping.to_protobuf()),
})
.collect();
Ok(Response::new(GetServingVnodeMappingsResponse { mappings }))
}
}
File renamed without changes.
23 changes: 22 additions & 1 deletion src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use itertools::Itertools;
use lru::LruCache;
use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, TableId};
use risingwave_common::config::{MetaConfig, MAX_CONNECTION_WINDOW_SIZE};
use risingwave_common::hash::ParallelUnitMapping;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::telemetry::report::TelemetryInfoFetcher;
use risingwave_common::util::addr::HostAddr;
Expand Down Expand Up @@ -59,6 +60,7 @@ use risingwave_pb::meta::meta_member_service_client::MetaMemberServiceClient;
use risingwave_pb::meta::notification_service_client::NotificationServiceClient;
use risingwave_pb::meta::reschedule_request::PbReschedule;
use risingwave_pb::meta::scale_service_client::ScaleServiceClient;
use risingwave_pb::meta::serving_service_client::ServingServiceClient;
use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient;
use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient;
use risingwave_pb::meta::telemetry_info_service_client::TelemetryInfoServiceClient;
Expand Down Expand Up @@ -896,6 +898,21 @@ impl MetaClient {
let resp = self.inner.get_tables(req).await?;
Ok(resp.tables)
}

pub async fn list_serving_vnode_mappings(&self) -> Result<HashMap<u32, ParallelUnitMapping>> {
let req = GetServingVnodeMappingsRequest {};
let resp = self.inner.get_serving_vnode_mappings(req).await?;
let mappings = resp.mappings
.into_iter()
.map(|p| {
(
p.fragment_id,
ParallelUnitMapping::from_protobuf(p.mapping.as_ref().unwrap()),
)
})
.collect();
Ok(mappings)
}
}

#[async_trait]
Expand Down Expand Up @@ -1081,6 +1098,7 @@ struct GrpcMetaClientCore {
backup_client: BackupServiceClient<Channel>,
telemetry_client: TelemetryInfoServiceClient<Channel>,
system_params_client: SystemParamsServiceClient<Channel>,
serving_client: ServingServiceClient<Channel>,
}

impl GrpcMetaClientCore {
Expand All @@ -1096,7 +1114,8 @@ impl GrpcMetaClientCore {
let scale_client = ScaleServiceClient::new(channel.clone());
let backup_client = BackupServiceClient::new(channel.clone());
let telemetry_client = TelemetryInfoServiceClient::new(channel.clone());
let system_params_client = SystemParamsServiceClient::new(channel);
let system_params_client = SystemParamsServiceClient::new(channel.clone());
let serving_client = ServingServiceClient::new(channel);

GrpcMetaClientCore {
cluster_client,
Expand All @@ -1111,6 +1130,7 @@ impl GrpcMetaClientCore {
backup_client,
telemetry_client,
system_params_client,
serving_client,
}
}
}
Expand Down Expand Up @@ -1528,6 +1548,7 @@ macro_rules! for_all_meta_rpc {
,{ telemetry_client, get_telemetry_info, GetTelemetryInfoRequest, TelemetryInfoResponse}
,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse }
,{ system_params_client, set_system_param, SetSystemParamRequest, SetSystemParamResponse }
,{ serving_client, get_serving_vnode_mappings, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse }
}
};
}
Expand Down

0 comments on commit 14f3915

Please sign in to comment.