From a7c23e42affeb2183921a089f1ce3e03e584d058 Mon Sep 17 00:00:00 2001 From: jokemanfire Date: Thu, 13 Feb 2025 11:15:27 +0800 Subject: [PATCH] Update publisher err deal. ref:https://github.com/containerd/ttrpc-rust/pull/259 Due to the inclusion of the latest version of ttrpc, this part of the code has been updated Signed-off-by: jokemanfire --- crates/shim/src/asynchronous/publisher.rs | 25 ++++++++++++++--------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/crates/shim/src/asynchronous/publisher.rs b/crates/shim/src/asynchronous/publisher.rs index 19afe8c6..9ecab316 100644 --- a/crates/shim/src/asynchronous/publisher.rs +++ b/crates/shim/src/asynchronous/publisher.rs @@ -24,7 +24,7 @@ use containerd_shim_protos::{ ttrpc, ttrpc::context::Context, }; -use log::debug; +use log::{debug, error, warn}; use tokio::sync::mpsc; use crate::{ @@ -93,15 +93,20 @@ impl RemotePublisher { count: item.count + 1, }; if let Err(e) = client.forward(new_item.ctx.clone(), &req).await { - debug!("publish error {:?}", e); - // This is a bug from ttrpc, ttrpc should return RemoteClosed|ClientClosed error. change it in future - // if e == (ttrpc::error::Error::RemoteClosed || ttrpc::error::Error::ClientClosed) - // reconnect client - let new_client = Self::connect(address.as_str()).await.map_err(|e| { - debug!("reconnect the ttrpc client {:?} fail", e); - }); - if let Ok(c) = new_client { - client = EventsClient::new(c); + match e { + ttrpc::error::Error::RemoteClosed | ttrpc::error::Error::LocalClosed => { + warn!("publish fail because the server or client close {:?}", e); + // reconnect client + if let Ok(c) = Self::connect(address.as_str()).await.map_err(|e| { + debug!("reconnect the ttrpc client {:?} fail", e); + }) { + client = EventsClient::new(c); + } + } + _ => { + // TODO! if it is other error , May we should deal with socket file + error!("the client forward err is {:?}", e); + } } let sender_ref = sender.clone(); // Take a another task requeue , for no blocking the recv task