raft: Use TransferLeadership to make leader demotion safer
When we demote the leader, we currently wait for all queued messages to
be sent, as a best-effort approach to making sure the other nodes find
out that the node removal has been committed, and stop treating the
current leader as a cluster member. This doesn't work perfectly.

To make this more robust, use TransferLeadership when the leader is
trying to remove itself, then call the Leave RPC on the new leader once
the transfer has completed.

Signed-off-by: Aaron Lehmann <[email protected]>
aaronlehmann committed Feb 9, 2017
1 parent ce8e78a commit 4e0b605
Showing 2 changed files with 84 additions and 10 deletions.
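
At its core, the change is a three-step sequence when the leader is asked to remove itself: hand leadership to another member, wait until some other node actually holds leadership, then call the Leave RPC on that new leader so it commits the removal. The sketch below condenses that sequence; the hooks type and its field names are illustrative stand-ins rather than swarmkit APIs — the real implementation is removeSelfGracefully and transferLeadership in the diff that follows.

package raftdemo

import (
	"context"
	"fmt"
	"time"
)

// hooks stands in for the swarmkit internals this commit touches; the field
// names are illustrative placeholders, not real swarmkit APIs.
type hooks struct {
	transferLeadership func(ctx context.Context, transferee uint64) error // start a raft leadership transfer
	currentLeader      func() uint64                                      // raft ID of the current leader, 0 if unknown
	leaveViaLeader     func(ctx context.Context, leader uint64) error     // call the Leave RPC on the new leader
}

// removeSelfGracefully condenses the sequence added by this commit: hand
// leadership to another member, wait until some other node actually is the
// leader, then ask that new leader to remove us.
func removeSelfGracefully(ctx context.Context, self, transferee uint64, h hooks, poll time.Duration) error {
	if err := h.transferLeadership(ctx, transferee); err != nil {
		return fmt.Errorf("transfer leadership: %w", err)
	}

	ticker := time.NewTicker(poll)
	defer ticker.Stop()
	for {
		if leader := h.currentLeader(); leader != 0 && leader != self {
			// The new leader commits the ConfChange that removes this node,
			// so we no longer depend on flushing our own outgoing queue.
			return h.leaveViaLeader(ctx, leader)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}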
81 changes: 71 additions & 10 deletions manager/state/raft/raft.go
@@ -412,7 +412,7 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) {
 	defer conn.Close()
 	client := api.NewRaftMembershipClient(conn)
 
-	joinCtx, joinCancel := context.WithTimeout(ctx, 10*time.Second)
+	joinCtx, joinCancel := context.WithTimeout(ctx, n.reqTimeout())
 	defer joinCancel()
 	resp, err := client.Join(joinCtx, &api.JoinRequest{
 		Addr: n.opts.Addr,
@@ -1030,6 +1030,10 @@ func (n *Node) UpdateNode(id uint64, addr string) {
 // from a member who is willing to leave its raft
 // membership to an active member of the raft
 func (n *Node) Leave(ctx context.Context, req *api.LeaveRequest) (*api.LeaveResponse, error) {
+	if req.Node == nil {
+		return nil, grpc.Errorf(codes.InvalidArgument, "no node information provided")
+	}
+
 	nodeInfo, err := ca.RemoteNode(ctx)
 	if err != nil {
 		return nil, err
@@ -1100,18 +1104,72 @@ func (n *Node) removeMember(ctx context.Context, id uint64) error {
 
 	n.membershipLock.Lock()
 	defer n.membershipLock.Unlock()
-	if n.CanRemoveMember(id) {
-		cc := raftpb.ConfChange{
-			ID:      id,
-			Type:    raftpb.ConfChangeRemoveNode,
-			NodeID:  id,
-			Context: []byte(""),
+	if !n.CanRemoveMember(id) {
+		return ErrCannotRemoveMember
+	}
+
+	if id == n.Config.ID {
+		// To avoid a corner case with self-removal, transfer leadership
+		// to another node and ask that node to remove us.
+		if err := n.removeSelfGracefully(ctx); err != nil {
+			log.G(ctx).WithError(err).Error("failed to leave raft cluster gracefully")
+		} else {
+			return nil
 		}
-		err := n.configure(ctx, cc)
+	}
+
+	cc := raftpb.ConfChange{
+		ID:      id,
+		Type:    raftpb.ConfChangeRemoveNode,
+		NodeID:  id,
+		Context: []byte(""),
+	}
+	return n.configure(ctx, cc)
+}
+
+// removeSelfGracefully initiates a leadership transfer and calls Leave on the
+// new leader. It must be called with stopMu held.
+func (n *Node) removeSelfGracefully(ctx context.Context) error {
+	transferCtx, cancelTransfer := context.WithTimeout(ctx, n.reqTimeout())
+	defer cancelTransfer()
+
+	if err := n.transferLeadership(transferCtx); err != nil {
+		return errors.Wrap(err, "failed to transfer leadership")
+	}
+	conn, err := n.transport.PeerConn(n.leader())
+	if err != nil {
 		return err
 	}
+	_, err = api.NewRaftMembershipClient(conn).Leave(ctx, &api.LeaveRequest{Node: &api.RaftMember{RaftID: n.Config.ID}})
+	return err
+}
 
-	return ErrCannotRemoveMember
+// transferLeadership attempts to transfer leadership to a different node,
+// and wait for the transfer to happen. It must be called with stopMu held.
+func (n *Node) transferLeadership(ctx context.Context) error {
+	transferee, err := n.transport.LongestActive()
+	if err != nil {
+		return errors.Wrap(err, "failed to get longest-active member")
+	}
+	start := time.Now()
+	log.G(ctx).Infof("raft: transfer leadership %x -> %x", n.Config.ID, transferee)
+	n.raftNode.TransferLeadership(ctx, n.Config.ID, transferee)
+	ticker := time.NewTicker(n.opts.TickInterval / 10)
+	defer ticker.Stop()
+	var leader uint64
+	for {
+		leader = n.leader()
+		if leader != raft.None && leader != n.Config.ID {
+			break
+		}
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-ticker.C:
+		}
+	}
+	log.G(ctx).Infof("raft: transfer leadership %x -> %x finished in %v", n.Config.ID, leader, time.Since(start))
+	return nil
 }
 
 // RemoveMember submits a configuration change to remove a member from the raft cluster
@@ -1738,7 +1796,6 @@ func (n *Node) applyRemoveNode(ctx context.Context, cc raftpb.ConfChange) (err error) {
 
 		// while n.asyncTasks.Wait() could be helpful in this case
 		// it's the best-effort strategy, because this send could be fail due to some errors (such as time limit exceeds)
-		// TODO(Runshen Zhu): use leadership transfer to solve this case, after vendoring raft 3.0+
 	} else {
 		if err := n.transport.RemovePeer(cc.NodeID); err != nil {
 			return err
@@ -1852,3 +1909,7 @@ func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
 	}
 	return sids
 }
+
+func (n *Node) reqTimeout() time.Duration {
+	return 5*time.Second + 2*time.Duration(n.Config.ElectionTick)*n.opts.TickInterval
+}
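
To put the new reqTimeout in concrete terms: with, say, a one-second TickInterval and an ElectionTick of 10 (illustrative numbers, not necessarily this cluster's settings), the timeout works out to 5s + 2·10·1s = 25s. That same value now bounds both the Join call in JoinAndStart and the leadership transfer in removeSelfGracefully above.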
13 changes: 13 additions & 0 deletions manager/state/raft/transport/transport.go
@@ -295,6 +295,19 @@ func (t *Transport) Active(id uint64) bool {
 	return active
 }
 
+// LongestActive returns the ID of the peer that has been active for the longest
+// length of time.
+func (t *Transport) LongestActive() (uint64, error) {
+	p, err := t.longestActive()
+	if err != nil {
+		return 0, err
+	}
+
+	return p.id, nil
+}
+
+// longestActive returns the peer that has been active for the longest length of
+// time.
 func (t *Transport) longestActive() (*peer, error) {
 	var longest *peer
 	var longestTime time.Time
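As a side note, the wait in transferLeadership is a plain ticker-plus-context loop. The self-contained sketch below shows the same pattern in isolation; the condition function and durations are placeholders rather than swarmkit values.

package main

import (
	"context"
	"fmt"
	"time"
)

// waitFor polls cond at the given interval until it returns true or ctx
// expires, mirroring the loop transferLeadership uses to detect a new leader.
func waitFor(ctx context.Context, interval time.Duration, cond func() bool) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		if cond() {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

func main() {
	// Placeholder condition: pretend leadership has moved after 300ms.
	deadline := time.Now().Add(300 * time.Millisecond)
	movedToNewLeader := func() bool { return time.Now().After(deadline) }

	// Bound the wait with a request timeout, as the commit does via reqTimeout().
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	if err := waitFor(ctx, 100*time.Millisecond, movedToNewLeader); err != nil {
		fmt.Println("transfer did not complete:", err)
		return
	}
	fmt.Println("leadership moved; safe to call Leave on the new leader")
}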