diff --git a/manager/state/raft/raft.go b/manager/state/raft/raft.go index ba77ae0ef6..6187e11938 100644 --- a/manager/state/raft/raft.go +++ b/manager/state/raft/raft.go @@ -412,7 +412,7 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) { defer conn.Close() client := api.NewRaftMembershipClient(conn) - joinCtx, joinCancel := context.WithTimeout(ctx, 10*time.Second) + joinCtx, joinCancel := context.WithTimeout(ctx, n.reqTimeout()) defer joinCancel() resp, err := client.Join(joinCtx, &api.JoinRequest{ Addr: n.opts.Addr, @@ -1030,6 +1030,10 @@ func (n *Node) UpdateNode(id uint64, addr string) { // from a member who is willing to leave its raft // membership to an active member of the raft func (n *Node) Leave(ctx context.Context, req *api.LeaveRequest) (*api.LeaveResponse, error) { + if req.Node == nil { + return nil, grpc.Errorf(codes.InvalidArgument, "no node information provided") + } + nodeInfo, err := ca.RemoteNode(ctx) if err != nil { return nil, err @@ -1100,18 +1104,72 @@ func (n *Node) removeMember(ctx context.Context, id uint64) error { n.membershipLock.Lock() defer n.membershipLock.Unlock() - if n.CanRemoveMember(id) { - cc := raftpb.ConfChange{ - ID: id, - Type: raftpb.ConfChangeRemoveNode, - NodeID: id, - Context: []byte(""), + if !n.CanRemoveMember(id) { + return ErrCannotRemoveMember + } + + if id == n.Config.ID { + // To avoid a corner case with self-removal, transfer leadership + // to another node and ask that node to remove us. + if err := n.removeSelfGracefully(ctx); err != nil { + log.G(ctx).WithError(err).Error("failed to leave raft cluster gracefully") + } else { + return nil } - err := n.configure(ctx, cc) + } + + cc := raftpb.ConfChange{ + ID: id, + Type: raftpb.ConfChangeRemoveNode, + NodeID: id, + Context: []byte(""), + } + return n.configure(ctx, cc) +} + +// removeSelfGracefully initiates a leadership transfer and calls Leave on the +// new leader. It must be called with stopMu held. +func (n *Node) removeSelfGracefully(ctx context.Context) error { + transferCtx, cancelTransfer := context.WithTimeout(ctx, n.reqTimeout()) + defer cancelTransfer() + + if err := n.transferLeadership(transferCtx); err != nil { + return errors.Wrap(err, "failed to transfer leadership") + } + conn, err := n.transport.PeerConn(n.leader()) + if err != nil { return err } + _, err = api.NewRaftMembershipClient(conn).Leave(ctx, &api.LeaveRequest{Node: &api.RaftMember{RaftID: n.Config.ID}}) + return err +} - return ErrCannotRemoveMember +// transferLeadership attempts to transfer leadership to a different node, +// and wait for the transfer to happen. It must be called with stopMu held. +func (n *Node) transferLeadership(ctx context.Context) error { + transferee, err := n.transport.LongestActive() + if err != nil { + return errors.Wrap(err, "failed to get longest-active member") + } + start := time.Now() + log.G(ctx).Infof("raft: transfer leadership %x -> %x", n.Config.ID, transferee) + n.raftNode.TransferLeadership(ctx, n.Config.ID, transferee) + ticker := time.NewTicker(n.opts.TickInterval / 10) + defer ticker.Stop() + var leader uint64 + for { + leader = n.leader() + if leader != raft.None && leader != n.Config.ID { + break + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + } + log.G(ctx).Infof("raft: transfer leadership %x -> %x finished in %v", n.Config.ID, leader, time.Since(start)) + return nil } // RemoveMember submits a configuration change to remove a member from the raft cluster @@ -1738,7 +1796,6 @@ func (n *Node) applyRemoveNode(ctx context.Context, cc raftpb.ConfChange) (err e // while n.asyncTasks.Wait() could be helpful in this case // it's the best-effort strategy, because this send could be fail due to some errors (such as time limit exceeds) - // TODO(Runshen Zhu): use leadership transfer to solve this case, after vendoring raft 3.0+ } else { if err := n.transport.RemovePeer(cc.NodeID); err != nil { return err @@ -1852,3 +1909,7 @@ func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 { } return sids } + +func (n *Node) reqTimeout() time.Duration { + return 5*time.Second + 2*time.Duration(n.Config.ElectionTick)*n.opts.TickInterval +} diff --git a/manager/state/raft/transport/transport.go b/manager/state/raft/transport/transport.go index ec1e971cf3..9ce8efd51b 100644 --- a/manager/state/raft/transport/transport.go +++ b/manager/state/raft/transport/transport.go @@ -295,6 +295,19 @@ func (t *Transport) Active(id uint64) bool { return active } +// LongestActive returns the ID of the peer that has been active for the longest +// length of time. +func (t *Transport) LongestActive() (uint64, error) { + p, err := t.longestActive() + if err != nil { + return 0, err + } + + return p.id, nil +} + +// longestActive returns the peer that has been active for the longest length of +// time. func (t *Transport) longestActive() (*peer, error) { var longest *peer var longestTime time.Time