Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/gRPC drop stream bugfix #2749

Merged
merged 4 commits into from
Oct 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 8 additions & 20 deletions irohad/consensus/yac/transport/impl/consensus_service_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,39 +13,27 @@ ServiceImpl::ServiceImpl(logger::LoggerPtr log,
std::function<void(std::vector<VoteMessage>)> callback)
: callback_(std::move(callback)), log_(std::move(log)) {}

grpc::Status ServiceImpl::HandleState(
grpc::Status ServiceImpl::SendState(
::grpc::ServerContext *context,
::iroha::consensus::yac::proto::State &request) {
const ::iroha::consensus::yac::proto::State *request,
::google::protobuf::Empty *response) {
std::vector<VoteMessage> state;
for (const auto &pb_vote : request.votes())
if (auto vote = PbConverters::deserializeVote(pb_vote, log_))
for (const auto &pb_vote : request->votes()) {
if (auto vote = PbConverters::deserializeVote(pb_vote, log_)) {
state.push_back(*vote);

}
}
if (state.empty()) {
log_->info("Received an empty votes collection");
return grpc::Status::CANCELLED;
}

if (not sameKeys(state)) {
log_->info("Votes are statelessly invalid: proposal rounds are different");
return grpc::Status::CANCELLED;
}

log_->info("Received votes[size={}] from {}", state.size(), context->peer());

callback_(std::move(state));
return grpc::Status::OK;
}

grpc::Status ServiceImpl::SendState(
::grpc::ServerContext *context,
::grpc::ServerReader< ::iroha::consensus::yac::proto::State> *reader,
::google::protobuf::Empty *response) {
::iroha::consensus::yac::proto::State request;

grpc::Status status = grpc::Status::OK;
while (status.ok() && reader->Read(&request)) {
status = HandleState(context, request);
}

return status;
}
14 changes: 4 additions & 10 deletions irohad/consensus/yac/transport/impl/consensus_service_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,13 @@ namespace iroha::consensus::yac {
* Naming is confusing, because this is rpc call that
* perform on another machine;
*/
grpc::Status SendState(
::grpc::ServerContext *context,
::grpc::ServerReader< ::iroha::consensus::yac::proto::State> *reader,
::google::protobuf::Empty *response) override;

/**
* Handles state;
*/
grpc::Status HandleState(::grpc::ServerContext *context,
::iroha::consensus::yac::proto::State &request);
grpc::Status SendState(::grpc::ServerContext *context,
const ::iroha::consensus::yac::proto::State *request,
::google::protobuf::Empty *response) override;

private:
std::function<void(std::vector<VoteMessage>)> callback_;

logger::LoggerPtr log_;
};
} // namespace iroha::consensus::yac
Expand Down
75 changes: 21 additions & 54 deletions irohad/consensus/yac/transport/impl/network_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,78 +44,45 @@ void NetworkImpl::sendState(const shared_model::interface::Peer &to,
*pb_vote = PbConverters::serializeVote(vote);
}

auto stream_writer = stubs_.exclusiveAccess(
[&](auto &stubs) -> std::shared_ptr<::grpc::ClientWriterInterface<
::iroha::consensus::yac::proto::State>> {
auto const it = stubs.find(to.pubkey());
if (it == stubs.end() || std::get<0>(it->second) != to.address()) {
if (it != stubs.end()) {
// clear all
std::get<3>(it->second)->WritesDone();
stubs.erase(to.pubkey());
}

auto maybe_client = client_factory_->createClient(to);
if (expected::hasError(maybe_client)) {
log_->error("Could not send state to {}: {}",
to,
maybe_client.assumeError());
return nullptr;
}

std::unique_ptr<proto::Yac::StubInterface> client =
std::move(maybe_client).assumeValue();

auto context = std::make_unique<grpc::ClientContext>();
context->set_wait_for_ready(true);
context->set_deadline(std::chrono::system_clock::now()
+ std::chrono::seconds(5));

auto response = std::make_unique<::google::protobuf::Empty>();
std::shared_ptr<::grpc::ClientWriterInterface<
::iroha::consensus::yac::proto::State>>
writer = client->SendState(context.get(), response.get());

stubs[to.pubkey()] = std::make_tuple(std::string{to.address()},
std::move(client),
std::move(context),
writer,
std::move(response));
return writer;
}

return std::get<3>(it->second);
});

if (!stream_writer)
auto maybe_client = client_factory_->createClient(to);
if (expected::hasError(maybe_client)) {
log_->error(
"Could not send state to {}: {}", to, maybe_client.assumeError());
return;
}
std::shared_ptr<decltype(maybe_client)::ValueInnerType::element_type> client =
std::move(maybe_client).assumeValue();

log_->debug("Propagating votes for {}, size={} to {}",
state.front().hash.vote_round,
state.size(),
to);
getSubscription()->dispatcher()->add(
getSubscription()->dispatcher()->kExecuteInPool,
[peer{to.pubkey()},
request(std::move(request)),
wstream_writer(utils::make_weak(stream_writer)),
[request(std::move(request)),
client(std::move(client)),
log(utils::make_weak(log_)),
log_sending_msg(fmt::format("Send votes bundle[size={}] for {} to {}",
state.size(),
state.front().hash.vote_round,
to))] {
auto maybe_log = log.lock();
auto stream_writer = wstream_writer.lock();

if (!maybe_log || !stream_writer) {
if (not maybe_log) {
return;
}

grpc::ClientContext context;
context.set_wait_for_ready(true);
context.set_deadline(std::chrono::system_clock::now()
+ std::chrono::seconds(5));
google::protobuf::Empty response;
maybe_log->info(log_sending_msg);
if (!stream_writer->Write(request)) {
maybe_log->warn("RPC failed: {}", peer);
auto status = client->SendState(&context, request, &response);
if (not status.ok()) {
maybe_log->warn(
"RPC failed: {} {}", context.peer(), status.error_message());
return;
} else {
maybe_log->info("RPC succeeded: {}", context.peer());
}
maybe_log->info("RPC succeeded: {}", peer);
});
}
3 changes: 2 additions & 1 deletion irohad/consensus/yac/transport/impl/network_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ namespace iroha::consensus::yac {
* Class which provides implementation of client-side transport for
* consensus based on grpc
*/
class NetworkImpl : public YacNetwork {
class NetworkImpl : public YacNetwork,
public std::enable_shared_from_this<NetworkImpl> {
public:
using Service = proto::Yac;
using ClientFactory = iroha::network::ClientFactory<Service>;
Expand Down
11 changes: 6 additions & 5 deletions irohad/ordering/impl/on_demand_ordering_service_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,16 +295,17 @@ iroha::ordering::PackedProposalData
OnDemandOrderingServiceImpl::packNextProposals(const consensus::Round &round) {
auto const available_txs_count = availableTxsCountBatchesCache();
auto const full_proposals_count = available_txs_count / transaction_limit_;
auto const number_of_packs =
(available_txs_count
+ (full_proposals_count > 0 ? 0 : transaction_limit_ - 1))
/ transaction_limit_;
auto const number_of_proposals = std::min(
(uint32_t)((available_txs_count
+ (full_proposals_count > 0 ? 0 : transaction_limit_ - 1))
/ transaction_limit_),
max_proposal_pack_);

PackedProposalContainer outcome;
std::vector<std::shared_ptr<shared_model::interface::Transaction>> txs;
BloomFilter256 bf;

for (uint32_t ix = 0; ix < number_of_packs; ++ix) {
for (uint32_t ix = 0; ix < number_of_proposals; ++ix) {
assert(!isEmptyBatchesCache());
batches_cache_.getTransactions(
transaction_limit_, txs, bf, [&](auto const &batch) {
Expand Down
4 changes: 2 additions & 2 deletions irohad/ordering/impl/on_demand_os_server_grpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ grpc::Status OnDemandOsServerGrpc::RequestProposal(
if (!request->has_bloom_filter()
|| request->bloom_filter().size() != BloomFilter256::kBytesCount) {
#endif // USE_BLOOM_FILTER
log_->info("Response with full {} txs proposal.",
sptr_proposal->transactions().size());
log_->debug("Response with full {} txs proposal.",
sptr_proposal->transactions().size());
*proposal = proto_proposal;
#if USE_BLOOM_FILTER
} else {
Expand Down
2 changes: 1 addition & 1 deletion schema/yac.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@ message State {
}

service Yac {
rpc SendState (stream State) returns (google.protobuf.Empty);
rpc SendState (State) returns (google.protobuf.Empty);
}
Original file line number Diff line number Diff line change
Expand Up @@ -652,8 +652,8 @@ IntegrationTestFramework &IntegrationTestFramework::sendTx(
const shared_model::proto::Transaction &tx) {
sendTx(tx, [this](const auto &status) {
if (!status.statelessErrorOrCommandName().empty()) {
log_->debug("Got error while sending transaction: "
+ status.statelessErrorOrCommandName());
log_->debug("Got error while sending transaction: {}",
status.statelessErrorOrCommandName());
}
});
return *this;
Expand Down
21 changes: 19 additions & 2 deletions test/module/irohad/consensus/yac/network_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,23 @@ namespace iroha::consensus::yac {
VoteMessage message;
};

/**
* @given initialized network
* @when send vote to itself
* @then vote handled
*/
TEST_F(YacNetworkTest, MessageHandledWhenMessageSent) {
proto::State request;
expectConnection(*peer, [&request](auto &stub) {
EXPECT_CALL(stub, SendState(_, _, _))
.WillOnce(DoAll(SaveArg<1>(&request), Return(grpc::Status::OK)));
});

network->sendState(*peer, {message});

ASSERT_EQ(request.votes_size(), 1);
}

/**
* @given initialized network
* @when send request with one vote
Expand All @@ -81,7 +98,7 @@ namespace iroha::consensus::yac {
auto pb_vote = request.add_votes();
*pb_vote = PbConverters::serializeVote(message);

auto response = service->HandleState(&context, request);
auto response = service->SendState(&context, &request, nullptr);
ASSERT_EQ(response.error_code(), grpc::StatusCode::OK);
}

Expand All @@ -93,7 +110,7 @@ namespace iroha::consensus::yac {
TEST_F(YacNetworkTest, SendMessageEmptyKeys) {
proto::State request;
grpc::ServerContext context;
auto response = service->HandleState(&context, request);
auto response = service->SendState(&context, &request, nullptr);
ASSERT_EQ(response.error_code(), grpc::StatusCode::CANCELLED);
}
} // namespace iroha::consensus::yac
31 changes: 31 additions & 0 deletions test/module/irohad/ordering/on_demand_os_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,37 @@ TEST_F(OnDemandOsTest, OverflowRound) {
.size());
}

TEST_F(OnDemandOsTest, OverflowRound4) {
generateTransactionsAndInsert({1, transaction_limit * 5});

os->onCollaborationOutcome(commit_round);

ASSERT_TRUE(os->onRequestProposal(target_round));
ASSERT_TRUE(os->onRequestProposal(target_round)->size() == 4);
for (size_t ix = 0; ix < 4; ++ix) {
ASSERT_EQ(transaction_limit,
os->onRequestProposal(target_round)
->
operator[](ix)
.first->transactions()
.size());
}
}

TEST_F(OnDemandOsTest, OverflowRound5) {
generateTransactionsAndInsert({1, transaction_limit * 15});

os->onCollaborationOutcome(commit_round);

auto pack = os->onRequestProposal(target_round);
ASSERT_TRUE(pack);
ASSERT_TRUE(pack->size() == max_proposal_pack);
for (size_t ix = 0; ix < max_proposal_pack; ++ix) {
ASSERT_EQ(transaction_limit,
pack->operator[](ix).first->transactions().size());
}
}

/**
* @given initialized on-demand OS
* @when insert commit round and then proposal_limit + 2 reject rounds
Expand Down