Skip to content

Commit 8da140e

Browse files
committed
Merge pull request #766 from hashicorp/f-consul-update
Update the consul service when the task/alloc changes
2 parents d81ace2 + e8f88d3 commit 8da140e

File tree

3 files changed

+55
-40
lines changed

3 files changed

+55
-40
lines changed

client/alloc_runner.go

-18
Original file line numberDiff line numberDiff line change
@@ -364,8 +364,6 @@ func (r *AllocRunner) Run() {
364364
continue
365365
}
366366

367-
// Merge in the task resources
368-
task.Resources = alloc.TaskResources[task.Name]
369367
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.alloc,
370368
task, r.consulService)
371369
r.tasks[task.Name] = tr
@@ -392,22 +390,6 @@ OUTER:
392390
r.taskLock.RLock()
393391
for _, task := range tg.Tasks {
394392
tr := r.tasks[task.Name]
395-
396-
// Merge in the task resources
397-
task.Resources = update.TaskResources[task.Name]
398-
FOUND:
399-
for _, updateGroup := range update.Job.TaskGroups {
400-
if tg.Name != updateGroup.Name {
401-
continue
402-
}
403-
for _, updateTask := range updateGroup.Tasks {
404-
if updateTask.Name != task.Name {
405-
continue
406-
}
407-
task.Services = updateTask.Services
408-
break FOUND
409-
}
410-
}
411393
tr.Update(update)
412394
}
413395
r.taskLock.RUnlock()

client/consul.go

+22-9
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ type ConsulService struct {
7676

7777
trackedTasks map[string]*trackedTask
7878
serviceStates map[string]string
79+
allocToService map[string][]string
7980
trackedTskLock sync.Mutex
8081
}
8182

@@ -130,12 +131,13 @@ func NewConsulService(config *consulServiceConfig) (*ConsulService, error) {
130131
}
131132

132133
consulService := ConsulService{
133-
client: &consulApiClient{client: c},
134-
logger: config.logger,
135-
node: config.node,
136-
trackedTasks: make(map[string]*trackedTask),
137-
serviceStates: make(map[string]string),
138-
shutdownCh: make(chan struct{}),
134+
client: &consulApiClient{client: c},
135+
logger: config.logger,
136+
node: config.node,
137+
trackedTasks: make(map[string]*trackedTask),
138+
serviceStates: make(map[string]string),
139+
allocToService: make(map[string][]string),
140+
shutdownCh: make(chan struct{}),
139141
}
140142

141143
return &consulService, nil
@@ -148,8 +150,18 @@ func (c *ConsulService) Register(task *structs.Task, alloc *structs.Allocation)
148150
c.trackedTskLock.Lock()
149151
tt := &trackedTask{task: task, alloc: alloc}
150152
c.trackedTasks[fmt.Sprintf("%s-%s", alloc.ID, task.Name)] = tt
153+
154+
// Delete any previously registered service as the same alloc is being
155+
// re-registered.
156+
for _, service := range c.allocToService[alloc.ID] {
157+
delete(c.serviceStates, service)
158+
}
151159
c.trackedTskLock.Unlock()
160+
152161
for _, service := range task.Services {
162+
// Track the services this alloc is registering.
163+
c.allocToService[alloc.ID] = append(c.allocToService[alloc.ID], service.Name)
164+
153165
c.logger.Printf("[INFO] consul: registering service %s with consul.", service.Name)
154166
if err := c.registerService(service, task, alloc); err != nil {
155167
mErr.Errors = append(mErr.Errors, err)
@@ -165,6 +177,7 @@ func (c *ConsulService) Deregister(task *structs.Task, alloc *structs.Allocation
165177
var mErr multierror.Error
166178
c.trackedTskLock.Lock()
167179
delete(c.trackedTasks, fmt.Sprintf("%s-%s", alloc.ID, task.Name))
180+
delete(c.allocToService, alloc.ID)
168181
c.trackedTskLock.Unlock()
169182
for _, service := range task.Services {
170183
serviceID := alloc.Services[service.Name]
@@ -229,14 +242,14 @@ func (c *ConsulService) performSync() {
229242
// Add new services which Consul agent isn't aware of
230243
knownServices[serviceID] = struct{}{}
231244
if _, ok := consulServices[serviceID]; !ok {
232-
c.printLogMessage("[INFO] consul: registering service %s with consul.", service.Name)
245+
c.printLogMessage("[INFO] consul: perform sync, registering service %s with consul.", service.Name)
233246
c.registerService(service, trackedTask.task, trackedTask.alloc)
234247
continue
235248
}
236249

237250
// If a service has changed, re-register it with Consul agent
238251
if service.Hash() != c.serviceStates[serviceID] {
239-
c.printLogMessage("[INFO] consul: reregistering service %s with consul.", service.Name)
252+
c.printLogMessage("[INFO] consul: perform sync hash change, reregistering service %s with consul.", service.Name)
240253
c.registerService(service, trackedTask.task, trackedTask.alloc)
241254
continue
242255
}
@@ -268,7 +281,7 @@ func (c *ConsulService) performSync() {
268281
for _, consulService := range consulServices {
269282
if _, ok := knownServices[consulService.ID]; !ok {
270283
delete(c.serviceStates, consulService.ID)
271-
c.printLogMessage("[INFO] consul: deregistering service %v with consul", consulService.Service)
284+
c.printLogMessage("[INFO] consul: perform sync, deregistering service %v with consul", consulService.Service)
272285
c.deregisterService(consulService.ID)
273286
}
274287
}

client/task_runner.go

+33-13
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@ import (
1010
"sync"
1111
"time"
1212

13+
"github.com/hashicorp/go-multierror"
1314
"github.com/hashicorp/nomad/client/config"
1415
"github.com/hashicorp/nomad/client/driver"
1516
"github.com/hashicorp/nomad/nomad/structs"
17+
"github.com/mitchellh/hashstructure"
1618

1719
cstructs "github.com/hashicorp/nomad/client/driver/structs"
1820
)
@@ -54,6 +56,9 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
5456
alloc *structs.Allocation, task *structs.Task,
5557
consulService *ConsulService) *TaskRunner {
5658

59+
// Merge in the task resources
60+
task.Resources = alloc.TaskResources[task.Name]
61+
5762
// Build the restart tracker.
5863
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
5964
if tg == nil {
@@ -319,21 +324,24 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
319324
}
320325

321326
// Extract the task.
322-
var task *structs.Task
327+
var updatedTask *structs.Task
323328
for _, t := range tg.Tasks {
324329
if t.Name == r.task.Name {
325-
task = t
330+
updatedTask = t
326331
}
327332
}
328-
if task == nil {
333+
if updatedTask == nil {
329334
return fmt.Errorf("task group %q doesn't contain task %q", tg.Name, r.task.Name)
330335
}
331-
r.task = task
336+
337+
// Merge in the task resources
338+
updatedTask.Resources = update.TaskResources[updatedTask.Name]
332339

333340
// Update will update resources and store the new kill timeout.
341+
var mErr multierror.Error
334342
if r.handle != nil {
335-
if err := r.handle.Update(task); err != nil {
336-
r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, err)
343+
if err := r.handle.Update(updatedTask); err != nil {
344+
mErr.Errors = append(mErr.Errors, fmt.Errorf("updating task resources failed: %v", err))
337345
}
338346
}
339347

@@ -342,14 +350,26 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
342350
r.restartTracker.SetPolicy(tg.RestartPolicy)
343351
}
344352

345-
/* TODO
346-
// Re-register the task to consul and store the updated alloc.
347-
r.consulService.Deregister(r.task, r.alloc)
348-
r.alloc = update
349-
r.consulService.Register(r.task, r.alloc)
350-
*/
353+
// Hash services returns the hash of the task's services
354+
hashServices := func(task *structs.Task) uint64 {
355+
h, err := hashstructure.Hash(task.Services, nil)
356+
if err != nil {
357+
mErr.Errors = append(mErr.Errors, fmt.Errorf("hashing services failed %#v: %v", task.Services, err))
358+
}
359+
return h
360+
}
351361

352-
return nil
362+
// Re-register the task to consul if any of the services have changed.
363+
if hashServices(updatedTask) != hashServices(r.task) {
364+
if err := r.consulService.Register(updatedTask, update); err != nil {
365+
mErr.Errors = append(mErr.Errors, fmt.Errorf("updating services with consul failed: %v", err))
366+
}
367+
}
368+
369+
// Store the updated alloc.
370+
r.alloc = update
371+
r.task = updatedTask
372+
return mErr.ErrorOrNil()
353373
}
354374

355375
// Helper function for converting a WaitResult into a TaskTerminated event.

0 commit comments

Comments
 (0)