Skip to content

Commit

Permalink
Revert "[#386] Switch ResettableHeartbeater with Periodic timer"
Browse files Browse the repository at this point in the history
This reverts commit f4d6fdfaff603fa4dd60b5d78e8f5035ceb219f7 because a
lot of TSAN tests were broken in it.
  • Loading branch information
mbautin committed Sep 14, 2018
1 parent 54f580a commit 9560a12
Show file tree
Hide file tree
Showing 11 changed files with 433 additions and 87 deletions.
14 changes: 0 additions & 14 deletions src/yb/consensus/consensus-test-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
#include "yb/consensus/raft_consensus.h"
#include "yb/gutil/map-util.h"
#include "yb/gutil/strings/substitute.h"
#include "yb/rpc/messenger.h"
#include "yb/server/clock.h"
#include "yb/util/countdown_latch.h"
#include "yb/util/locks.h"
Expand All @@ -73,7 +72,6 @@ namespace yb {
namespace consensus {

using log::Log;
using rpc::Messenger;
using strings::Substitute;

inline ReplicateMsgPtr CreateDummyReplicate(int term,
Expand Down Expand Up @@ -389,19 +387,13 @@ class NoOpTestPeerProxyFactory : public PeerProxyFactory {
public:
NoOpTestPeerProxyFactory() {
CHECK_OK(ThreadPoolBuilder("test-peer-pool").set_max_threads(3).Build(&pool_));
messenger_ = CHECK_RESULT(rpc::MessengerBuilder("test").Build());
}

PeerProxyPtr NewProxy(const RaftPeerPB& peer_pb) override {
return std::make_unique<NoOpTestPeerProxy>(pool_.get(), peer_pb);
}

std::shared_ptr<rpc::Messenger> messenger() const override {
return messenger_;
}

gscoped_ptr<ThreadPool> pool_;
std::shared_ptr<rpc::Messenger> messenger_;
};

typedef std::unordered_map<std::string, std::shared_ptr<RaftConsensus> > TestPeerMap;
Expand Down Expand Up @@ -600,7 +592,6 @@ class LocalTestPeerProxyFactory : public PeerProxyFactory {
explicit LocalTestPeerProxyFactory(TestPeerMapManager* peers)
: peers_(peers) {
CHECK_OK(ThreadPoolBuilder("test-peer-pool").set_max_threads(3).Build(&pool_));
messenger_ = CHECK_RESULT(rpc::MessengerBuilder("test").Build());
}

PeerProxyPtr NewProxy(const consensus::RaftPeerPB& peer_pb) override {
Expand All @@ -614,13 +605,8 @@ class LocalTestPeerProxyFactory : public PeerProxyFactory {
return proxies_;
}

std::shared_ptr<rpc::Messenger> messenger() const override {
return messenger_;
}

private:
gscoped_ptr<ThreadPool> pool_;
std::shared_ptr<rpc::Messenger> messenger_;
TestPeerMapManager* const peers_;
// NOTE: There is no need to delete these in the destructor because proxies are externally managed.
vector<LocalTestPeerProxy*> proxies_;
Expand Down
15 changes: 3 additions & 12 deletions src/yb/consensus/consensus_peers-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
#include "yb/consensus/log_util.h"
#include "yb/consensus/opid_util.h"
#include "yb/fs/fs_manager.h"
#include "yb/rpc/messenger.h"
#include "yb/server/hybrid_clock.h"
#include "yb/util/metrics.h"
#include "yb/util/test_macros.h"
Expand All @@ -63,8 +62,6 @@ namespace consensus {
using log::Log;
using log::LogOptions;
using log::LogAnchorRegistry;
using rpc::Messenger;
using rpc::MessengerBuilder;
using std::shared_ptr;
using std::unique_ptr;

Expand All @@ -82,8 +79,6 @@ class ConsensusPeersTest : public YBTest {

void SetUp() override {
YBTest::SetUp();
MessengerBuilder bld("test");
messenger_ = ASSERT_RESULT(bld.Build());
ASSERT_OK(ThreadPoolBuilder("test-raft-pool").Build(&raft_pool_));
raft_pool_token_ = raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT);
ASSERT_OK(ThreadPoolBuilder("append").Build(&append_pool_));
Expand Down Expand Up @@ -122,7 +117,6 @@ class ConsensusPeersTest : public YBTest {
ASSERT_OK(log_->WaitUntilAllFlushed());
append_pool_->Shutdown();
raft_pool_->Shutdown();
messenger_->Shutdown();
}

DelayablePeerProxy<NoOpTestPeerProxy>* NewRemotePeer(
Expand All @@ -134,7 +128,7 @@ class ConsensusPeersTest : public YBTest {
raft_pool_.get(), new NoOpTestPeerProxy(raft_pool_.get(), peer_pb));
*peer = CHECK_RESULT(Peer::NewRemotePeer(
peer_pb, kTabletId, kLeaderUuid, message_queue_.get(), raft_pool_token_.get(),
PeerProxyPtr(proxy_ptr), nullptr /* consensus */, messenger_));
PeerProxyPtr(proxy_ptr), nullptr /* consensus */));
return proxy_ptr;
}

Expand Down Expand Up @@ -171,7 +165,6 @@ class ConsensusPeersTest : public YBTest {
LogOptions options_;
unique_ptr<ThreadPoolToken> raft_pool_token_;
scoped_refptr<server::Clock> clock_;
shared_ptr<Messenger> messenger_;
};

// Tests that a remote peer is correctly built and tracked
Expand Down Expand Up @@ -318,8 +311,7 @@ TEST_F(ConsensusPeersTest, TestCloseWhenRemotePeerDoesntMakeProgress) {
auto mock_proxy = new MockedPeerProxy(raft_pool_.get());
auto peer = ASSERT_RESULT(Peer::NewRemotePeer(
FakeRaftPeerPB(kFollowerUuid), kTabletId, kLeaderUuid, message_queue_.get(),
raft_pool_token_.get(), PeerProxyPtr(mock_proxy), nullptr /* consensus */,
messenger_));
raft_pool_token_.get(), PeerProxyPtr(mock_proxy), nullptr /* consensus */));

// Make the peer respond without making any progress -- it always returns
// that it has only replicated op 0.0. When we see the response, we always
Expand Down Expand Up @@ -347,8 +339,7 @@ TEST_F(ConsensusPeersTest, TestDontSendOneRpcPerWriteWhenPeerIsDown) {
auto mock_proxy = new MockedPeerProxy(raft_pool_.get());
auto peer = ASSERT_RESULT(Peer::NewRemotePeer(
FakeRaftPeerPB(kFollowerUuid), kTabletId, kLeaderUuid, message_queue_.get(),
raft_pool_token_.get(), PeerProxyPtr(mock_proxy), nullptr /* consensus */,
messenger_));
raft_pool_token_.get(), PeerProxyPtr(mock_proxy), nullptr /* consensus */));

BOOST_SCOPE_EXIT(&peer) {
// This guarantees that the Peer object doesn't get destroyed if there is a pending request.
Expand Down
42 changes: 12 additions & 30 deletions src/yb/consensus/consensus_peers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
#include "yb/gutil/stl_util.h"
#include "yb/gutil/strings/substitute.h"
#include "yb/rpc/messenger.h"
#include "yb/rpc/periodic.h"
#include "yb/tserver/tserver.pb.h"

#include "yb/util/backoff_waiter.h"
Expand Down Expand Up @@ -91,7 +90,6 @@ using log::Log;
using log::LogEntryBatch;
using std::shared_ptr;
using rpc::Messenger;
using rpc::PeriodicTimer;
using rpc::RpcController;
using strings::Substitute;

Expand All @@ -101,49 +99,35 @@ Result<PeerPtr> Peer::NewRemotePeer(const RaftPeerPB& peer_pb,
PeerMessageQueue* queue,
ThreadPoolToken* raft_pool_token,
PeerProxyPtr proxy,
Consensus* consensus,
std::shared_ptr<rpc::Messenger> messenger) {
Consensus* consensus) {
auto new_peer = std::make_shared<Peer>(
peer_pb, tablet_id, leader_uuid, std::move(proxy), queue, raft_pool_token, consensus,
std::move(messenger));
peer_pb, tablet_id, leader_uuid, std::move(proxy), queue, raft_pool_token, consensus);
RETURN_NOT_OK(new_peer->Init());
return Result<PeerPtr>(std::move(new_peer));
}

Peer::Peer(
const RaftPeerPB& peer_pb, string tablet_id, string leader_uuid, PeerProxyPtr proxy,
PeerMessageQueue* queue, ThreadPoolToken* raft_pool_token, Consensus* consensus,
std::shared_ptr<rpc::Messenger> messenger)
PeerMessageQueue* queue, ThreadPoolToken* raft_pool_token, Consensus* consensus)
: tablet_id_(std::move(tablet_id)),
leader_uuid_(std::move(leader_uuid)),
peer_pb_(peer_pb),
proxy_(std::move(proxy)),
queue_(queue),
heartbeater_(
peer_pb.permanent_uuid(), MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms),
std::bind(&Peer::SignalRequest, this, RequestTriggerMode::kAlwaysSend)),
raft_pool_token_(raft_pool_token),
consensus_(consensus),
messenger_(std::move(messenger)) {}
consensus_(consensus) {}

void Peer::SetTermForTest(int term) {
response_.set_responder_term(term);
}

Status Peer::Init() {
{
std::lock_guard<simple_spinlock> lock(peer_lock_);
queue_->TrackPeer(peer_pb_.permanent_uuid());
}
// Capture a weak_ptr reference into the functor so it can safely handle
// outliving the peer.
std::weak_ptr<Peer> w = shared_from_this();
heartbeater_ = PeriodicTimer::Create(
messenger_,
[w]() {
if (auto p = w.lock()) {
Status s = p->SignalRequest(RequestTriggerMode::kAlwaysSend);
}
},
MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms));
heartbeater_->Start();
std::lock_guard<simple_spinlock> lock(peer_lock_);
queue_->TrackPeer(peer_pb_.permanent_uuid());
RETURN_NOT_OK(heartbeater_.Start());
state_ = kPeerStarted;
return Status::OK();
}
Expand Down Expand Up @@ -289,7 +273,7 @@ void Peer::SendNextRequest(RequestTriggerMode trigger_mode) {

// If we're actually sending ops there's no need to heartbeat for a while, reset the heartbeater.
if (req_has_ops) {
heartbeater_->Snooze();
heartbeater_.Reset();
}

MAYBE_FAULT(FLAGS_fault_crash_on_leader_request_fraction);
Expand Down Expand Up @@ -438,9 +422,7 @@ string Peer::LogPrefix() const {
}

void Peer::Close() {
if (heartbeater_) {
heartbeater_->Stop();
}
WARN_NOT_OK(heartbeater_.Stop(), "Could not stop heartbeater");

// If the peer is already closed return.
{
Expand Down
15 changes: 4 additions & 11 deletions src/yb/consensus/consensus_peers.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,14 @@
#include "yb/util/countdown_latch.h"
#include "yb/util/locks.h"
#include "yb/util/net/net_util.h"
#include "yb/util/resettable_heartbeater.h"
#include "yb/util/semaphore.h"
#include "yb/util/status.h"

namespace yb {
class HostPort;
class ThreadPoolToken;

namespace rpc {
class Messenger;
class PeriodicTimer;
}

namespace log {
class Log;
}
Expand Down Expand Up @@ -127,8 +123,7 @@ class Peer : public std::enable_shared_from_this<Peer> {
public:
Peer(const RaftPeerPB& peer, std::string tablet_id, std::string leader_uuid,
PeerProxyPtr proxy, PeerMessageQueue* queue,
ThreadPoolToken* raft_pool_token, Consensus* consensus,
std::shared_ptr<rpc::Messenger> messenger);
ThreadPoolToken* raft_pool_token, Consensus* consensus);

// Initializes a peer and get its status.
CHECKED_STATUS Init();
Expand Down Expand Up @@ -170,8 +165,7 @@ class Peer : public std::enable_shared_from_this<Peer> {
PeerMessageQueue* queue,
ThreadPoolToken* raft_pool_token,
PeerProxyPtr proxy,
Consensus* consensus,
std::shared_ptr<rpc::Messenger> messenger);
Consensus* consensus);

uint64_t failed_attempts() {
std::lock_guard<simple_spinlock> l(peer_lock_);
Expand Down Expand Up @@ -248,7 +242,7 @@ class Peer : public std::enable_shared_from_this<Peer> {

// Heartbeater for remote peer implementations. This will send status only requests to the remote
// peers whenever we go more than 'FLAGS_raft_heartbeat_interval_ms' without sending actual data.
std::shared_ptr<rpc::PeriodicTimer> heartbeater_;
ResettableHeartbeater heartbeater_;

// Thread pool used to construct requests to this peer.
ThreadPoolToken* raft_pool_token_;
Expand All @@ -265,7 +259,6 @@ class Peer : public std::enable_shared_from_this<Peer> {
mutable simple_spinlock peer_lock_;
State state_ = kPeerCreated;
Consensus* consensus_ = nullptr;
std::shared_ptr<rpc::Messenger> messenger_;
};

// A proxy to another peer. Usually a thin wrapper around an rpc proxy but can be replaced for
Expand Down
12 changes: 0 additions & 12 deletions src/yb/consensus/leader_election-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
#include "yb/consensus/leader_election.h"

#include <functional>
#include <memory>
#include <string>
#include <vector>

Expand All @@ -47,14 +46,8 @@
#include "yb/util/test_util.h"

namespace yb {

namespace rpc {
class Messenger;
} // namespace rpc

namespace consensus {

using std::shared_ptr;
using std::string;
using std::unordered_map;
using std::vector;
Expand Down Expand Up @@ -110,16 +103,11 @@ class FromMapPeerProxyFactory : public PeerProxyFactory {
used_peer_proxy_.clear();
}

shared_ptr<rpc::Messenger> messenger() const override {
return null_messenger_;
}

private:
// FYI, the tests may add and remove nodes from this map while we hold a
// reference to it.
const ProxyMap* const proxy_map_;
std::set<string> used_peer_proxy_;
shared_ptr<rpc::Messenger> null_messenger_;
};

class LeaderElectionTest : public YBTest {
Expand Down
2 changes: 1 addition & 1 deletion src/yb/consensus/peer_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void PeerManager::UpdateRaftConfig(const RaftConfigPB& config) {
VLOG(1) << GetLogPrefix() << "Adding remote peer. Peer: " << peer_pb.ShortDebugString();
auto remote_peer = Peer::NewRemotePeer(
peer_pb, tablet_id_, local_uuid_, queue_, raft_pool_token_,
peer_proxy_factory_->NewProxy(peer_pb), consensus_, peer_proxy_factory_->messenger());
peer_proxy_factory_->NewProxy(peer_pb), consensus_);
if (!remote_peer.ok()) {
LOG(WARNING) << "Failed to create remote peer for " << peer_pb.ShortDebugString() << ": "
<< remote_peer.status();
Expand Down
11 changes: 4 additions & 7 deletions src/yb/consensus/raft_consensus_quorum-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -745,20 +745,17 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderHeartbeats) {
int repl0_init_count = counter_hook_rpl0->num_pre_update_calls();
int repl1_init_count = counter_hook_rpl1->num_pre_update_calls();

// Now wait for about 4 times the heartbeat period the counters
// should have increased between 3 to 8 times.
//
// Why the variance? Heartbeat timing is jittered such that the period
// between heartbeats can be anywhere from half the interval to the full interval.
// Now wait for about 4 times the heartbeat period; the counters
// should have increased 3 to 4 times.
SleepFor(MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms * 4));

int repl0_final_count = counter_hook_rpl0->num_pre_update_calls();
int repl1_final_count = counter_hook_rpl1->num_pre_update_calls();

ASSERT_GE(repl0_final_count - repl0_init_count, 3);
ASSERT_LE(repl0_final_count - repl0_init_count, 8);
ASSERT_LE(repl0_final_count - repl0_init_count, 4);
ASSERT_GE(repl1_final_count - repl1_init_count, 3);
ASSERT_LE(repl1_final_count - repl1_init_count, 8);
ASSERT_LE(repl1_final_count - repl1_init_count, 4);

VerifyLogs(2, 0, 1);
}
Expand Down
2 changes: 2 additions & 0 deletions src/yb/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ set(UTIL_SRCS
pstack_watcher.cc
random_util.cc
ref_cnt_buffer.cc
resettable_heartbeater.cc
rolling_log.cc
rw_mutex.cc
rwc_lock.cc
Expand Down Expand Up @@ -361,6 +362,7 @@ ADD_YB_TEST(pstack_watcher-test)
ADD_YB_TEST(ref_cnt_buffer-test)
ADD_YB_TEST(random-test)
ADD_YB_TEST(random_util-test)
ADD_YB_TEST(resettable_heartbeater-test)
ADD_YB_TEST(result-test)
ADD_YB_TEST(rle-test)
ADD_YB_TEST(rolling_log-test)
Expand Down
Loading

0 comments on commit 9560a12

Please sign in to comment.