-
Notifications
You must be signed in to change notification settings - Fork 2k
/
Copy pathtask_hook_coordinator.go
135 lines (107 loc) · 3.04 KB
/
task_hook_coordinator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package allocrunner
import (
"context"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner"
"github.com/hashicorp/nomad/nomad/structs"
)
// taskHookCoordinator helps coordinate when main tasks can launch: namely
// once every prestart sidecar task has started and every prestart ephemeral
// task has finished successfully.
type taskHookCoordinator struct {
	logger hclog.Logger

	// closedCh is closed at construction time and is returned as the start
	// condition for prestart tasks, so they never wait.
	closedCh chan struct{}

	// mainTaskCtx gates every non-prestart task: its Done channel is the
	// start condition those tasks wait on.
	mainTaskCtx context.Context

	// mainTaskCtxCancel cancels mainTaskCtx, which releases the waiting
	// main tasks.
	mainTaskCtxCancel func()

	// prestartSidecar holds the names of prestart sidecar tasks that have
	// not been observed as started yet.
	prestartSidecar map[string]struct{}

	// prestartEphemeral holds the names of prestart non-sidecar tasks that
	// have not been observed as successful yet.
	prestartEphemeral map[string]struct{}
}
// newTaskHookCoordinator builds a coordinator for the given tasks. The tasks
// are registered through setTasks, so when none of them is a prestart task the
// main-task start condition is already released by the time this returns.
func newTaskHookCoordinator(logger hclog.Logger, tasks []*structs.Task) *taskHookCoordinator {
	// A channel that is closed from the outset: receiving from it never
	// blocks, so whoever is handed this channel can proceed immediately.
	alreadyClosed := make(chan struct{})
	close(alreadyClosed)

	ctx, cancel := context.WithCancel(context.Background())

	coord := new(taskHookCoordinator)
	coord.logger = logger
	coord.closedCh = alreadyClosed
	coord.mainTaskCtx = ctx
	coord.mainTaskCtxCancel = cancel
	coord.prestartSidecar = make(map[string]struct{})
	coord.prestartEphemeral = make(map[string]struct{})

	coord.setTasks(tasks)
	return coord
}
// setTasks records which of the given tasks are prestart tasks, split into
// sidecar and ephemeral (non-sidecar) sets keyed by task name. Tasks without a
// lifecycle block are ignored; a lifecycle hook other than prestart is logged
// as an error and the task is not tracked. If no prestart task remains tracked
// afterwards, the main-task context is cancelled so main tasks may start.
func (c *taskHookCoordinator) setTasks(tasks []*structs.Task) {
	for _, t := range tasks {
		lc := t.Lifecycle
		if lc == nil {
			// Plain main task: nothing to track.
			continue
		}

		if lc.Hook != structs.TaskLifecycleHookPrestart {
			c.logger.Error("invalid lifecycle hook", "hook", lc.Hook)
			continue
		}

		// Maps are reference values, so writing through the alias updates
		// the coordinator's own set.
		pending := c.prestartEphemeral
		if lc.Sidecar {
			pending = c.prestartSidecar
		}
		pending[t.Name] = struct{}{}
	}

	if c.hasPrestartTasks() {
		return
	}
	c.mainTaskCtxCancel()
}
// hasPrestartTasks reports whether any prestart task, sidecar or ephemeral,
// is still being tracked.
func (c *taskHookCoordinator) hasPrestartTasks() bool {
	return len(c.prestartSidecar) > 0 || len(c.prestartEphemeral) > 0
}
// startConditionForTask returns the channel the given task must wait on before
// starting. Prestart tasks get the already-closed channel and therefore never
// wait; every other task gets the Done channel of the main-task context.
func (c *taskHookCoordinator) startConditionForTask(task *structs.Task) <-chan struct{} {
	if lc := task.Lifecycle; lc == nil || lc.Hook != structs.TaskLifecycleHookPrestart {
		return c.mainTaskCtx.Done()
	}
	return c.closedCh
}
// taskStateUpdated drops prestart tasks from tracking based on the supplied
// task states: a sidecar is dropped once its StartedAt is set, an ephemeral
// task once its state reports Successful. When nothing is left to track, the
// main-task context is cancelled, releasing the main tasks.
//
// This is not thread safe: the tracking maps are mutated without locking.
// It must only be called from one thread per alloc runner.
func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskState) {
	// Already released; there is nothing left to coordinate.
	if c.mainTaskCtx.Err() != nil {
		return
	}

	for name := range c.prestartSidecar {
		if st := states[name]; st != nil && !st.StartedAt.IsZero() {
			delete(c.prestartSidecar, name)
		}
	}

	for name := range c.prestartEphemeral {
		if st := states[name]; st != nil && st.Successful() {
			delete(c.prestartEphemeral, name)
		}
	}

	// Keep holding the main tasks back while any prestart task is pending.
	if c.hasPrestartTasks() {
		return
	}
	c.mainTaskCtxCancel()
}
// hasNonSidecarTasks reports whether at least one of the passed task runners
// belongs to a task that is not a sidecar, i.e. a task with no lifecycle block
// or with Sidecar unset. It returns false when every task is a sidecar, and
// also when tasks is empty.
func hasNonSidecarTasks(tasks []*taskrunner.TaskRunner) bool {
	for i := range tasks {
		lc := tasks[i].Task().Lifecycle
		isSidecar := lc != nil && lc.Sidecar
		if !isSidecar {
			return true
		}
	}
	return false
}
// hasSidecarTasks reports whether at least one of the passed task runners
// belongs to a sidecar task, i.e. a task whose lifecycle block has Sidecar
// set. It returns false when tasks is empty.
func hasSidecarTasks(tasks map[string]*taskrunner.TaskRunner) bool {
	for _, tr := range tasks {
		if lc := tr.Task().Lifecycle; lc == nil || !lc.Sidecar {
			continue
		}
		return true
	}
	return false
}