Skip to content

Commit 387b016

Browse files
authored
client: improve group service stanza interpolation and check_restart support (#6586)
* client: improve group service stanza interpolation and check_restart support. Interpolation can now be done on group service stanzas. Note that some task runtime-specific information that was previously available when the service was registered poststart of a task is no longer available. The check_restart stanza for checks defined on group services will now properly restart the allocation upon check failures if configured.
1 parent 39f1d61 commit 387b016

29 files changed

+921
-733
lines changed

client/allochealth/tracker.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ OUTER:
419419
type taskHealthState struct {
420420
task *structs.Task
421421
state *structs.TaskState
422-
taskRegistrations *consul.TaskRegistration
422+
taskRegistrations *consul.ServiceRegistrations
423423
}
424424

425425
// event takes the deadline time for the allocation to be healthy and the update

client/allocrunner/alloc_runner.go

+34
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
cstate "github.com/hashicorp/nomad/client/state"
2323
cstructs "github.com/hashicorp/nomad/client/structs"
2424
"github.com/hashicorp/nomad/client/vaultclient"
25+
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
2526
"github.com/hashicorp/nomad/helper"
2627
"github.com/hashicorp/nomad/nomad/structs"
2728
"github.com/hashicorp/nomad/plugins/device"
@@ -1001,6 +1002,39 @@ func (ar *allocRunner) RestartTask(taskName string, taskEvent *structs.TaskEvent
10011002
return tr.Restart(context.TODO(), taskEvent, false)
10021003
}
10031004

1005+
// Restart satisfies the WorkloadRestarter interface restarts all task runners
1006+
// concurrently
1007+
func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
1008+
waitCh := make(chan struct{})
1009+
var err *multierror.Error
1010+
var errMutex sync.Mutex
1011+
1012+
go func() {
1013+
var wg sync.WaitGroup
1014+
defer close(waitCh)
1015+
for tn, tr := range ar.tasks {
1016+
wg.Add(1)
1017+
go func(taskName string, r agentconsul.WorkloadRestarter) {
1018+
defer wg.Done()
1019+
e := r.Restart(ctx, event, failure)
1020+
if e != nil {
1021+
errMutex.Lock()
1022+
defer errMutex.Unlock()
1023+
err = multierror.Append(err, fmt.Errorf("failed to restart task %s: %v", taskName, e))
1024+
}
1025+
}(tn, tr)
1026+
}
1027+
wg.Wait()
1028+
}()
1029+
1030+
select {
1031+
case <-waitCh:
1032+
case <-ctx.Done():
1033+
}
1034+
1035+
return err.ErrorOrNil()
1036+
}
1037+
10041038
// RestartAll signals all task runners in the allocation to restart and passes
10051039
// a copy of the task event to each restart event.
10061040
// Returns any errors in a concatenated form.

client/allocrunner/alloc_runner_hooks.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
multierror "github.com/hashicorp/go-multierror"
88
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
99
clientconfig "github.com/hashicorp/nomad/client/config"
10+
"github.com/hashicorp/nomad/client/taskenv"
1011
"github.com/hashicorp/nomad/nomad/structs"
1112
"github.com/hashicorp/nomad/plugins/drivers"
1213
)
@@ -125,7 +126,13 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
125126
newDiskMigrationHook(hookLogger, ar.prevAllocMigrator, ar.allocDir),
126127
newAllocHealthWatcherHook(hookLogger, alloc, hs, ar.Listener(), ar.consulClient),
127128
newNetworkHook(hookLogger, ns, alloc, nm, nc),
128-
newGroupServiceHook(hookLogger, alloc, ar.consulClient),
129+
newGroupServiceHook(groupServiceHookConfig{
130+
alloc: alloc,
131+
consul: ar.consulClient,
132+
restarter: ar,
133+
taskEnvBuilder: taskenv.NewBuilder(config.Node, ar.Alloc(), nil, config.Region).SetAllocDir(ar.allocDir.AllocDir),
134+
logger: hookLogger,
135+
}),
129136
newConsulSockHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig),
130137
}
131138

client/allocrunner/alloc_runner_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,7 @@ func TestAllocRunner_DeploymentHealth_Unhealthy_Checks(t *testing.T) {
528528
consulClient := conf.Consul.(*cconsul.MockConsulServiceClient)
529529
consulClient.AllocRegistrationsFn = func(allocID string) (*consul.AllocRegistration, error) {
530530
return &consul.AllocRegistration{
531-
Tasks: map[string]*consul.TaskRegistration{
531+
Tasks: map[string]*consul.ServiceRegistrations{
532532
task.Name: {
533533
Services: map[string]*consul.ServiceRegistration{
534534
"123": {
@@ -847,7 +847,7 @@ func TestAllocRunner_TaskFailed_KillTG(t *testing.T) {
847847
consulClient := conf.Consul.(*cconsul.MockConsulServiceClient)
848848
consulClient.AllocRegistrationsFn = func(allocID string) (*consul.AllocRegistration, error) {
849849
return &consul.AllocRegistration{
850-
Tasks: map[string]*consul.TaskRegistration{
850+
Tasks: map[string]*consul.ServiceRegistrations{
851851
task.Name: {
852852
Services: map[string]*consul.ServiceRegistration{
853853
"123": {

client/allocrunner/alloc_runner_unix_test.go

+9-4
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,12 @@ func TestAllocRunner_Restore_RunningTerminal(t *testing.T) {
3333
// 5. Assert task and logmon are cleaned up
3434

3535
alloc := mock.Alloc()
36+
alloc.Job.TaskGroups[0].Services = []*structs.Service{
37+
{
38+
Name: "foo",
39+
PortLabel: "8888",
40+
},
41+
}
3642
task := alloc.Job.TaskGroups[0].Tasks[0]
3743
task.Driver = "mock_driver"
3844
task.Config = map[string]interface{}{
@@ -117,13 +123,12 @@ func TestAllocRunner_Restore_RunningTerminal(t *testing.T) {
117123
// 2 removals (canary+noncanary) during prekill
118124
// 2 removals (canary+noncanary) during exited
119125
// 2 removals (canary+noncanary) during stop
120-
// 1 remove group during stop
126+
// 2 removals (canary+noncanary) group during stop
121127
consulOps := conf2.Consul.(*consul.MockConsulServiceClient).GetOps()
122-
require.Len(t, consulOps, 7)
123-
for _, op := range consulOps[:6] {
128+
require.Len(t, consulOps, 8)
129+
for _, op := range consulOps {
124130
require.Equal(t, "remove", op.Op)
125131
}
126-
require.Equal(t, "remove_group", consulOps[6].Op)
127132

128133
// Assert terminated task event was emitted
129134
events := ar2.AllocState().TaskStates[task.Name].Events

client/allocrunner/groupservice_hook.go

+116-12
Original file line numberDiff line numberDiff line change
@@ -3,30 +3,63 @@ package allocrunner
33
import (
44
"sync"
55

6-
hclog "github.com/hashicorp/go-hclog"
76
log "github.com/hashicorp/go-hclog"
87
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
98
"github.com/hashicorp/nomad/client/consul"
9+
"github.com/hashicorp/nomad/client/taskenv"
10+
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
1011
"github.com/hashicorp/nomad/nomad/structs"
12+
"github.com/hashicorp/nomad/plugins/drivers"
1113
)
1214

1315
// groupServiceHook manages task group Consul service registration and
1416
// deregistration.
1517
type groupServiceHook struct {
16-
alloc *structs.Allocation
18+
allocID string
19+
group string
20+
restarter agentconsul.WorkloadRestarter
1721
consulClient consul.ConsulServiceAPI
1822
prerun bool
19-
mu sync.Mutex
2023

2124
logger log.Logger
25+
26+
// The following fields may be updated
27+
canary bool
28+
services []*structs.Service
29+
networks structs.Networks
30+
taskEnvBuilder *taskenv.Builder
31+
32+
// Since Update() may be called concurrently with any other hook all
33+
// hook methods must be fully serialized
34+
mu sync.Mutex
35+
}
36+
37+
type groupServiceHookConfig struct {
38+
alloc *structs.Allocation
39+
consul consul.ConsulServiceAPI
40+
restarter agentconsul.WorkloadRestarter
41+
taskEnvBuilder *taskenv.Builder
42+
logger log.Logger
2243
}
2344

24-
func newGroupServiceHook(logger hclog.Logger, alloc *structs.Allocation, consulClient consul.ConsulServiceAPI) *groupServiceHook {
45+
func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook {
2546
h := &groupServiceHook{
26-
alloc: alloc,
27-
consulClient: consulClient,
47+
allocID: cfg.alloc.ID,
48+
group: cfg.alloc.TaskGroup,
49+
restarter: cfg.restarter,
50+
consulClient: cfg.consul,
51+
taskEnvBuilder: cfg.taskEnvBuilder,
52+
}
53+
h.logger = cfg.logger.Named(h.Name())
54+
h.services = cfg.alloc.Job.LookupTaskGroup(h.group).Services
55+
56+
if cfg.alloc.AllocatedResources != nil {
57+
h.networks = cfg.alloc.AllocatedResources.Shared.Networks
58+
}
59+
60+
if cfg.alloc.DeploymentStatus != nil {
61+
h.canary = cfg.alloc.DeploymentStatus.Canary
2862
}
29-
h.logger = logger.Named(h.Name())
3063
return h
3164
}
3265

@@ -41,26 +74,97 @@ func (h *groupServiceHook) Prerun() error {
4174
h.prerun = true
4275
h.mu.Unlock()
4376
}()
44-
return h.consulClient.RegisterGroup(h.alloc)
77+
78+
if len(h.services) == 0 {
79+
return nil
80+
}
81+
82+
services := h.getWorkloadServices()
83+
return h.consulClient.RegisterWorkload(services)
4584
}
4685

4786
func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
4887
h.mu.Lock()
4988
defer h.mu.Unlock()
50-
oldAlloc := h.alloc
51-
h.alloc = req.Alloc
89+
oldWorkloadServices := h.getWorkloadServices()
90+
91+
// Store new updated values out of request
92+
canary := false
93+
if req.Alloc.DeploymentStatus != nil {
94+
canary = req.Alloc.DeploymentStatus.Canary
95+
}
96+
97+
var networks structs.Networks
98+
if req.Alloc.AllocatedResources != nil {
99+
networks = req.Alloc.AllocatedResources.Shared.Networks
100+
}
101+
102+
// Update group service hook fields
103+
h.networks = networks
104+
h.services = req.Alloc.Job.LookupTaskGroup(h.group).Services
105+
h.canary = canary
106+
h.taskEnvBuilder.UpdateTask(req.Alloc, nil)
107+
108+
// Create new task services struct with those new values
109+
newWorkloadServices := h.getWorkloadServices()
52110

53111
if !h.prerun {
54112
// Update called before Prerun. Update alloc and exit to allow
55113
// Prerun to do initial registration.
56114
return nil
57115
}
58116

59-
return h.consulClient.UpdateGroup(oldAlloc, h.alloc)
117+
return h.consulClient.UpdateWorkload(oldWorkloadServices, newWorkloadServices)
60118
}
61119

62120
func (h *groupServiceHook) Postrun() error {
63121
h.mu.Lock()
64122
defer h.mu.Unlock()
65-
return h.consulClient.RemoveGroup(h.alloc)
123+
h.deregister()
124+
return nil
125+
}
126+
127+
func (h *groupServiceHook) driverNet() *drivers.DriverNetwork {
128+
if len(h.networks) == 0 {
129+
return nil
130+
}
131+
132+
//TODO(schmichael) only support one network for now
133+
net := h.networks[0]
134+
//TODO(schmichael) there's probably a better way than hacking driver network
135+
return &drivers.DriverNetwork{
136+
AutoAdvertise: true,
137+
IP: net.IP,
138+
// Copy PortLabels from group network
139+
PortMap: net.PortLabels(),
140+
}
141+
}
142+
143+
// deregister services from Consul.
144+
func (h *groupServiceHook) deregister() {
145+
if len(h.services) > 0 {
146+
workloadServices := h.getWorkloadServices()
147+
h.consulClient.RemoveWorkload(workloadServices)
148+
149+
// Canary flag may be getting flipped when the alloc is being
150+
// destroyed, so remove both variations of the service
151+
workloadServices.Canary = !workloadServices.Canary
152+
h.consulClient.RemoveWorkload(workloadServices)
153+
}
154+
}
155+
156+
func (h *groupServiceHook) getWorkloadServices() *agentconsul.WorkloadServices {
157+
// Interpolate with the task's environment
158+
interpolatedServices := taskenv.InterpolateServices(h.taskEnvBuilder.Build(), h.services)
159+
160+
// Create task services struct with request's driver metadata
161+
return &agentconsul.WorkloadServices{
162+
AllocID: h.allocID,
163+
Group: h.group,
164+
Restarter: h.restarter,
165+
Services: interpolatedServices,
166+
DriverNetwork: h.driverNet(),
167+
Networks: h.networks,
168+
Canary: h.canary,
169+
}
66170
}

0 commit comments

Comments
 (0)