Skip to content

Commit

Permalink
add reconnect logic in meta client
Browse files Browse the repository at this point in the history
Signed-off-by: Peng Chen <[email protected]>
  • Loading branch information
shanicky committed Jan 29, 2023
1 parent 84c5d6d commit 550bb73
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 57 deletions.
15 changes: 8 additions & 7 deletions src/rpc_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,14 @@ macro_rules! meta_rpc_client_method_impl {
($( { $client:tt, $fn_name:ident, $req:ty, $resp:ty }),*) => {
$(
pub async fn $fn_name(&self, request: $req) -> $crate::Result<$resp> {
let guard = self.core.lock().await;
Ok(guard
.$client
.to_owned()
.$fn_name(request)
.await?
.into_inner())
let mut client = self.core.read().await.$client.to_owned();
match client.$fn_name(request).await {
Ok(resp) => Ok(resp.into_inner()),
Err(e) => {
self.refresh_client_if_needed(e.code()).await;
Err(RpcError::GrpcStatus(e))
}
}
}
)*
}
Expand Down
105 changes: 55 additions & 50 deletions src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ use risingwave_pb::user::user_service_client::UserServiceClient;
use risingwave_pb::user::*;
use tokio::sync::mpsc::Receiver;
use tokio::sync::oneshot::Sender;
use tokio::sync::{oneshot, Mutex};
use tokio::sync::{mpsc, oneshot, RwLock};
use tokio::task::JoinHandle;
use tokio::time;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tonic::transport::{Channel, Endpoint};
use tonic::Streaming;
use tonic::{Code, Streaming};

use crate::error::{Result, RpcError};
use crate::hummock_meta_client::{CompactTaskItem, HummockMetaClient};
Expand Down Expand Up @@ -121,7 +121,7 @@ impl MetaClient {
addr: &HostAddr,
worker_node_parallelism: usize,
) -> Result<Self> {
let grpc_meta_client = GrpcMetaClient::new(meta_addr, true).await?;
let grpc_meta_client = GrpcMetaClient::new(meta_addr).await?;
let request = AddWorkerNodeRequest {
worker_type: worker_type as i32,
host: Some(addr.to_protobuf()),
Expand Down Expand Up @@ -904,8 +904,8 @@ impl GrpcMetaClientCore {
/// It is a wrapper of tonic client. See [`rpc_client_method_impl`].
#[derive(Debug, Clone)]
struct GrpcMetaClient {
force_refresh: Option<tokio::sync::mpsc::Sender<oneshot::Sender<Result<()>>>>,
core: Arc<Mutex<GrpcMetaClientCore>>,
force_refresh_sender: mpsc::Sender<oneshot::Sender<Result<()>>>,
core: Arc<RwLock<GrpcMetaClientCore>>,
}

impl GrpcMetaClient {
Expand Down Expand Up @@ -935,26 +935,28 @@ impl GrpcMetaClient {
let mut members = HashMap::new();

// we use addr and election_client as default members
let init_election_client = core_ref.lock().await.election_client.clone();
let init_election_client = core_ref.read().await.election_client.clone();
members.insert(current_leader.clone(), init_election_client);

struct ElectionMemberManagement {
core_ref: Arc<Mutex<GrpcMetaClientCore>>,
core_ref: Arc<RwLock<GrpcMetaClientCore>>,
members: HashMap<String, LeaderServiceClient<Channel>>,
current_leader: String,
}

impl ElectionMemberManagement {
const ELECTION_MEMBER_REFRESH_PERIOD: Duration = Duration::from_secs(5);

/// Render a `HostAddress` as a plain-HTTP URL of the form `http://<host>:<port>`.
fn host_address_to_url(addr: &HostAddress) -> String {
    let (host, port) = (&addr.host, &addr.port);
    format!("http://{host}:{port}")
}

async fn recreate_core(&self, channel: Channel) {
let mut core = self.core_ref.lock().await;
let mut core = self.core_ref.write().await;
*core = GrpcMetaClientCore::new(channel);
}

async fn tick(&mut self) -> Result<()> {
async fn refresh(&mut self) -> Result<()> {
self.refresh_members()
.await
.context("error happened when refresh election members")?;
Expand All @@ -970,12 +972,12 @@ impl GrpcMetaClient {
let members_resp = match client.to_owned().members(MembersRequest {}).await {
Ok(resp) => resp.into_inner(),
Err(e) => {
tracing::warn!("failed to fetch members from {}, {}", addr, e);
tracing::debug!("failed to fetch members from {}, {}", addr, e);
continue;
}
};

let member_addrs: HashSet<_> = members_resp
let member_addrs: HashSet<String> = members_resp
.members
.iter()
.cloned()
Expand All @@ -987,12 +989,9 @@ impl GrpcMetaClient {
break;
}

let member_addrs = match members {
None => return Err(RpcError::Internal(anyhow!("could not refresh members"))),
Some(members) => members,
};
let member_addrs = members.ok_or_else(|| anyhow!("could not refresh members"))?;

let mut new_members = HashMap::new();
let mut new_members = HashMap::with_capacity(member_addrs.len());

for addr in member_addrs {
if let Some(client) = self.members.remove(&addr) {
Expand All @@ -1013,7 +1012,7 @@ impl GrpcMetaClient {
let leader_resp = match client.to_owned().leader(LeaderRequest {}).await {
Ok(resp) => resp.into_inner(),
Err(e) => {
tracing::warn!("failed to fetch leader from {}, {}", addr, e);
tracing::debug!("failed to fetch leader from {}, {}", addr, e);
continue;
}
};
Expand Down Expand Up @@ -1051,15 +1050,16 @@ impl GrpcMetaClient {

tokio::spawn(async move {
let mut member_management = member_management;
let mut ticker = time::interval(Duration::from_secs(1));
let mut ticker =
time::interval(ElectionMemberManagement::ELECTION_MEMBER_REFRESH_PERIOD);

loop {
let result_sender: Option<oneshot::Sender<Result<()>>> = tokio::select! {
_ = ticker.tick() => None,
result_sender = force_refresh_receiver.recv() => result_sender,
};

let tick_result = member_management.tick().await;
let tick_result = member_management.refresh().await;
if let Err(e) = tick_result.as_ref() {
tracing::error!("refresh election client failed {}", e);
}
Expand All @@ -1077,47 +1077,34 @@ impl GrpcMetaClient {
async fn force_refresh_leader(&self) -> Result<()> {
let (sender, receiver) = oneshot::channel();

if let Some(force_refresh_sender) = self.force_refresh.as_ref() {
force_refresh_sender
.send(sender)
.await
.map_err(|e| anyhow!(e))?;
self.force_refresh_sender
.send(sender)
.await
.map_err(|e| anyhow!(e))?;

receiver.await.map_err(|e| anyhow!(e))?
} else {
Ok(())
}
receiver.await.map_err(|e| anyhow!(e))?
}

/// Connect to the meta server `addr`.
pub async fn new(addr: &str, enable_tick_loop: bool) -> Result<Self> {
pub async fn new(addr: &str) -> Result<Self> {
let channel = Self::build_rpc_channel(addr).await?;

let (force_refresh_sender, force_refresh_receiver) = tokio::sync::mpsc::channel(128);
let (force_refresh_sender, force_refresh_receiver) = mpsc::channel(1);

if enable_tick_loop {
let client = GrpcMetaClient {
force_refresh: Some(force_refresh_sender),
core: Arc::new(Mutex::new(GrpcMetaClientCore::new(channel))),
};

client
.start_meta_election_monitor(addr, force_refresh_receiver)
.await?;

tracing::info!("force refresh leader of meta-node");
let client = GrpcMetaClient {
force_refresh_sender,
core: Arc::new(RwLock::new(GrpcMetaClientCore::new(channel))),
};

if let Err(e) = client.force_refresh_leader().await {
tracing::warn!("force refresh leader failed {}, init leader may failed", e);
}
client
.start_meta_election_monitor(addr, force_refresh_receiver)
.await?;

Ok(client)
} else {
Ok(GrpcMetaClient {
force_refresh: None,
core: Arc::new(Mutex::new(GrpcMetaClientCore::new(channel))),
})
if let Err(e) = client.force_refresh_leader().await {
tracing::warn!("force refresh leader failed {}, init leader may failed", e);
}

Ok(client)
}

pub(crate) async fn build_rpc_channel(addr: &str) -> Result<Channel> {
Expand Down Expand Up @@ -1232,6 +1219,24 @@ macro_rules! for_all_meta_rpc {
};
}

impl GrpcMetaClient {
async fn refresh_client_if_needed(&self, code: Code) {
if matches!(
code,
Code::Unknown | Code::Unimplemented | Code::Unavailable
) {
let (result_sender, result_receiver) = oneshot::channel();
if self.force_refresh_sender.try_send(result_sender).is_ok() {
if let Ok(Err(e)) = result_receiver.await {
tracing::error!("force refresh meta client failed {}", e);
}
} else {
tracing::debug!("Skipping the current refresh, somewhere else is already doing it")
}
}
}
}

impl GrpcMetaClient {
    // Expands `meta_rpc_client_method_impl` over the RPC list supplied by `for_all_meta_rpc!`.
    // Per the `meta_rpc_client_method_impl` template, each listed entry becomes a
    // `pub async fn <fn_name>(&self, request)` wrapper method on this type.
    for_all_meta_rpc! { meta_rpc_client_method_impl }
}

0 comments on commit 550bb73

Please sign in to comment.