diff --git a/src/yb/consensus/consensus-test-util.h b/src/yb/consensus/consensus-test-util.h index 06a28a512936..bb641ca0f61d 100644 --- a/src/yb/consensus/consensus-test-util.h +++ b/src/yb/consensus/consensus-test-util.h @@ -52,7 +52,6 @@ #include "yb/consensus/raft_consensus.h" #include "yb/gutil/map-util.h" #include "yb/gutil/strings/substitute.h" -#include "yb/rpc/messenger.h" #include "yb/server/clock.h" #include "yb/util/countdown_latch.h" #include "yb/util/locks.h" @@ -73,7 +72,6 @@ namespace yb { namespace consensus { using log::Log; -using rpc::Messenger; using strings::Substitute; inline ReplicateMsgPtr CreateDummyReplicate(int term, @@ -389,19 +387,13 @@ class NoOpTestPeerProxyFactory : public PeerProxyFactory { public: NoOpTestPeerProxyFactory() { CHECK_OK(ThreadPoolBuilder("test-peer-pool").set_max_threads(3).Build(&pool_)); - messenger_ = CHECK_RESULT(rpc::MessengerBuilder("test").Build()); } PeerProxyPtr NewProxy(const RaftPeerPB& peer_pb) override { return std::make_unique(pool_.get(), peer_pb); } - std::shared_ptr messenger() const override { - return messenger_; - } - gscoped_ptr pool_; - std::shared_ptr messenger_; }; typedef std::unordered_map > TestPeerMap; @@ -600,7 +592,6 @@ class LocalTestPeerProxyFactory : public PeerProxyFactory { explicit LocalTestPeerProxyFactory(TestPeerMapManager* peers) : peers_(peers) { CHECK_OK(ThreadPoolBuilder("test-peer-pool").set_max_threads(3).Build(&pool_)); - messenger_ = CHECK_RESULT(rpc::MessengerBuilder("test").Build()); } PeerProxyPtr NewProxy(const consensus::RaftPeerPB& peer_pb) override { @@ -614,13 +605,8 @@ class LocalTestPeerProxyFactory : public PeerProxyFactory { return proxies_; } - std::shared_ptr messenger() const override { - return messenger_; - } - private: gscoped_ptr pool_; - std::shared_ptr messenger_; TestPeerMapManager* const peers_; // NOTE: There is no need to delete this on the dctor because proxies are externally managed vector proxies_; diff --git a/src/yb/consensus/consensus_peers-test.cc b/src/yb/consensus/consensus_peers-test.cc index a143f9f0c9f7..e7aaa1514cea 100644 --- a/src/yb/consensus/consensus_peers-test.cc +++ b/src/yb/consensus/consensus_peers-test.cc @@ -45,7 +45,6 @@ #include "yb/consensus/log_util.h" #include "yb/consensus/opid_util.h" #include "yb/fs/fs_manager.h" -#include "yb/rpc/messenger.h" #include "yb/server/hybrid_clock.h" #include "yb/util/metrics.h" #include "yb/util/test_macros.h" @@ -63,8 +62,6 @@ namespace consensus { using log::Log; using log::LogOptions; using log::LogAnchorRegistry; -using rpc::Messenger; -using rpc::MessengerBuilder; using std::shared_ptr; using std::unique_ptr; @@ -82,8 +79,6 @@ class ConsensusPeersTest : public YBTest { void SetUp() override { YBTest::SetUp(); - MessengerBuilder bld("test"); - messenger_ = ASSERT_RESULT(bld.Build()); ASSERT_OK(ThreadPoolBuilder("test-raft-pool").Build(&raft_pool_)); raft_pool_token_ = raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT); ASSERT_OK(ThreadPoolBuilder("append").Build(&append_pool_)); @@ -122,7 +117,6 @@ class ConsensusPeersTest : public YBTest { ASSERT_OK(log_->WaitUntilAllFlushed()); append_pool_->Shutdown(); raft_pool_->Shutdown(); - messenger_->Shutdown(); } DelayablePeerProxy* NewRemotePeer( @@ -134,7 +128,7 @@ class ConsensusPeersTest : public YBTest { raft_pool_.get(), new NoOpTestPeerProxy(raft_pool_.get(), peer_pb)); *peer = CHECK_RESULT(Peer::NewRemotePeer( peer_pb, kTabletId, kLeaderUuid, message_queue_.get(), raft_pool_token_.get(), - PeerProxyPtr(proxy_ptr), nullptr /* consensus */, messenger_)); + PeerProxyPtr(proxy_ptr), nullptr /* consensus */)); return proxy_ptr; } @@ -171,7 +165,6 @@ class ConsensusPeersTest : public YBTest { LogOptions options_; unique_ptr raft_pool_token_; scoped_refptr clock_; - shared_ptr messenger_; }; // Tests that a remote peer is correctly built and tracked @@ -318,8 +311,7 @@ TEST_F(ConsensusPeersTest, TestCloseWhenRemotePeerDoesntMakeProgress) { auto mock_proxy = new MockedPeerProxy(raft_pool_.get()); auto peer = ASSERT_RESULT(Peer::NewRemotePeer( FakeRaftPeerPB(kFollowerUuid), kTabletId, kLeaderUuid, message_queue_.get(), - raft_pool_token_.get(), PeerProxyPtr(mock_proxy), nullptr /* consensus */, - messenger_)); + raft_pool_token_.get(), PeerProxyPtr(mock_proxy), nullptr /* consensus */)); // Make the peer respond without making any progress -- it always returns // that it has only replicated op 0.0. When we see the response, we always @@ -347,8 +339,7 @@ TEST_F(ConsensusPeersTest, TestDontSendOneRpcPerWriteWhenPeerIsDown) { auto mock_proxy = new MockedPeerProxy(raft_pool_.get()); auto peer = ASSERT_RESULT(Peer::NewRemotePeer( FakeRaftPeerPB(kFollowerUuid), kTabletId, kLeaderUuid, message_queue_.get(), - raft_pool_token_.get(), PeerProxyPtr(mock_proxy), nullptr /* consensus */, - messenger_)); + raft_pool_token_.get(), PeerProxyPtr(mock_proxy), nullptr /* consensus */)); BOOST_SCOPE_EXIT(&peer) { // This guarantees that the Peer object doesn't get destroyed if there is a pending request. diff --git a/src/yb/consensus/consensus_peers.cc b/src/yb/consensus/consensus_peers.cc index 8627ffc30c00..9047df18b1d8 100644 --- a/src/yb/consensus/consensus_peers.cc +++ b/src/yb/consensus/consensus_peers.cc @@ -52,7 +52,6 @@ #include "yb/gutil/stl_util.h" #include "yb/gutil/strings/substitute.h" #include "yb/rpc/messenger.h" -#include "yb/rpc/periodic.h" #include "yb/tserver/tserver.pb.h" #include "yb/util/backoff_waiter.h" @@ -91,7 +90,6 @@ using log::Log; using log::LogEntryBatch; using std::shared_ptr; using rpc::Messenger; -using rpc::PeriodicTimer; using rpc::RpcController; using strings::Substitute; @@ -101,49 +99,35 @@ Result Peer::NewRemotePeer(const RaftPeerPB& peer_pb, PeerMessageQueue* queue, ThreadPoolToken* raft_pool_token, PeerProxyPtr proxy, - Consensus* consensus, - std::shared_ptr messenger) { + Consensus* consensus) { auto new_peer = std::make_shared( - peer_pb, tablet_id, leader_uuid, std::move(proxy), queue, raft_pool_token, consensus, - std::move(messenger)); + peer_pb, tablet_id, leader_uuid, std::move(proxy), queue, raft_pool_token, consensus); RETURN_NOT_OK(new_peer->Init()); return Result(std::move(new_peer)); } Peer::Peer( const RaftPeerPB& peer_pb, string tablet_id, string leader_uuid, PeerProxyPtr proxy, - PeerMessageQueue* queue, ThreadPoolToken* raft_pool_token, Consensus* consensus, - std::shared_ptr messenger) + PeerMessageQueue* queue, ThreadPoolToken* raft_pool_token, Consensus* consensus) : tablet_id_(std::move(tablet_id)), leader_uuid_(std::move(leader_uuid)), peer_pb_(peer_pb), proxy_(std::move(proxy)), queue_(queue), + heartbeater_( + peer_pb.permanent_uuid(), MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms), + std::bind(&Peer::SignalRequest, this, RequestTriggerMode::kAlwaysSend)), raft_pool_token_(raft_pool_token), - consensus_(consensus), - messenger_(std::move(messenger)) {} + consensus_(consensus) {} void Peer::SetTermForTest(int term) { response_.set_responder_term(term); } Status Peer::Init() { - { - std::lock_guard lock(peer_lock_); - queue_->TrackPeer(peer_pb_.permanent_uuid()); - } - // Capture a weak_ptr reference into the functor so it can safely handle - // outliving the peer. - std::weak_ptr w = shared_from_this(); - heartbeater_ = PeriodicTimer::Create( - messenger_, - [w]() { - if (auto p = w.lock()) { - Status s = p->SignalRequest(RequestTriggerMode::kAlwaysSend); - } - }, - MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms)); - heartbeater_->Start(); + std::lock_guard lock(peer_lock_); + queue_->TrackPeer(peer_pb_.permanent_uuid()); + RETURN_NOT_OK(heartbeater_.Start()); state_ = kPeerStarted; return Status::OK(); } @@ -289,7 +273,7 @@ void Peer::SendNextRequest(RequestTriggerMode trigger_mode) { // If we're actually sending ops there's no need to heartbeat for a while, reset the heartbeater. if (req_has_ops) { - heartbeater_->Snooze(); + heartbeater_.Reset(); } MAYBE_FAULT(FLAGS_fault_crash_on_leader_request_fraction); @@ -438,9 +422,7 @@ string Peer::LogPrefix() const { } void Peer::Close() { - if (heartbeater_) { - heartbeater_->Stop(); - } + WARN_NOT_OK(heartbeater_.Stop(), "Could not stop heartbeater"); // If the peer is already closed return. { diff --git a/src/yb/consensus/consensus_peers.h b/src/yb/consensus/consensus_peers.h index f8dc03ac8b3a..1b122dc62674 100644 --- a/src/yb/consensus/consensus_peers.h +++ b/src/yb/consensus/consensus_peers.h @@ -52,6 +52,7 @@ #include "yb/util/countdown_latch.h" #include "yb/util/locks.h" #include "yb/util/net/net_util.h" +#include "yb/util/resettable_heartbeater.h" #include "yb/util/semaphore.h" #include "yb/util/status.h" @@ -59,11 +60,6 @@ namespace yb { class HostPort; class ThreadPoolToken; -namespace rpc { -class Messenger; -class PeriodicTimer; -} - namespace log { class Log; } @@ -127,8 +123,7 @@ class Peer : public std::enable_shared_from_this { public: Peer(const RaftPeerPB& peer, std::string tablet_id, std::string leader_uuid, PeerProxyPtr proxy, PeerMessageQueue* queue, - ThreadPoolToken* raft_pool_token, Consensus* consensus, - std::shared_ptr messenger); + ThreadPoolToken* raft_pool_token, Consensus* consensus); // Initializes a peer and get its status. CHECKED_STATUS Init(); @@ -170,8 +165,7 @@ class Peer : public std::enable_shared_from_this { PeerMessageQueue* queue, ThreadPoolToken* raft_pool_token, PeerProxyPtr proxy, - Consensus* consensus, - std::shared_ptr messenger); + Consensus* consensus); uint64_t failed_attempts() { std::lock_guard l(peer_lock_); @@ -248,7 +242,7 @@ class Peer : public std::enable_shared_from_this { // Heartbeater for remote peer implementations. This will send status only requests to the remote // peers whenever we go more than 'FLAGS_raft_heartbeat_interval_ms' without sending actual data. - std::shared_ptr heartbeater_; + ResettableHeartbeater heartbeater_; // Thread pool used to construct requests to this peer. ThreadPoolToken* raft_pool_token_; @@ -265,7 +259,6 @@ class Peer : public std::enable_shared_from_this { mutable simple_spinlock peer_lock_; State state_ = kPeerCreated; Consensus* consensus_ = nullptr; - std::shared_ptr messenger_; }; // A proxy to another peer. Usually a thin wrapper around an rpc proxy but can be replaced for diff --git a/src/yb/consensus/leader_election-test.cc b/src/yb/consensus/leader_election-test.cc index bdf608372ced..0b1dca55e686 100644 --- a/src/yb/consensus/leader_election-test.cc +++ b/src/yb/consensus/leader_election-test.cc @@ -33,7 +33,6 @@ #include "yb/consensus/leader_election.h" #include -#include #include #include @@ -47,14 +46,8 @@ #include "yb/util/test_util.h" namespace yb { - -namespace rpc { -class Messenger; -} // namespace rpc - namespace consensus { -using std::shared_ptr; using std::string; using std::unordered_map; using std::vector; @@ -110,16 +103,11 @@ class FromMapPeerProxyFactory : public PeerProxyFactory { used_peer_proxy_.clear(); } - shared_ptr messenger() const override { - return null_messenger_; - } - private: // FYI, the tests may add and remove nodes from this map while we hold a // reference to it. const ProxyMap* const proxy_map_; std::set used_peer_proxy_; - shared_ptr null_messenger_; }; class LeaderElectionTest : public YBTest { diff --git a/src/yb/consensus/peer_manager.cc b/src/yb/consensus/peer_manager.cc index f2c70032e665..bc240c656418 100644 --- a/src/yb/consensus/peer_manager.cc +++ b/src/yb/consensus/peer_manager.cc @@ -81,7 +81,7 @@ void PeerManager::UpdateRaftConfig(const RaftConfigPB& config) { VLOG(1) << GetLogPrefix() << "Adding remote peer. Peer: " << peer_pb.ShortDebugString(); auto remote_peer = Peer::NewRemotePeer( peer_pb, tablet_id_, local_uuid_, queue_, raft_pool_token_, - peer_proxy_factory_->NewProxy(peer_pb), consensus_, peer_proxy_factory_->messenger()); + peer_proxy_factory_->NewProxy(peer_pb), consensus_); if (!remote_peer.ok()) { LOG(WARNING) << "Failed to create remote peer for " << peer_pb.ShortDebugString() << ": " << remote_peer.status(); diff --git a/src/yb/consensus/raft_consensus_quorum-test.cc b/src/yb/consensus/raft_consensus_quorum-test.cc index c9be1ced7571..36c9fa7173cb 100644 --- a/src/yb/consensus/raft_consensus_quorum-test.cc +++ b/src/yb/consensus/raft_consensus_quorum-test.cc @@ -745,20 +745,17 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderHeartbeats) { int repl0_init_count = counter_hook_rpl0->num_pre_update_calls(); int repl1_init_count = counter_hook_rpl1->num_pre_update_calls(); - // Now wait for about 4 times the heartbeat period the counters - // should have increased between 3 to 8 times. - // - // Why the variance? Heartbeat timing is jittered such that the period - // between heartbeats can be anywhere from half the interval to the full interval. + // Now wait for about 4 times the hearbeat period the counters + // should have increased 3/4 times. SleepFor(MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms * 4)); int repl0_final_count = counter_hook_rpl0->num_pre_update_calls(); int repl1_final_count = counter_hook_rpl1->num_pre_update_calls(); ASSERT_GE(repl0_final_count - repl0_init_count, 3); - ASSERT_LE(repl0_final_count - repl0_init_count, 8); + ASSERT_LE(repl0_final_count - repl0_init_count, 4); ASSERT_GE(repl1_final_count - repl1_init_count, 3); - ASSERT_LE(repl1_final_count - repl1_init_count, 8); + ASSERT_LE(repl1_final_count - repl1_init_count, 4); VerifyLogs(2, 0, 1); } diff --git a/src/yb/util/CMakeLists.txt b/src/yb/util/CMakeLists.txt index 983ffde5e831..879ecd28e9ec 100644 --- a/src/yb/util/CMakeLists.txt +++ b/src/yb/util/CMakeLists.txt @@ -197,6 +197,7 @@ set(UTIL_SRCS pstack_watcher.cc random_util.cc ref_cnt_buffer.cc + resettable_heartbeater.cc rolling_log.cc rw_mutex.cc rwc_lock.cc @@ -361,6 +362,7 @@ ADD_YB_TEST(pstack_watcher-test) ADD_YB_TEST(ref_cnt_buffer-test) ADD_YB_TEST(random-test) ADD_YB_TEST(random_util-test) +ADD_YB_TEST(resettable_heartbeater-test) ADD_YB_TEST(result-test) ADD_YB_TEST(rle-test) ADD_YB_TEST(rolling_log-test) diff --git a/src/yb/util/resettable_heartbeater-test.cc b/src/yb/util/resettable_heartbeater-test.cc new file mode 100644 index 000000000000..e12efd61ef7e --- /dev/null +++ b/src/yb/util/resettable_heartbeater-test.cc @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// The following only applies to changes made to this file as part of YugaByte development. +// +// Portions Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#include "yb/util/resettable_heartbeater.h" + +#include +#include +#include +#include + +#include "yb/gutil/gscoped_ptr.h" +#include "yb/util/countdown_latch.h" +#include "yb/util/locks.h" +#include "yb/util/monotime.h" +#include "yb/util/status.h" +#include "yb/util/test_util.h" + +namespace yb { + +// Number of heartbeats we want to observe before allowing the test to end. +static const int kNumHeartbeats = 2; + +class ResettableHeartbeaterTest : public YBTest { + public: + ResettableHeartbeaterTest() + : YBTest(), + latch_(kNumHeartbeats) { + } + + protected: + void CreateHeartbeater(uint64_t period_ms, const std::string& name) { + period_ms_ = period_ms; + heartbeater_.reset(new ResettableHeartbeater( + name, MonoDelta::FromMilliseconds(period_ms), + std::bind(&ResettableHeartbeaterTest::HeartbeatFunction, this))); + } + + Status HeartbeatFunction() { + latch_.CountDown(); + return Status::OK(); + } + + void WaitForCountDown() { + // Wait a large multiple (in the worst case) of the required time before we + // time out and fail the test. Large to avoid test flakiness. + const uint64_t kMaxWaitMillis = period_ms_ * kNumHeartbeats * 20; + CHECK(latch_.WaitFor(MonoDelta::FromMilliseconds(kMaxWaitMillis))) + << "Failed to count down " << kNumHeartbeats << " times in " << kMaxWaitMillis + << " ms: latch count == " << latch_.count(); + } + + CountDownLatch latch_; + uint64_t period_ms_; + gscoped_ptr heartbeater_; +}; + +// Tests that if Reset() is not called the heartbeat method is called +// the expected number of times. +TEST_F(ResettableHeartbeaterTest, TestRegularHeartbeats) { + const int64_t kHeartbeatPeriodMillis = 100; // Heartbeat every 100ms. + CreateHeartbeater(kHeartbeatPeriodMillis, CURRENT_TEST_NAME()); + ASSERT_OK(heartbeater_->Start()); + WaitForCountDown(); + ASSERT_OK(heartbeater_->Stop()); +} + +// Tests that if we Reset() the heartbeater in a period smaller than +// the heartbeat period the heartbeat method never gets called. +// After we stop resetting heartbeats should resume as normal +TEST_F(ResettableHeartbeaterTest, TestResetHeartbeats) { + const int64_t kHeartbeatPeriodMillis = 800; // Heartbeat every 800ms. + const int64_t kNumResetSlicesPerPeriod = 40; // Reset 40 times per heartbeat period. + // Reset once every 800ms / 40 = 20ms. + const int64_t kResetPeriodMillis = kHeartbeatPeriodMillis / kNumResetSlicesPerPeriod; + + CreateHeartbeater(kHeartbeatPeriodMillis, CURRENT_TEST_NAME()); + ASSERT_OK(heartbeater_->Start()); + // Call Reset() in a loop for 2 heartbeat periods' worth of time, with sleeps + // in-between as defined above. + for (int i = 0; i < kNumResetSlicesPerPeriod * 2; i++) { + heartbeater_->Reset(); + ASSERT_EQ(kNumHeartbeats, latch_.count()); // Ensure we haven't counted down, yet. + SleepFor(MonoDelta::FromMilliseconds(kResetPeriodMillis)); + } + WaitForCountDown(); + ASSERT_OK(heartbeater_->Stop()); +} + +} // namespace yb diff --git a/src/yb/util/resettable_heartbeater.cc b/src/yb/util/resettable_heartbeater.cc new file mode 100644 index 000000000000..33cd65684b16 --- /dev/null +++ b/src/yb/util/resettable_heartbeater.cc @@ -0,0 +1,195 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// The following only applies to changes made to this file as part of YugaByte development. +// +// Portions Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#include "yb/util/resettable_heartbeater.h" + +#include + +#include + +#include "yb/gutil/ref_counted.h" +#include "yb/gutil/strings/substitute.h" +#include "yb/util/countdown_latch.h" +#include "yb/util/locks.h" +#include "yb/util/random.h" +#include "yb/util/status.h" +#include "yb/util/thread.h" + +namespace yb { +using std::string; + +class ResettableHeartbeaterThread { + public: + ResettableHeartbeaterThread(std::string name, MonoDelta period, + HeartbeatFunction function); + + Status Start(); + Status Stop(); + void Reset(); + + private: + void RunThread(); + bool IsCurrentThread() const; + + const string name_; + + // The heartbeat period. + const MonoDelta period_; + + // The function to call to perform the heartbeat + const HeartbeatFunction function_; + + // The actual running thread (NULL before it is started) + scoped_refptr thread_; + + CountDownLatch run_latch_; + + // Whether the heartbeater should shutdown. + bool shutdown_; + + // lock that protects access to 'shutdown_' and to 'run_latch_' + // Reset() method. + mutable simple_spinlock lock_; + DISALLOW_COPY_AND_ASSIGN(ResettableHeartbeaterThread); +}; + +ResettableHeartbeater::ResettableHeartbeater(const std::string& name, + MonoDelta period, + HeartbeatFunction function) + : thread_(new ResettableHeartbeaterThread(name, period, function)) { +} + +Status ResettableHeartbeater::Start() { + return thread_->Start(); +} + +Status ResettableHeartbeater::Stop() { + return thread_->Stop(); +} +void ResettableHeartbeater::Reset() { + thread_->Reset(); +} + +ResettableHeartbeater::~ResettableHeartbeater() { + WARN_NOT_OK(Stop(), "Unable to stop heartbeater thread"); +} + +ResettableHeartbeaterThread::ResettableHeartbeaterThread( + std::string name, MonoDelta period, HeartbeatFunction function) + : name_(std::move(name)), + period_(std::move(period)), + function_(std::move(function)), + run_latch_(0), + shutdown_(false) {} + +void ResettableHeartbeaterThread::RunThread() { + CHECK(IsCurrentThread()); + VLOG(1) << "Heartbeater: " << name_ << " thread starting"; + + bool prev_reset_was_manual = false; + Random rng(random()); + while (true) { + MonoDelta wait_period = period_; + if (prev_reset_was_manual) { + // When the caller does a manual reset, we randomize the subsequent wait + // timeout between period_/2 and period_. This builds in some jitter so + // multiple tablets on the same TS don't end up heartbeating in lockstep. + int64_t half_period_ms = period_.ToMilliseconds() / 2; + wait_period = MonoDelta::FromMilliseconds( + half_period_ms + + rng.NextDoubleFraction() * half_period_ms); + prev_reset_was_manual = false; + } + if (run_latch_.WaitFor(wait_period)) { + // CountDownLatch reached 0 -- this means there was a manual reset. + prev_reset_was_manual = true; + std::lock_guard lock(lock_); + // check if we were told to shutdown + if (shutdown_) { + // Latch fired -- exit loop + VLOG(1) << "Heartbeater: " << name_ << " thread finished"; + return; + } else { + // otherwise it's just a reset, reset the latch + // and continue; + run_latch_.Reset(1); + continue; + } + } + + Status s = function_(); + if (!s.ok()) { + LOG(WARNING)<< "Failed to heartbeat in heartbeater: " << name_ + << " Status: " << s.ToString(); + continue; + } + } +} + +bool ResettableHeartbeaterThread::IsCurrentThread() const { + return thread_.get() == yb::Thread::current_thread(); +} + +Status ResettableHeartbeaterThread::Start() { + CHECK(!thread_); + run_latch_.Reset(1); + return yb::Thread::Create("heartbeater", strings::Substitute("$0-heartbeat", name_), + &ResettableHeartbeaterThread::RunThread, + this, &thread_); +} + +void ResettableHeartbeaterThread::Reset() { + if (!thread_) { + return; + } + run_latch_.CountDown(); +} + +Status ResettableHeartbeaterThread::Stop() { + if (!thread_) { + return Status::OK(); + } + + { + std::lock_guard l(lock_); + if (shutdown_) { + return Status::OK(); + } + shutdown_ = true; + } + + run_latch_.CountDown(); + RETURN_NOT_OK(ThreadJoiner(thread_.get()).Join()); + return Status::OK(); +} + +} // namespace yb diff --git a/src/yb/util/resettable_heartbeater.h b/src/yb/util/resettable_heartbeater.h new file mode 100644 index 000000000000..da2de3e0a48b --- /dev/null +++ b/src/yb/util/resettable_heartbeater.h @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// The following only applies to changes made to this file as part of YugaByte development. +// +// Portions Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#ifndef YB_UTIL_RESETTABLE_HEARTBEATER_H_ +#define YB_UTIL_RESETTABLE_HEARTBEATER_H_ + +#include +#include + +#include "yb/gutil/gscoped_ptr.h" +#include "yb/gutil/macros.h" +#include "yb/util/status.h" + +namespace yb { +class MonoDelta; +class ResettableHeartbeaterThread; + +typedef std::function HeartbeatFunction; + +// A resettable hearbeater that takes a function and calls +// it to perform a regular heartbeat, unless Reset() is called +// in which case the heartbeater resets the heartbeat period. +// The point is to send "I'm Alive" heartbeats only if no regular +// messages are sent in the same period. +// +// TODO Eventually this should be used instead of the master heartbeater +// as it shares a lot of logic with the exception of the specific master +// stuff (and the fact that it is resettable). +// +// TODO We'll have a lot of these per server, so eventually we need +// to refactor this so that multiple heartbeaters share something like +// java's ScheduledExecutor. +// +// TODO Do something about failed hearbeats, right now this is just +// logging. Probably could take more arguments and do more of an +// exponential backoff. +// +// This class is thread safe. +class ResettableHeartbeater { + public: + ResettableHeartbeater(const std::string& name, + MonoDelta period, + HeartbeatFunction function); + + // Starts the heartbeater + CHECKED_STATUS Start(); + + // Stops the hearbeater + CHECKED_STATUS Stop(); + + // Resets the heartbeat period. + // When this is called, the subsequent heartbeat has some built-in jitter and + // may trigger before a full period (as specified to the constructor). + void Reset(); + + ~ResettableHeartbeater(); + private: + gscoped_ptr thread_; + + DISALLOW_COPY_AND_ASSIGN(ResettableHeartbeater); +}; + +} // namespace yb + +#endif /* YB_UTIL_RESETTABLE_HEARTBEATER_H_ */