diff --git a/manager/dispatcher/dispatcher_test.go b/manager/dispatcher/dispatcher_test.go index ba66b6a63e..43246f3221 100644 --- a/manager/dispatcher/dispatcher_test.go +++ b/manager/dispatcher/dispatcher_test.go @@ -290,7 +290,7 @@ func TestHeartbeatTimeout(t *testing.T) { time.Sleep(500 * time.Millisecond) gd.Store.View(func(readTx store.ReadTx) { - storeNodes, err := store.FindNodes(readTx, store.All) + storeNodes, err := store.FindNodes(readTx, store.ByIDPrefix(gd.SecurityConfigs[0].ClientTLSCreds.NodeID())) assert.NoError(t, err) assert.NotEmpty(t, storeNodes) assert.Equal(t, api.NodeStatus_DOWN, storeNodes[0].Status.State) @@ -754,6 +754,6 @@ func TestNodesCount(t *testing.T) { stream.Recv() } assert.Equal(t, 6, gd.dispatcherServer.NodeCount()) - time.Sleep(500 * time.Millisecond) + time.Sleep(700 * time.Millisecond) assert.Equal(t, 0, gd.dispatcherServer.NodeCount()) } diff --git a/manager/dispatcher/nodes.go b/manager/dispatcher/nodes.go index 2d0ea076a5..aa962a01b1 100644 --- a/manager/dispatcher/nodes.go +++ b/manager/dispatcher/nodes.go @@ -43,26 +43,29 @@ func (rn *registeredNode) checkSessionID(sessionID string) error { } type nodeStore struct { - periodChooser *periodChooser - gracePeriodMultiplier time.Duration - rateLimitPeriod time.Duration - nodes map[string]*registeredNode - mu sync.RWMutex + periodChooser *periodChooser + gracePeriodMultiplierNormal time.Duration + gracePeriodMultiplierUnknown time.Duration + rateLimitPeriod time.Duration + nodes map[string]*registeredNode + mu sync.RWMutex } func newNodeStore(hbPeriod, hbEpsilon time.Duration, graceMultiplier int, rateLimitPeriod time.Duration) *nodeStore { return &nodeStore{ - nodes: make(map[string]*registeredNode), - periodChooser: newPeriodChooser(hbPeriod, hbEpsilon), - gracePeriodMultiplier: time.Duration(graceMultiplier), - rateLimitPeriod: rateLimitPeriod, + nodes: make(map[string]*registeredNode), + periodChooser: newPeriodChooser(hbPeriod, hbEpsilon), + gracePeriodMultiplierNormal: time.Duration(graceMultiplier), + gracePeriodMultiplierUnknown: time.Duration(graceMultiplier) * 2, + rateLimitPeriod: rateLimitPeriod, } } func (s *nodeStore) updatePeriod(hbPeriod, hbEpsilon time.Duration, gracePeriodMultiplier int) { s.mu.Lock() s.periodChooser = newPeriodChooser(hbPeriod, hbEpsilon) - s.gracePeriodMultiplier = time.Duration(gracePeriodMultiplier) + s.gracePeriodMultiplierNormal = time.Duration(gracePeriodMultiplier) + s.gracePeriodMultiplierUnknown = s.gracePeriodMultiplierNormal * 2 s.mu.Unlock() } @@ -79,7 +82,7 @@ func (s *nodeStore) AddUnknown(n *api.Node, expireFunc func()) error { Node: n, } s.nodes[n.ID] = rn - rn.Heartbeat = heartbeat.New(s.periodChooser.Choose()*s.gracePeriodMultiplier, expireFunc) + rn.Heartbeat = heartbeat.New(s.periodChooser.Choose()*s.gracePeriodMultiplierUnknown, expireFunc) return nil } @@ -124,7 +127,7 @@ func (s *nodeStore) Add(n *api.Node, expireFunc func()) *registeredNode { Disconnect: make(chan struct{}), } s.nodes[n.ID] = rn - rn.Heartbeat = heartbeat.New(s.periodChooser.Choose()*s.gracePeriodMultiplier, expireFunc) + rn.Heartbeat = heartbeat.New(s.periodChooser.Choose()*s.gracePeriodMultiplierNormal, expireFunc) return rn } @@ -154,7 +157,7 @@ func (s *nodeStore) Heartbeat(id, sid string) (time.Duration, error) { return 0, err } period := s.periodChooser.Choose() // base period for node - grace := period * time.Duration(s.gracePeriodMultiplier) + grace := period * time.Duration(s.gracePeriodMultiplierNormal) rn.mu.Lock() rn.Heartbeat.Update(grace) rn.Heartbeat.Beat()