diff --git a/manager/controlapi/node_test.go b/manager/controlapi/node_test.go
index 71c11ca902..bde8526bf7 100644
--- a/manager/controlapi/node_test.go
+++ b/manager/controlapi/node_test.go
@@ -532,7 +532,7 @@ func TestUpdateNode(t *testing.T) {
 	assert.Error(t, err)
 }
 
-func testUpdateNodeDemote(leader bool, t *testing.T) {
+func testUpdateNodeDemote(t *testing.T) {
 	tc := cautils.NewTestCA(nil)
 	defer tc.Stop()
 	ts := newTestServer(t)
@@ -654,14 +654,8 @@ func testUpdateNodeDemote(leader bool, t *testing.T) {
 		return nil
 	}))
 
-	var demoteNode, lastNode *raftutils.TestNode
-	if leader {
-		demoteNode = nodes[1]
-		lastNode = nodes[2]
-	} else {
-		demoteNode = nodes[2]
-		lastNode = nodes[1]
-	}
+	demoteNode := nodes[2]
+	lastNode := nodes[1]
 
 	raftMember = ts.Server.raft.GetMemberByNodeID(demoteNode.SecurityConfig.ClientTLSCreds.NodeID())
 	assert.NotNil(t, raftMember)
@@ -734,10 +728,5 @@ func testUpdateNodeDemote(leader bool, t *testing.T) {
 
 func TestUpdateNodeDemote(t *testing.T) {
 	t.Parallel()
-	testUpdateNodeDemote(false, t)
-}
-
-func TestUpdateNodeDemoteLeader(t *testing.T) {
-	t.Parallel()
-	testUpdateNodeDemote(true, t)
+	testUpdateNodeDemote(t)
 }
diff --git a/manager/role_manager.go b/manager/role_manager.go
index 63dbfb8505..4d5d3f19a0 100644
--- a/manager/role_manager.go
+++ b/manager/role_manager.go
@@ -133,12 +133,19 @@ func (rm *roleManager) reconcileRole(node *api.Node) {
 				return
 			}
 
-			rmCtx, rmCancel := context.WithTimeout(rm.ctx, 5*time.Second)
+			rmCtx, rmCancel := context.WithTimeout(context.Background(), 5*time.Second)
 			defer rmCancel()
 
 			if err := rm.raft.RemoveMember(rmCtx, member.RaftID); err != nil {
-				// TODO(aaronl): Retry later
-				log.L.WithError(err).Debugf("can't demote node %s at this time", node.ID)
+				if err == raft.ErrCantRemoveSelf {
+					// Don't use rmCtx, because we expect to lose
+					// leadership, which will cancel this context.
+					log.L.Info("demoted; ceding leadership")
+					rm.raft.TransferLeadership(context.Background())
+				} else {
+					// TODO(aaronl): Retry later
+					log.L.WithError(err).Debugf("can't demote node %s at this time", node.ID)
+				}
 				return
 			}
 		}
diff --git a/manager/state/raft/raft.go b/manager/state/raft/raft.go
index ba77ae0ef6..d276787a9a 100644
--- a/manager/state/raft/raft.go
+++ b/manager/state/raft/raft.go
@@ -57,6 +57,9 @@ var (
 	// ErrMemberUnknown is sent in response to a message from an
 	// unrecognized peer.
 	ErrMemberUnknown = errors.New("raft: member unknown")
+	// ErrCantRemoveSelf is returned if RemoveMember is called with the
+	// local node as the argument.
+	ErrCantRemoveSelf = errors.New("raft: can't remove self")
 )
 
 // LeadershipState indicates whether the node is a leader or follower.
@@ -412,7 +415,7 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) {
 		defer conn.Close()
 		client := api.NewRaftMembershipClient(conn)
 
-		joinCtx, joinCancel := context.WithTimeout(ctx, 10*time.Second)
+		joinCtx, joinCancel := context.WithTimeout(ctx, n.reqTimeout())
 		defer joinCancel()
 		resp, err := client.Join(joinCtx, &api.JoinRequest{
 			Addr: n.opts.Addr,
@@ -1030,6 +1033,10 @@ func (n *Node) UpdateNode(id uint64, addr string) {
 // from a member who is willing to leave its raft
 // membership to an active member of the raft
 func (n *Node) Leave(ctx context.Context, req *api.LeaveRequest) (*api.LeaveResponse, error) {
+	if req.Node == nil {
+		return nil, grpc.Errorf(codes.InvalidArgument, "no node information provided")
+	}
+
 	nodeInfo, err := ca.RemoteNode(ctx)
 	if err != nil {
 		return nil, err
@@ -1086,6 +1093,10 @@ func (n *Node) CanRemoveMember(id uint64) bool {
 }
 
 func (n *Node) removeMember(ctx context.Context, id uint64) error {
+	if id == n.Config.ID {
+		return ErrCantRemoveSelf
+	}
+
 	// can't stop the raft node while an async RPC is in progress
 	n.stopMu.RLock()
 	defer n.stopMu.RUnlock()
@@ -1100,18 +1111,56 @@ func (n *Node) removeMember(ctx context.Context, id uint64) error {
 	n.membershipLock.Lock()
 	defer n.membershipLock.Unlock()
 
-	if n.CanRemoveMember(id) {
-		cc := raftpb.ConfChange{
-			ID:      id,
-			Type:    raftpb.ConfChangeRemoveNode,
-			NodeID:  id,
-			Context: []byte(""),
-		}
-		err := n.configure(ctx, cc)
-		return err
+	if !n.CanRemoveMember(id) {
+		return ErrCannotRemoveMember
 	}
 
-	return ErrCannotRemoveMember
+	cc := raftpb.ConfChange{
+		ID:      id,
+		Type:    raftpb.ConfChangeRemoveNode,
+		NodeID:  id,
+		Context: []byte(""),
+	}
+	return n.configure(ctx, cc)
+}
+
+// TransferLeadership attempts to transfer leadership to a different node,
+// and waits for the transfer to happen.
+func (n *Node) TransferLeadership(ctx context.Context) error {
+	ctx, cancelTransfer := context.WithTimeout(ctx, n.reqTimeout())
+	defer cancelTransfer()
+
+	n.stopMu.RLock()
+	defer n.stopMu.RUnlock()
+
+	if !n.IsMember() {
+		return ErrNoRaftMember
+	}
+
+	if !n.isLeader() {
+		return ErrLostLeadership
+	}
+
+	transferee, err := n.transport.LongestActive()
+	if err != nil {
+		return errors.Wrap(err, "failed to get longest-active member")
+	}
+	n.raftNode.TransferLeadership(ctx, n.Config.ID, transferee)
+	ticker := time.NewTicker(n.opts.TickInterval / 10)
+	defer ticker.Stop()
+	var leader uint64
+	for {
+		leader = n.leader()
+		if leader != raft.None && leader != n.Config.ID {
+			break
+		}
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-ticker.C:
+		}
+	}
+	return nil
 }
 
 // RemoveMember submits a configuration change to remove a member from the raft cluster
@@ -1726,23 +1775,12 @@ func (n *Node) applyRemoveNode(ctx context.Context, cc raftpb.ConfChange) (err e
 	}
 
 	if cc.NodeID == n.Config.ID {
-		// wait the commit ack to be sent before closing connection
+		// wait for the commit ack to be sent before closing connection
 		n.asyncTasks.Wait()
 
 		n.NodeRemoved()
-		// if there are only 2 nodes in the cluster, and leader is leaving
-		// before closing the connection, leader has to ensure that follower gets
-		// noticed about this raft conf change commit. Otherwise, follower would
-		// assume there are still 2 nodes in the cluster and won't get elected
-		// into the leader by acquiring the majority (2 nodes)
-
-		// while n.asyncTasks.Wait() could be helpful in this case
-		// it's the best-effort strategy, because this send could be fail due to some errors (such as time limit exceeds)
-		// TODO(Runshen Zhu): use leadership transfer to solve this case, after vendoring raft 3.0+
-	} else {
-		if err := n.transport.RemovePeer(cc.NodeID); err != nil {
-			return err
-		}
+	} else if err := n.transport.RemovePeer(cc.NodeID); err != nil {
+		return err
 	}
 
 	return n.cluster.RemoveMember(cc.NodeID)
@@ -1852,3 +1890,7 @@ func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
 	}
 	return sids
 }
+
+func (n *Node) reqTimeout() time.Duration {
+	return 5*time.Second + 2*time.Duration(n.Config.ElectionTick)*n.opts.TickInterval
+}
diff --git a/manager/state/raft/raft_test.go b/manager/state/raft/raft_test.go
index 303fde4791..56999f3d4a 100644
--- a/manager/state/raft/raft_test.go
+++ b/manager/state/raft/raft_test.go
@@ -327,74 +327,6 @@ func TestRaftFollowerLeave(t *testing.T) {
 	assert.Len(t, nodes[4].GetMemberlist(), 4)
 }
 
-func TestRaftLeaderLeave(t *testing.T) {
-	t.Parallel()
-
-	nodes, clockSource := raftutils.NewRaftCluster(t, tc)
-	defer raftutils.TeardownCluster(t, nodes)
-
-	// node 1 is the leader
-	assert.Equal(t, nodes[1].Leader(), nodes[1].Config.ID)
-
-	// Try to leave the raft
-	// Use gRPC instead of calling handler directly because of
-	// authorization check.
-	cc, err := dial(nodes[1], nodes[1].Address)
-	assert.NoError(t, err)
-	raftClient := api.NewRaftMembershipClient(cc)
-	defer cc.Close()
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-	defer cancel()
-	resp, err := raftClient.Leave(ctx, &api.LeaveRequest{Node: &api.RaftMember{RaftID: nodes[1].Config.ID}})
-	assert.NoError(t, err, "error sending message to leave the raft")
-	assert.NotNil(t, resp, "leave response message is nil")
-
-	newCluster := map[uint64]*raftutils.TestNode{
-		2: nodes[2],
-		3: nodes[3],
-	}
-	// Wait for election tick
-	raftutils.WaitForCluster(t, clockSource, newCluster)
-
-	// Leader should not be 1
-	assert.NotEqual(t, nodes[2].Leader(), nodes[1].Config.ID)
-	assert.Equal(t, nodes[2].Leader(), nodes[3].Leader())
-
-	leader := nodes[2].Leader()
-
-	// Find the leader node and a follower node
-	var (
-		leaderNode   *raftutils.TestNode
-		followerNode *raftutils.TestNode
-	)
-	for i, n := range nodes {
-		if n.Config.ID == leader {
-			leaderNode = n
-			if i == 2 {
-				followerNode = nodes[3]
-			} else {
-				followerNode = nodes[2]
-			}
-		}
-	}
-
-	require.NotNil(t, leaderNode)
-	require.NotNil(t, followerNode)
-
-	// Propose a value
-	value, err := raftutils.ProposeValue(t, leaderNode, DefaultProposalTime)
-	assert.NoError(t, err, "failed to propose value")
-
-	// The value should be replicated on all remaining nodes
-	raftutils.CheckValue(t, clockSource, leaderNode, value)
-	assert.Len(t, leaderNode.GetMemberlist(), 2)
-
-	raftutils.CheckValue(t, clockSource, followerNode, value)
-	assert.Len(t, followerNode.GetMemberlist(), 2)
-
-	raftutils.TeardownCluster(t, newCluster)
-}
-
 func TestRaftNewNodeGetsData(t *testing.T) {
 	t.Parallel()
 
diff --git a/manager/state/raft/transport/transport.go b/manager/state/raft/transport/transport.go
index ec1e971cf3..9ce8efd51b 100644
--- a/manager/state/raft/transport/transport.go
+++ b/manager/state/raft/transport/transport.go
@@ -295,6 +295,19 @@ func (t *Transport) Active(id uint64) bool {
 	return active
 }
 
+// LongestActive returns the ID of the peer that has been active for the longest
+// length of time.
+func (t *Transport) LongestActive() (uint64, error) {
+	p, err := t.longestActive()
+	if err != nil {
+		return 0, err
+	}
+
+	return p.id, nil
+}
+
+// longestActive returns the peer that has been active for the longest length of
+// time.
 func (t *Transport) longestActive() (*peer, error) {
 	var longest *peer
 	var longestTime time.Time
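
The reqTimeout helper introduced above replaces the fixed 10-second Join timeout with 5 seconds of slack plus two full election timeouts, so the request budget now scales with the configured tick settings, and TransferLeadership reuses the same budget when the role manager cedes leadership after RemoveMember returns ErrCantRemoveSelf. A minimal standalone sketch of that arithmetic, using illustrative tick values rather than swarmkit's actual defaults:

package main

import (
	"fmt"
	"time"
)

// reqTimeout mirrors the helper added in raft.go: a 5s base plus two
// election timeouts (ElectionTick ticks of TickInterval each).
func reqTimeout(electionTick int, tickInterval time.Duration) time.Duration {
	return 5*time.Second + 2*time.Duration(electionTick)*tickInterval
}

func main() {
	// Assumed values, for illustration only.
	fmt.Println(reqTimeout(3, time.Second))           // 11s, close to the old fixed 10s
	fmt.Println(reqTimeout(10, 500*time.Millisecond)) // 15s for a slower election timeout
}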