@@ -1,6 +1,7 @@
 package nomad
 
 import (
+	"context"
 	"errors"
 	"fmt"
 	"net"
@@ -528,7 +529,7 @@ func TestClientEndpoint_UpdateStatus_Vault(t *testing.T) {
 func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
 	ci.Parallel(t)
 
-	// Setup server with tighther heartbeat so we don't have to wait so long
+	// Setup server with tighter heartbeat so we don't have to wait so long
 	// for nodes to go down.
 	heartbeatTTL := time.Duration(500*testutil.TestMultiplier()) * time.Millisecond
 	s, cleanupS := TestServer(t, func(c *Config) {
@@ -550,13 +551,13 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
 	must.NoError(t, err)
 
 	// Start heartbeat.
-	stopHeartbeat := make(chan interface{})
-	heartbeat := func() {
+	heartbeat := func(ctx context.Context) {
 		ticker := time.NewTicker(heartbeatTTL / 2)
+		defer ticker.Stop()
+
 		for {
 			select {
-			case <-stopHeartbeat:
-				ticker.Stop()
+			case <-ctx.Done():
 				return
 			case <-ticker.C:
 				if t.Failed() {
@@ -569,13 +570,15 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
 					WriteRequest: structs.WriteRequest{Region: "global"},
 				}
 				var resp structs.NodeUpdateResponse
-				// Ignore errors since an unexpected failed hearbeat will cause
+				// Ignore errors since an unexpected failed heartbeat will cause
 				// the test conditions to fail.
 				msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", req, &resp)
 			}
 		}
 	}
-	go heartbeat()
+	heartbeatCtx, cancelHeartbeat := context.WithCancel(context.Background())
+	defer cancelHeartbeat()
+	go heartbeat(heartbeatCtx)
 
 	// Wait for node to be ready.
 	testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady)
@@ -602,7 +605,7 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
 	err = msgpackrpc.CallWithCodec(codec, "Job.Register", jobReq, &jobResp)
 	must.NoError(t, err)
 
-	// Wait for alloc run be pending in the server.
+	// Wait for alloc to be pending in the server.
 	testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
 		structs.AllocClientStatusPending: 1,
 	})
@@ -634,40 +637,30 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
 	err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &resp)
 	must.NoError(t, err)
 
-	// Wait for alloc run be running in the server.
+	// Wait for alloc to be running in the server.
 	testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
 		structs.AllocClientStatusRunning: 1,
 	})
 
 	// Stop heartbeat and wait for the client to be disconnected and the alloc
 	// to be unknown.
-	close(stopHeartbeat)
+	cancelHeartbeat()
 	testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusDisconnected)
 	testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
 		structs.AllocClientStatusUnknown: 1,
 	})
 
-	// There should be a pending eval for the alloc replacement.
-	state := s.fsm.State()
-	ws := memdb.NewWatchSet()
-	evals, err := state.EvalsByJob(ws, job.Namespace, job.ID)
-	found := false
-	for _, eval := range evals {
-		if eval.Status == structs.EvalStatusPending {
-			found = true
-			break
-		}
-	}
-	must.True(t, found)
-
 	// Restart heartbeat to reconnect node.
-	stopHeartbeat = make(chan interface{})
-	go heartbeat()
-
-	// Wait for node to be initializing.
-	// It must remain initializing until it updates its allocs with the server
-	// so the scheduler have the necessary information to avoid unnecessary
-	// placements by the pending eval.
+	heartbeatCtx, cancelHeartbeat = context.WithCancel(context.Background())
+	defer cancelHeartbeat()
+	go heartbeat(heartbeatCtx)
+
+	// Wait a few heartbeats and check that the node is still initializing.
+	//
+	// The heartbeat should not update the node to ready until it updates its
+	// allocs status with the server so the scheduler has the necessary
+	// information to avoid unnecessary placements.
+	time.Sleep(3 * heartbeatTTL)
 	testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusInit)
 
 	// Get allocs that node should run.
@@ -683,21 +676,41 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
 	must.NoError(t, err)
 	must.Len(t, 1, allocsResp.Allocs)
 
-	// Tell server the alloc is running.
+	// Tell server the alloc is still running.
 	err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", allocUpdateReq, &resp)
 	must.NoError(t, err)
 
-	// Wait for alloc run be running in the server.
+	// The client must end in the same state as before it disconnected:
+	// - client status is ready.
+	// - only 1 alloc and the alloc is running.
+	// - all evals are terminal, so the cluster is in a stable state.
+	testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady)
 	testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
 		structs.AllocClientStatusRunning: 1,
 	})
+	testutil.WaitForResult(func() (bool, error) {
+		state := s.fsm.State()
+		ws := memdb.NewWatchSet()
+		evals, err := state.EvalsByJob(ws, job.Namespace, job.ID)
+		if err != nil {
+			return false, fmt.Errorf("failed to read evals: %v", err)
+		}
+		for _, eval := range evals {
+			// TODO: remove this check once the disconnect process stops
+			// leaking a max-disconnect-timeout eval.
+			// https://github.com/hashicorp/nomad/issues/12809
+			if eval.TriggeredBy == structs.EvalTriggerMaxDisconnectTimeout {
+				continue
+			}
 
-	// Wait for the client to be ready.
-	testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady)
-
-	// Cleanup heartbeat goroutine before exiting.
-	close(stopHeartbeat)
-	testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusDisconnected)
+			if !eval.TerminalStatus() {
+				return false, fmt.Errorf("found %s eval", eval.Status)
+			}
+		}
+		return true, nil
+	}, func(err error) {
+		must.NoError(t, err)
+	})
 }
 
 func TestClientEndpoint_UpdateStatus_HeartbeatRecovery(t *testing.T) {
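
For reference, here is a minimal standalone sketch of the heartbeat pattern this change adopts: a ticker firing at half the TTL inside a goroutine that exits when its context is canceled. It is not Nomad code; `startHeartbeat`, `send`, and the TTL value are illustrative stand-ins for the test's heartbeat closure, its `Node.UpdateStatus` RPC call, and `heartbeatTTL`.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// startHeartbeat launches a goroutine that calls send at half the TTL.
// The goroutine exits, and its ticker is released, once ctx is canceled.
func startHeartbeat(ctx context.Context, ttl time.Duration, send func()) {
	go func() {
		ticker := time.NewTicker(ttl / 2)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				send()
			}
		}
	}()
}

func main() {
	ttl := 500 * time.Millisecond

	// First "connection": heartbeat until the context is canceled.
	ctx, cancel := context.WithCancel(context.Background())
	startHeartbeat(ctx, ttl, func() { fmt.Println("heartbeat") })

	time.Sleep(2 * ttl)
	cancel() // simulate the client disconnecting

	// Reconnect: the old context is already canceled, so a fresh one is needed.
	ctx, cancel = context.WithCancel(context.Background())
	defer cancel()
	startHeartbeat(ctx, ttl, func() { fmt.Println("heartbeat after reconnect") })

	time.Sleep(2 * ttl)
}
```

Compared with a `chan interface{}` that has to be re-made after every `close`, a `context.Context` delivers the same stop signal through `<-ctx.Done()`, pairs naturally with `defer cancel()`, and lets the same closure be reused for the reconnect phase with a fresh context.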