Skip to content

Commit ac4d0f7

Browse files
authored
sentry: improve reconnect_channel logging (#2054)
1 parent 831b578 commit ac4d0f7

File tree

4 files changed

+36
-22
lines changed

4 files changed

+36
-22
lines changed

silkworm/infra/grpc/client/call.hpp

+7-5
Original file line numberDiff line numberDiff line change
@@ -118,22 +118,23 @@ Task<Response> unary_rpc_with_retries(
118118
Request request,
119119
agrpc::GrpcContext& grpc_context,
120120
std::function<Task<void>()>& on_disconnect,
121-
grpc::Channel& channel) {
121+
grpc::Channel& channel,
122+
std::string log_prefix) {
122123
// loop until a successful return or cancellation
123124
while (true) {
124125
try {
125126
co_return (co_await unary_rpc(rpc, stub, request, grpc_context));
126127
} catch (const GrpcStatusError& ex) {
127128
if (is_disconnect_error(ex.status(), channel)) {
128-
log::Warning() << "GRPC call failed: " << ex.what();
129+
log::Warning(log_prefix) << "GRPC call failed: " << ex.what();
129130
} else {
130131
throw;
131132
}
132133
}
133134

134135
co_await on_disconnect();
135136
if (channel.GetState(false) != GRPC_CHANNEL_READY) {
136-
co_await reconnect_channel(channel);
137+
co_await reconnect_channel(channel, log_prefix);
137138
}
138139
}
139140
}
@@ -146,6 +147,7 @@ Task<void> streaming_rpc_with_retries(
146147
agrpc::GrpcContext& grpc_context,
147148
std::function<Task<void>()>& on_disconnect,
148149
grpc::Channel& channel,
150+
std::string log_prefix,
149151
std::function<Task<void>(Response)> consumer) {
150152
// loop until a successful return or cancellation
151153
while (true) {
@@ -154,15 +156,15 @@ Task<void> streaming_rpc_with_retries(
154156
break;
155157
} catch (const GrpcStatusError& ex) {
156158
if (is_disconnect_error(ex.status(), channel)) {
157-
log::Warning() << "GRPC streaming call failed: " << ex.what();
159+
log::Warning(log_prefix) << "GRPC streaming call failed: " << ex.what();
158160
} else {
159161
throw;
160162
}
161163
}
162164

163165
co_await on_disconnect();
164166
if (channel.GetState(false) != GRPC_CHANNEL_READY) {
165-
co_await reconnect_channel(channel);
167+
co_await reconnect_channel(channel, log_prefix);
166168
}
167169
}
168170
}

silkworm/infra/grpc/client/reconnect.cpp

+11-3
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,22 @@ bool is_disconnect_error(const grpc::Status& status, grpc::Channel& channel) {
2929
((code == grpc::StatusCode::DEADLINE_EXCEEDED) && (channel.GetState(false) != GRPC_CHANNEL_READY) && (channel.GetState(false) != GRPC_CHANNEL_SHUTDOWN));
3030
}
3131

32-
Task<void> reconnect_channel(grpc::Channel& channel) {
32+
// Exponential backoff schedule in seconds:
//   min_sec, min_sec*2, min_sec*4, ... max_sec, max_sec, ...
//
// attempt  - zero-based retry counter; attempt 0 yields min_sec.
// min_sec  - first timeout of the schedule.
// max_sec  - upper bound; once doubling reaches or exceeds it, max_sec is returned.
//
// Returns min_sec * 2^attempt capped at max_sec. The cap is detected by comparing
// against (max_sec >> attempt) instead of shifting first, so the shift can never
// overflow int64_t regardless of how large min_sec or attempt are.
static int64_t backoff_timeout(size_t attempt, int64_t min_sec, int64_t max_sec) {
    // Nothing to grow towards: the cap is already reached (or the range is inverted).
    if (min_sec >= max_sec) return max_sec;
    // A non-positive start value does not grow by doubling; return it unchanged
    // rather than left-shifting a negative number.
    if (min_sec <= 0) return min_sec;

    // Here 0 < min_sec < max_sec. Shifting by more than 62 bits would always
    // exceed any int64_t cap, so saturate without evaluating the shift.
    constexpr size_t kMaxShift = 62;
    if (attempt > kMaxShift) return max_sec;

    // min_sec << attempt > max_sec  <=>  min_sec > (max_sec >> attempt) for positive values.
    if (min_sec > (max_sec >> attempt)) return max_sec;
    return min_sec << attempt;
}
37+
38+
Task<void> reconnect_channel(grpc::Channel& channel, std::string log_prefix) {
3339
bool is_stopped = false;
3440

3541
std::function<void()> run = [&] {
3642
bool is_connected = false;
43+
size_t attempt = 0;
3744
while (!is_connected && !is_stopped && (channel.GetState(false) != GRPC_CHANNEL_SHUTDOWN)) {
38-
log::Info() << "Reconnecting grpc::Channel...";
39-
auto deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(5, GPR_TIMESPAN));
45+
log::Info(log_prefix) << "Reconnecting grpc::Channel...";
46+
auto timeout = backoff_timeout(attempt++, 5, 600);
47+
auto deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(timeout, GPR_TIMESPAN));
4048
is_connected = channel.WaitForConnected(deadline);
4149
}
4250
};

silkworm/infra/grpc/client/reconnect.hpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@
1616

1717
#pragma once
1818

19+
#include <string>
20+
1921
#include <silkworm/infra/concurrency/task.hpp>
2022

2123
#include <grpcpp/grpcpp.h>
2224

2325
namespace silkworm::rpc {
2426

2527
bool is_disconnect_error(const grpc::Status& status, grpc::Channel& channel);
26-
Task<void> reconnect_channel(grpc::Channel& channel);
28+
Task<void> reconnect_channel(grpc::Channel& channel, std::string log_prefix);
2729

2830
} // namespace silkworm::rpc

silkworm/sentry/grpc/client/sentry_client.cpp

+15-13
Original file line numberDiff line numberDiff line change
@@ -73,27 +73,27 @@ class SentryClientImpl final : public api::Service {
7373
}
7474

7575
Task<void> reconnect() {
76-
co_await sw_rpc::reconnect_channel(*channel_);
76+
co_await sw_rpc::reconnect_channel(*channel_, "sentry");
7777
}
7878

7979
// rpc SetStatus(StatusData) returns (SetStatusReply);
8080
Task<void> set_status(eth::StatusData status_data) override {
8181
proto::StatusData request = interfaces::proto_status_data_from_status_data(status_data);
82-
co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSetStatus, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
82+
co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSetStatus, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
8383
}
8484

8585
// rpc HandShake(google.protobuf.Empty) returns (HandShakeReply);
8686
Task<uint8_t> handshake() override {
8787
google::protobuf::Empty request;
88-
proto::HandShakeReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncHandShake, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
88+
proto::HandShakeReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncHandShake, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
8989
uint8_t result = interfaces::eth_version_from_protocol(reply.protocol());
9090
co_return result;
9191
}
9292

9393
// rpc NodeInfo(google.protobuf.Empty) returns(types.NodeInfoReply);
9494
Task<NodeInfos> node_infos() override {
9595
google::protobuf::Empty request;
96-
types::NodeInfoReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncNodeInfo, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
96+
types::NodeInfoReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncNodeInfo, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
9797
auto result = interfaces::node_info_from_proto_node_info(reply);
9898
co_return NodeInfos{result};
9999
}
@@ -104,7 +104,7 @@ class SentryClientImpl final : public api::Service {
104104
request.mutable_data()->CopyFrom(interfaces::outbound_data_from_message(message));
105105
request.mutable_peer_id()->CopyFrom(interfaces::peer_id_from_public_key(public_key));
106106

107-
proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageById, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
107+
proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageById, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
108108
auto result = interfaces::peer_keys_from_sent_peers_ids(reply);
109109
co_return result;
110110
}
@@ -115,15 +115,15 @@ class SentryClientImpl final : public api::Service {
115115
request.mutable_data()->CopyFrom(interfaces::outbound_data_from_message(message));
116116
request.set_max_peers(max_peers);
117117

118-
proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageToRandomPeers, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
118+
proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageToRandomPeers, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
119119
auto result = interfaces::peer_keys_from_sent_peers_ids(reply);
120120
co_return result;
121121
}
122122

123123
// rpc SendMessageToAll(OutboundMessageData) returns (SentPeers);
124124
Task<PeerKeys> send_message_to_all(Message message) override {
125125
proto::OutboundMessageData request = interfaces::outbound_data_from_message(message);
126-
proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageToAll, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
126+
proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageToAll, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
127127
auto result = interfaces::peer_keys_from_sent_peers_ids(reply);
128128
co_return result;
129129
}
@@ -136,7 +136,7 @@ class SentryClientImpl final : public api::Service {
136136
// request.set_min_block()
137137
request.set_max_peers(max_peers);
138138

139-
proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageByMinBlock, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
139+
proto::SentPeers reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncSendMessageByMinBlock, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
140140
auto result = interfaces::peer_keys_from_sent_peers_ids(reply);
141141
co_return result;
142142
}
@@ -147,7 +147,7 @@ class SentryClientImpl final : public api::Service {
147147
request.mutable_peer_id()->CopyFrom(interfaces::peer_id_from_public_key(public_key));
148148
// TODO: set_min_block
149149
// request.set_min_block()
150-
co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeerMinBlock, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
150+
co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeerMinBlock, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
151151
}
152152

153153
// rpc Messages(MessagesRequest) returns (stream InboundMessage);
@@ -172,21 +172,22 @@ class SentryClientImpl final : public api::Service {
172172
grpc_context_,
173173
on_disconnect_,
174174
*channel_,
175+
"sentry",
175176
std::move(proto_consumer));
176177
}
177178

178179
// rpc Peers(google.protobuf.Empty) returns (PeersReply);
179180
Task<PeerInfos> peers() override {
180181
google::protobuf::Empty request;
181-
proto::PeersReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeers, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
182+
proto::PeersReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeers, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
182183
auto result = interfaces::peer_infos_from_proto_peers_reply(reply);
183184
co_return result;
184185
}
185186

186187
// rpc PeerCount(PeerCountRequest) returns (PeerCountReply);
187188
Task<size_t> peer_count() override {
188189
proto::PeerCountRequest request;
189-
proto::PeerCountReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeerCount, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
190+
proto::PeerCountReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeerCount, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
190191
auto result = static_cast<size_t>(reply.count());
191192
co_return result;
192193
}
@@ -195,7 +196,7 @@ class SentryClientImpl final : public api::Service {
195196
Task<std::optional<PeerInfo>> peer_by_id(EccPublicKey public_key) override {
196197
proto::PeerByIdRequest request;
197198
request.mutable_peer_id()->CopyFrom(interfaces::peer_id_from_public_key(public_key));
198-
proto::PeerByIdReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeerById, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
199+
proto::PeerByIdReply reply = co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPeerById, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
199200
auto result = interfaces::peer_info_opt_from_proto_peer_reply(reply);
200201
co_return result;
201202
}
@@ -205,7 +206,7 @@ class SentryClientImpl final : public api::Service {
205206
proto::PenalizePeerRequest request;
206207
request.mutable_peer_id()->CopyFrom(interfaces::peer_id_from_public_key(public_key));
207208
request.set_penalty(proto::PenaltyKind::Kick);
208-
co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPenalizePeer, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_);
209+
co_await sw_rpc::unary_rpc_with_retries(&Stub::AsyncPenalizePeer, stub_, std::move(request), grpc_context_, on_disconnect_, *channel_, "sentry");
209210
}
210211

211212
// rpc PeerEvents(PeerEventsRequest) returns (stream PeerEvent);
@@ -225,6 +226,7 @@ class SentryClientImpl final : public api::Service {
225226
grpc_context_,
226227
on_disconnect_,
227228
*channel_,
229+
"sentry",
228230
std::move(proto_consumer));
229231
}
230232

0 commit comments

Comments
 (0)