Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Failover follower to leader #6937

Merged
merged 73 commits into from
Dec 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
1195ac7
init commit
CAJan93 Dec 16, 2022
d0c2ee4
Failover from follower to leader
CAJan93 Dec 16, 2022
bec44d7
add notes
CAJan93 Dec 16, 2022
0bc2fa5
disable commit tx commit_trx
CAJan93 Dec 16, 2022
30b8e4a
fix: Failover crashes hummock
CAJan93 Dec 16, 2022
dd7d5c8
Merge branch 'main' of ssh://github.com/risingwavelabs/risingwave int…
CAJan93 Dec 16, 2022
4d60e25
Merge branch 'main' of ssh://github.com/risingwavelabs/risingwave int…
CAJan93 Dec 19, 2022
ad3d442
introduce leader_svc mod
CAJan93 Dec 19, 2022
18e3ebb
delete unecessary var definitions
CAJan93 Dec 19, 2022
642a2dc
remove notes
CAJan93 Dec 19, 2022
8a073c8
Merge branch 'main' of ssh://github.com/risingwavelabs/risingwave int…
CAJan93 Dec 19, 2022
f8bfa6e
copyright
CAJan93 Dec 19, 2022
6b7ce59
risedev c
CAJan93 Dec 19, 2022
11e5b68
typo
CAJan93 Dec 19, 2022
4f3797d
notes
CAJan93 Dec 19, 2022
7513e2b
docstrings
CAJan93 Dec 19, 2022
5d69654
expect instead of panic
CAJan93 Dec 19, 2022
178f9a4
change logging
CAJan93 Dec 19, 2022
aafe067
Merge branch 'main' of ssh://github.com/risingwavelabs/risingwave int…
CAJan93 Dec 19, 2022
f733d9f
remove note
CAJan93 Dec 19, 2022
9ea6454
change import
CAJan93 Dec 19, 2022
9588151
remove comments
CAJan93 Dec 19, 2022
810c4f1
remove log line
CAJan93 Dec 19, 2022
3f6fcb3
Merge branch 'main' of ssh://github.com/risingwavelabs/risingwave int…
CAJan93 Dec 20, 2022
c36d7d0
change docstrings
CAJan93 Dec 20, 2022
0a2b5a8
leader_info_to_host_addr
CAJan93 Dec 20, 2022
916d92a
move start services to leader_svc
CAJan93 Dec 20, 2022
d3eb296
use follower service handle
CAJan93 Dec 20, 2022
9a3af54
node_is_leader
CAJan93 Dec 20, 2022
c68227a
remove was_follower
CAJan93 Dec 20, 2022
1845b9c
rename leader_svc
CAJan93 Dec 20, 2022
a105a34
move follower definition to leader_svc
CAJan93 Dec 20, 2022
33a1281
risedev c
CAJan93 Dec 20, 2022
f9e6a89
Merge branch 'main' of ssh://github.com/risingwavelabs/risingwave int…
CAJan93 Dec 20, 2022
31f98be
nit
CAJan93 Dec 21, 2022
6f3dff7
rename var
CAJan93 Dec 21, 2022
beacd7b
implement test feedback
CAJan93 Dec 21, 2022
0f82487
test is working
CAJan93 Dec 21, 2022
257983d
remove notes
CAJan93 Dec 21, 2022
695d9e6
simplify test a little
CAJan93 Dec 21, 2022
d55b1ca
Test single leader with diff amount of nodes
CAJan93 Dec 21, 2022
fd85bab
cleanup
CAJan93 Dec 21, 2022
2097471
docstring + notes
CAJan93 Dec 21, 2022
efcc000
add failover test
CAJan93 Dec 21, 2022
6156013
Merge branch 'main' of ssh://github.com/risingwavelabs/risingwave int…
CAJan93 Dec 21, 2022
193e977
add failover tests
CAJan93 Dec 21, 2022
472f574
remove notes
CAJan93 Dec 21, 2022
d2b9311
risedev c
CAJan93 Dec 21, 2022
5da3944
minor change
CAJan93 Dec 21, 2022
048278e
add helper func
CAJan93 Dec 21, 2022
96d91a9
try using handle during setup
CAJan93 Dec 21, 2022
9baa5cb
break node status loop if leader sender drops
CAJan93 Dec 22, 2022
333f10b
Merge branch 'main' of ssh://github.com/risingwavelabs/risingwave int…
CAJan93 Dec 22, 2022
a406ffe
create chanel manually + debugging
CAJan93 Dec 22, 2022
6f0ce60
test seems to work
CAJan93 Dec 22, 2022
c1a6b2f
fix tests
CAJan93 Dec 22, 2022
a837ea8
remove debug tests
CAJan93 Dec 22, 2022
971a98b
continue on channel err
CAJan93 Dec 22, 2022
5c4ddbd
skip deleted leader node in failover tests
CAJan93 Dec 22, 2022
9ec41dd
disable meta recovery
CAJan93 Dec 22, 2022
396bbe3
Merge branch 'main' of ssh://github.com/risingwavelabs/risingwave int…
CAJan93 Dec 22, 2022
2d27085
Leader no longer updated in env
CAJan93 Dec 22, 2022
24b47ab
minor changes
CAJan93 Dec 22, 2022
78b2de5
use #[cfg(test)]
CAJan93 Dec 23, 2022
4a22219
change sleep time to 5
CAJan93 Dec 23, 2022
225050f
Merge branch 'main' of ssh://github.com/risingwavelabs/risingwave int…
CAJan93 Dec 23, 2022
313cd41
nit
CAJan93 Dec 23, 2022
00fd0cb
remove large single_leader_setup tests
CAJan93 Dec 23, 2022
8add8d5
remove large failover tests
CAJan93 Dec 23, 2022
e14fabd
node_controllers
CAJan93 Dec 23, 2022
5ec5c83
Merge branch 'main' of ssh://github.com/risingwavelabs/risingwave int…
CAJan93 Dec 23, 2022
5fb03b5
wait for shutdown signal
CAJan93 Dec 23, 2022
c43c6ba
Merge branch 'main' into failover_follower_to_leader
mergify[bot] Dec 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/common/src/util/addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,16 @@ use std::net::SocketAddr;
use std::str::FromStr;

use risingwave_pb::common::HostAddress as ProstHostAddress;
use risingwave_pb::meta::MetaLeaderInfo;

use crate::error::{internal_error, Result};

pub fn leader_info_to_host_addr(mli: MetaLeaderInfo) -> HostAddr {
mli.node_address
.parse::<HostAddr>()
.expect("invalid leader addr")
}

/// General host address and port.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct HostAddr {
Expand All @@ -31,7 +38,6 @@ impl std::fmt::Display for HostAddr {
write!(f, "{}:{}", self.host, self.port)
}
}

impl From<SocketAddr> for HostAddr {
fn from(addr: SocketAddr) -> Self {
HostAddr {
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ where
return Err(Error::InvalidContext(context_id));
}
}

trx.check_equal(
META_CF_NAME.to_owned(),
META_LEADER_KEY.as_bytes().to_vec(),
Expand Down
57 changes: 19 additions & 38 deletions src/meta/src/rpc/elections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use prost::Message;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use risingwave_common::util::addr::HostAddr;
use risingwave_pb::meta::{MetaLeaderInfo, MetaLeaseInfo};
use tokio::sync::oneshot::Sender;
use tokio::sync::watch::Receiver;
Expand All @@ -48,26 +47,17 @@ struct ElectionResult {
pub is_leader: bool,
}

impl ElectionResult {
pub fn get_leader_addr(&self) -> HostAddr {
self.meta_leader_info
.node_address
.parse::<HostAddr>()
.expect("invalid leader addr")
}
}

/// Runs for election in an attempt to become leader
///
/// ## Returns
/// Returns `ElectionResult`, containing infos about the node who won the election or
/// `MetaError` if the election ran into an error
///
/// ## Arguments
/// `meta_store`: The meta store which holds the lease, deciding about the election result
/// `addr`: Address of the node that runs for election
/// `lease_time_sec`: Amount of seconds that this lease will be valid
/// `next_lease_id`: If the node wins, the lease used until the next election will have this id
///
/// ## Returns
/// Returns `ElectionResult`, containing infos about the node who won the election or
/// `MetaError` if the election ran into an error
async fn campaign<S: MetaStore>(
meta_store: &Arc<S>,
addr: &String,
Expand Down Expand Up @@ -175,15 +165,16 @@ async fn campaign<S: MetaStore>(

/// Try to renew/acquire the leader lease
///
/// ## Returns
/// True if node was leader and was able to renew/acquire the lease.
/// False if node was follower and thus could not renew/acquire lease.
/// `MetaError` if operation ran into an error
///
/// ## Arguments
/// `leader_info`: Info of the node that trie
/// `lease_time_sec`: Time in seconds that the lease is valid
/// `meta_store`: Store which holds the lease
/// `meta_store`: Store which holds the lease#
///
/// ## Returns
/// True if node was leader and was able to renew/acquire the lease.
/// False if node was follower and thus could not renew/acquire lease.
/// `MetaError` if operation ran into an error
async fn renew_lease<S: MetaStore>(
leader_info: &MetaLeaderInfo,
lease_time_sec: u64,
Expand Down Expand Up @@ -220,7 +211,7 @@ async fn renew_lease<S: MetaStore>(

/// Retrieve infos about the current leader
///
/// ## Attributes:
/// ## Arguments:
/// `meta_store`: The store holding information about the leader
///
/// ## Returns
Expand Down Expand Up @@ -299,7 +290,7 @@ pub async fn run_elections<S: MetaStore>(
MetaLeaderInfo,
JoinHandle<()>,
Sender<()>,
Receiver<(HostAddr, bool)>,
Receiver<(MetaLeaderInfo, bool)>,
)> {
// Randomize interval to reduce mitigate likelihood of simultaneous requests
let mut rng: StdRng = SeedableRng::from_entropy();
Expand All @@ -317,14 +308,10 @@ pub async fn run_elections<S: MetaStore>(
gen_rand_lease_id(addr.as_str()),
)
.await;
let (leader_addr, initial_leader, is_initial_leader) = match election_result {
let (initial_leader, is_initial_leader) = match election_result {
Ok(elect_result) => {
tracing::info!("initial election finished");
(
elect_result.get_leader_addr(),
elect_result.meta_leader_info,
elect_result.is_leader,
)
(elect_result.meta_leader_info, elect_result.is_leader)
}
Err(_) => {
tracing::info!("initial election failed. Repeating election");
Expand All @@ -344,7 +331,8 @@ pub async fn run_elections<S: MetaStore>(

// define all follow up elections and terms in handle
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
let (leader_tx, leader_rx) = tokio::sync::watch::channel((leader_addr, is_initial_leader));
let (leader_tx, leader_rx) =
tokio::sync::watch::channel((initial_leader.clone(), is_initial_leader));
let handle = tokio::spawn(async move {
// runs all followup elections
let mut ticker = tokio::time::interval(
Expand All @@ -355,12 +343,10 @@ pub async fn run_elections<S: MetaStore>(

let mut is_leader = is_initial_leader;
let mut leader_info = initial_leader.clone();
let n_addr = initial_leader.node_address.as_str();
let mut leader_addr = n_addr.parse::<HostAddr>().unwrap();
'election: loop {
// Do not elect new leader directly after running the initial election
if !initial_election {
let (leader_addr_, leader_info_, is_leader_) = match campaign(
let (leader_info_, is_leader_) = match campaign(
&meta_store,
&addr,
lease_time_sec,
Expand All @@ -375,11 +361,7 @@ pub async fn run_elections<S: MetaStore>(
}
Ok(elect_result) => {
tracing::info!("election finished");
(
elect_result.get_leader_addr(),
elect_result.meta_leader_info,
elect_result.is_leader,
)
(elect_result.meta_leader_info, elect_result.is_leader)
}
};

Expand All @@ -392,12 +374,11 @@ pub async fn run_elections<S: MetaStore>(
}
leader_info = leader_info_;
is_leader = is_leader_;
leader_addr = leader_addr_;
}
initial_election = false;

// signal to observers if there is a change in leadership
leader_tx.send((leader_addr.clone(), is_leader)).unwrap();
leader_tx.send((leader_info.clone(), is_leader)).unwrap();

// election done. Enter the term of the current leader
// Leader stays in power until leader crashes
Expand Down
57 changes: 57 additions & 0 deletions src/meta/src/rpc/follower_svc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use risingwave_pb::health::health_server::HealthServer;
use tokio::sync::oneshot::Receiver as OneReceiver;
use tokio::sync::watch::Receiver as WatchReceiver;

use super::intercept::MetricsMiddlewareLayer;
use super::server::AddressInfo;
use super::service::health_service::HealthServiceImpl;
use crate::rpc::metrics::MetaMetrics;

/// Starts all services needed for the meta follower node
pub async fn start_follower_srv(
mut svc_shutdown_rx: WatchReceiver<()>,
follower_shutdown_rx: OneReceiver<()>,
address_info: AddressInfo,
) {
let health_srv = HealthServiceImpl::new();
tonic::transport::Server::builder()
.layer(MetricsMiddlewareLayer::new(Arc::new(MetaMetrics::new())))
.add_service(HealthServer::new(health_srv))
.serve_with_shutdown(address_info.listen_addr, async move {
tokio::select! {
_ = tokio::signal::ctrl_c() => {},
// shutdown service if all services should be shut down
res = svc_shutdown_rx.changed() => {
match res {
Ok(_) => tracing::info!("Shutting down services"),
Err(_) => tracing::error!("Service shutdown sender dropped")
}
},
// shutdown service if follower becomes leader
res = follower_shutdown_rx => {
match res {
Ok(_) => tracing::info!("Shutting down follower services"),
Err(_) => tracing::error!("Follower service shutdown sender dropped")
}
},
}
})
.await
.unwrap();
}
Loading