From 392bdc335c0cae9178249bda62621bfe56e896c5 Mon Sep 17 00:00:00 2001 From: Aaron Lehmann Date: Wed, 8 Feb 2017 15:22:09 -0800 Subject: [PATCH] raft: Use TransferLeadership to make leader demotion safer When we demote the leader, we currently wait for all queued messages to be sent, as a best-effort approach to making sure the other nodes find out that the node removal has been committed, and stop treating the current leader as a cluster member. This doesn't work perfectly. To make this more robust, use TransferLeadership when the leader is trying to remove itself. The new leader's reconcilation loop will kick in and remove the old leader. Signed-off-by: Aaron Lehmann --- manager/controlapi/node_test.go | 19 +---- manager/role_manager.go | 13 +++- manager/state/raft/raft.go | 92 +++++++++++++++++------ manager/state/raft/raft_test.go | 68 ----------------- manager/state/raft/transport/transport.go | 13 ++++ 5 files changed, 94 insertions(+), 111 deletions(-) diff --git a/manager/controlapi/node_test.go b/manager/controlapi/node_test.go index 71c11ca902..bde8526bf7 100644 --- a/manager/controlapi/node_test.go +++ b/manager/controlapi/node_test.go @@ -532,7 +532,7 @@ func TestUpdateNode(t *testing.T) { assert.Error(t, err) } -func testUpdateNodeDemote(leader bool, t *testing.T) { +func testUpdateNodeDemote(t *testing.T) { tc := cautils.NewTestCA(nil) defer tc.Stop() ts := newTestServer(t) @@ -654,14 +654,8 @@ func testUpdateNodeDemote(leader bool, t *testing.T) { return nil })) - var demoteNode, lastNode *raftutils.TestNode - if leader { - demoteNode = nodes[1] - lastNode = nodes[2] - } else { - demoteNode = nodes[2] - lastNode = nodes[1] - } + demoteNode := nodes[2] + lastNode := nodes[1] raftMember = ts.Server.raft.GetMemberByNodeID(demoteNode.SecurityConfig.ClientTLSCreds.NodeID()) assert.NotNil(t, raftMember) @@ -734,10 +728,5 @@ func testUpdateNodeDemote(leader bool, t *testing.T) { func TestUpdateNodeDemote(t *testing.T) { t.Parallel() - testUpdateNodeDemote(false, t) -} - -func TestUpdateNodeDemoteLeader(t *testing.T) { - t.Parallel() - testUpdateNodeDemote(true, t) + testUpdateNodeDemote(t) } diff --git a/manager/role_manager.go b/manager/role_manager.go index 63dbfb8505..c3a3fdfa65 100644 --- a/manager/role_manager.go +++ b/manager/role_manager.go @@ -133,12 +133,19 @@ func (rm *roleManager) reconcileRole(node *api.Node) { return } - rmCtx, rmCancel := context.WithTimeout(rm.ctx, 5*time.Second) + rmCtx, rmCancel := context.WithTimeout(context.Background(), 5*time.Second) defer rmCancel() if err := rm.raft.RemoveMember(rmCtx, member.RaftID); err != nil { - // TODO(aaronl): Retry later - log.L.WithError(err).Debugf("can't demote node %s at this time", node.ID) + if err == raft.ErrCannotRemoveSelf { + // Don't use rmCtx, because we expect to lose + // leadership, which will cancel this context. + log.L.Info("demoted; transferring leadership") + rm.raft.TransferLeadership(context.Background()) + } else { + // TODO(aaronl): Retry later + log.L.WithError(err).Debugf("can't demote node %s at this time", node.ID) + } return } } diff --git a/manager/state/raft/raft.go b/manager/state/raft/raft.go index ba77ae0ef6..38c65c5737 100644 --- a/manager/state/raft/raft.go +++ b/manager/state/raft/raft.go @@ -57,6 +57,9 @@ var ( // ErrMemberUnknown is sent in response to a message from an // unrecognized peer. ErrMemberUnknown = errors.New("raft: member unknown") + // ErrCannotRemoveSelf is returned if RemoveMember is called with the + // local node as the argument. + ErrCannotRemoveSelf = errors.New("raft: can't remove self") ) // LeadershipState indicates whether the node is a leader or follower. @@ -412,7 +415,7 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) { defer conn.Close() client := api.NewRaftMembershipClient(conn) - joinCtx, joinCancel := context.WithTimeout(ctx, 10*time.Second) + joinCtx, joinCancel := context.WithTimeout(ctx, n.reqTimeout()) defer joinCancel() resp, err := client.Join(joinCtx, &api.JoinRequest{ Addr: n.opts.Addr, @@ -1030,6 +1033,10 @@ func (n *Node) UpdateNode(id uint64, addr string) { // from a member who is willing to leave its raft // membership to an active member of the raft func (n *Node) Leave(ctx context.Context, req *api.LeaveRequest) (*api.LeaveResponse, error) { + if req.Node == nil { + return nil, grpc.Errorf(codes.InvalidArgument, "no node information provided") + } + nodeInfo, err := ca.RemoteNode(ctx) if err != nil { return nil, err @@ -1086,6 +1093,10 @@ func (n *Node) CanRemoveMember(id uint64) bool { } func (n *Node) removeMember(ctx context.Context, id uint64) error { + if id == n.Config.ID { + return ErrCannotRemoveSelf + } + // can't stop the raft node while an async RPC is in progress n.stopMu.RLock() defer n.stopMu.RUnlock() @@ -1100,18 +1111,56 @@ func (n *Node) removeMember(ctx context.Context, id uint64) error { n.membershipLock.Lock() defer n.membershipLock.Unlock() - if n.CanRemoveMember(id) { - cc := raftpb.ConfChange{ - ID: id, - Type: raftpb.ConfChangeRemoveNode, - NodeID: id, - Context: []byte(""), - } - err := n.configure(ctx, cc) - return err + if !n.CanRemoveMember(id) { + return ErrCannotRemoveMember } - return ErrCannotRemoveMember + cc := raftpb.ConfChange{ + ID: id, + Type: raftpb.ConfChangeRemoveNode, + NodeID: id, + Context: []byte(""), + } + return n.configure(ctx, cc) +} + +// TransferLeadership attempts to transfer leadership to a different node, +// and wait for the transfer to happen. +func (n *Node) TransferLeadership(ctx context.Context) error { + ctx, cancelTransfer := context.WithTimeout(ctx, n.reqTimeout()) + defer cancelTransfer() + + n.stopMu.RLock() + defer n.stopMu.RUnlock() + + if !n.IsMember() { + return ErrNoRaftMember + } + + if !n.isLeader() { + return ErrLostLeadership + } + + transferee, err := n.transport.LongestActive() + if err != nil { + return errors.Wrap(err, "failed to get longest-active member") + } + n.raftNode.TransferLeadership(ctx, n.Config.ID, transferee) + ticker := time.NewTicker(n.opts.TickInterval / 10) + defer ticker.Stop() + var leader uint64 + for { + leader = n.leader() + if leader != raft.None && leader != n.Config.ID { + break + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + } + return nil } // RemoveMember submits a configuration change to remove a member from the raft cluster @@ -1726,23 +1775,12 @@ func (n *Node) applyRemoveNode(ctx context.Context, cc raftpb.ConfChange) (err e } if cc.NodeID == n.Config.ID { - // wait the commit ack to be sent before closing connection + // wait for the commit ack to be sent before closing connection n.asyncTasks.Wait() n.NodeRemoved() - // if there are only 2 nodes in the cluster, and leader is leaving - // before closing the connection, leader has to ensure that follower gets - // noticed about this raft conf change commit. Otherwise, follower would - // assume there are still 2 nodes in the cluster and won't get elected - // into the leader by acquiring the majority (2 nodes) - - // while n.asyncTasks.Wait() could be helpful in this case - // it's the best-effort strategy, because this send could be fail due to some errors (such as time limit exceeds) - // TODO(Runshen Zhu): use leadership transfer to solve this case, after vendoring raft 3.0+ - } else { - if err := n.transport.RemovePeer(cc.NodeID); err != nil { - return err - } + } else if err := n.transport.RemovePeer(cc.NodeID); err != nil { + return err } return n.cluster.RemoveMember(cc.NodeID) @@ -1852,3 +1890,7 @@ func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 { } return sids } + +func (n *Node) reqTimeout() time.Duration { + return 5*time.Second + 2*time.Duration(n.Config.ElectionTick)*n.opts.TickInterval +} diff --git a/manager/state/raft/raft_test.go b/manager/state/raft/raft_test.go index 303fde4791..56999f3d4a 100644 --- a/manager/state/raft/raft_test.go +++ b/manager/state/raft/raft_test.go @@ -327,74 +327,6 @@ func TestRaftFollowerLeave(t *testing.T) { assert.Len(t, nodes[4].GetMemberlist(), 4) } -func TestRaftLeaderLeave(t *testing.T) { - t.Parallel() - - nodes, clockSource := raftutils.NewRaftCluster(t, tc) - defer raftutils.TeardownCluster(t, nodes) - - // node 1 is the leader - assert.Equal(t, nodes[1].Leader(), nodes[1].Config.ID) - - // Try to leave the raft - // Use gRPC instead of calling handler directly because of - // authorization check. - cc, err := dial(nodes[1], nodes[1].Address) - assert.NoError(t, err) - raftClient := api.NewRaftMembershipClient(cc) - defer cc.Close() - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - resp, err := raftClient.Leave(ctx, &api.LeaveRequest{Node: &api.RaftMember{RaftID: nodes[1].Config.ID}}) - assert.NoError(t, err, "error sending message to leave the raft") - assert.NotNil(t, resp, "leave response message is nil") - - newCluster := map[uint64]*raftutils.TestNode{ - 2: nodes[2], - 3: nodes[3], - } - // Wait for election tick - raftutils.WaitForCluster(t, clockSource, newCluster) - - // Leader should not be 1 - assert.NotEqual(t, nodes[2].Leader(), nodes[1].Config.ID) - assert.Equal(t, nodes[2].Leader(), nodes[3].Leader()) - - leader := nodes[2].Leader() - - // Find the leader node and a follower node - var ( - leaderNode *raftutils.TestNode - followerNode *raftutils.TestNode - ) - for i, n := range nodes { - if n.Config.ID == leader { - leaderNode = n - if i == 2 { - followerNode = nodes[3] - } else { - followerNode = nodes[2] - } - } - } - - require.NotNil(t, leaderNode) - require.NotNil(t, followerNode) - - // Propose a value - value, err := raftutils.ProposeValue(t, leaderNode, DefaultProposalTime) - assert.NoError(t, err, "failed to propose value") - - // The value should be replicated on all remaining nodes - raftutils.CheckValue(t, clockSource, leaderNode, value) - assert.Len(t, leaderNode.GetMemberlist(), 2) - - raftutils.CheckValue(t, clockSource, followerNode, value) - assert.Len(t, followerNode.GetMemberlist(), 2) - - raftutils.TeardownCluster(t, newCluster) -} - func TestRaftNewNodeGetsData(t *testing.T) { t.Parallel() diff --git a/manager/state/raft/transport/transport.go b/manager/state/raft/transport/transport.go index ec1e971cf3..9ce8efd51b 100644 --- a/manager/state/raft/transport/transport.go +++ b/manager/state/raft/transport/transport.go @@ -295,6 +295,19 @@ func (t *Transport) Active(id uint64) bool { return active } +// LongestActive returns the ID of the peer that has been active for the longest +// length of time. +func (t *Transport) LongestActive() (uint64, error) { + p, err := t.longestActive() + if err != nil { + return 0, err + } + + return p.id, nil +} + +// longestActive returns the peer that has been active for the longest length of +// time. func (t *Transport) longestActive() (*peer, error) { var longest *peer var longestTime time.Time