Introduces Disconnect block into the TaskGroup configuration (#19886)
This PR is the first of two that will implement the new Disconnect block. Here the new block is introduced in a way that is backwards compatible with the fields it will replace. For more information, refer to the accompanying RFC and ticket.
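For reference, a minimal sketch of how the new block maps onto the fields it replaces, built against the api package as changed in this PR. The ptr helper is hypothetical (the package's own pointerOf is unexported), and the Replace/PreventRescheduleOnLost correspondence follows the field comments in the diff below rather than any conversion code shown here.

package main

import (
	"time"

	"github.com/hashicorp/nomad/api"
)

// ptr is a hypothetical local helper; api's pointerOf is unexported.
func ptr[T any](v T) *T { return &v }

func main() {
	// Before this PR: three loose fields on the task group.
	legacy := &api.TaskGroup{
		MaxClientDisconnect:       ptr(10 * time.Minute),
		StopAfterClientDisconnect: ptr(5 * time.Minute),
		PreventRescheduleOnLost:   ptr(false),
	}

	// With this PR: a single disconnect block. Replace plays the inverse
	// role of PreventRescheduleOnLost (replace = true means allocations on
	// a disconnected node get replaced).
	updated := &api.TaskGroup{
		Disconnect: &api.DisconnectStrategy{
			LostAfter:         ptr(10 * time.Minute),
			StopOnClientAfter: ptr(5 * time.Minute),
			Replace:           ptr(true),
			Reconcile:         ptr(api.ReconcileOptionBestScore),
		},
	}
	_, _ = legacy, updated
}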
Juanadelacuesta authored Feb 19, 2024
1 parent e8db588 commit 20cfbc8
Showing 27 changed files with 3,298 additions and 1,940 deletions.
101 changes: 80 additions & 21 deletions api/tasks.go
@@ -11,6 +11,8 @@ import (
"time"
)

type ReconcileOption = string

const (
// RestartPolicyModeDelay causes an artificial delay till the next interval is
// reached when the specified attempts have been reached in the interval.
@@ -19,6 +21,14 @@ const (
// RestartPolicyModeFail causes a job to fail if the specified number of
// attempts are reached within an interval.
RestartPolicyModeFail = "fail"

// ReconcileOption is used to specify the behavior of the reconciliation process
// between the original allocations and the replacements when a previously
// disconnected client comes back online.
ReconcileOptionKeepOriginal = "keep_original"
ReconcileOptionKeepReplacement = "keep_replacement"
ReconcileOptionBestScore = "best_score"
ReconcileOptionLongestRunning = "longest_running"
)

// MemoryStats holds memory usage related stats
@@ -113,6 +123,37 @@ func (r *RestartPolicy) Merge(rp *RestartPolicy) {
}
}

// DisconnectStrategy defines how both clients and servers should behave in
// case of disconnection between them.
type DisconnectStrategy struct {
// Defines how long the server will consider the unresponsive node as
// disconnected but alive, rather than lost.
LostAfter *time.Duration `mapstructure:"lost_after" hcl:"lost_after,optional"`

// Defines how long a disconnected client will keep its allocations running.
StopOnClientAfter *time.Duration `mapstructure:"stop_on_client_after" hcl:"stop_on_client_after,optional"`

// A boolean flag that defines whether allocations should be replaced while
// the node is considered disconnected.
Replace *bool `mapstructure:"replace" hcl:"replace,optional"`

// Once the disconnected node starts reporting again, this field defines
// which allocations to keep: the originals, the replacements, the ones
// running on the nodes with the best score (the current default), or the
// allocations that have been running continuously the longest.
Reconcile *ReconcileOption `mapstructure:"reconcile" hcl:"reconcile,optional"`
}

func (ds *DisconnectStrategy) Canonicalize() {
if ds.Replace == nil {
ds.Replace = pointerOf(true)
}

if ds.Reconcile == nil {
ds.Reconcile = pointerOf(ReconcileOptionBestScore)
}
}
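A short illustration of the defaults set by Canonicalize above: Replace falls back to true and Reconcile to best_score, while the two durations are left unset.

package main

import (
	"fmt"

	"github.com/hashicorp/nomad/api"
)

func main() {
	ds := &api.DisconnectStrategy{}
	ds.Canonicalize()

	fmt.Println(*ds.Replace)         // true
	fmt.Println(*ds.Reconcile)       // best_score
	fmt.Println(ds.LostAfter == nil) // true: no default lost_after
}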

// Reschedule configures how Tasks are rescheduled when they crash or fail.
type ReschedulePolicy struct {
// Attempts limits the number of rescheduling attempts that can occur in an interval.
@@ -205,6 +246,14 @@ func (a *Affinity) Canonicalize() {
}
}

func NewDefaultDisconnectStrategy() *DisconnectStrategy {
return &DisconnectStrategy{
LostAfter: pointerOf(0 * time.Minute),
Replace: pointerOf(true),
Reconcile: pointerOf(ReconcileOptionBestScore),
}
}

func NewDefaultReschedulePolicy(jobType string) *ReschedulePolicy {
var dp *ReschedulePolicy
switch jobType {
@@ -445,27 +494,31 @@ func (vm *VolumeMount) Canonicalize() {

// TaskGroup is the unit of scheduling.
type TaskGroup struct {
Name *string `hcl:"name,label"`
Count *int `hcl:"count,optional"`
Constraints []*Constraint `hcl:"constraint,block"`
Affinities []*Affinity `hcl:"affinity,block"`
Tasks []*Task `hcl:"task,block"`
Spreads []*Spread `hcl:"spread,block"`
Volumes map[string]*VolumeRequest `hcl:"volume,block"`
RestartPolicy *RestartPolicy `hcl:"restart,block"`
ReschedulePolicy *ReschedulePolicy `hcl:"reschedule,block"`
EphemeralDisk *EphemeralDisk `hcl:"ephemeral_disk,block"`
Update *UpdateStrategy `hcl:"update,block"`
Migrate *MigrateStrategy `hcl:"migrate,block"`
Networks []*NetworkResource `hcl:"network,block"`
Meta map[string]string `hcl:"meta,block"`
Services []*Service `hcl:"service,block"`
ShutdownDelay *time.Duration `mapstructure:"shutdown_delay" hcl:"shutdown_delay,optional"`
StopAfterClientDisconnect *time.Duration `mapstructure:"stop_after_client_disconnect" hcl:"stop_after_client_disconnect,optional"`
MaxClientDisconnect *time.Duration `mapstructure:"max_client_disconnect" hcl:"max_client_disconnect,optional"`
Scaling *ScalingPolicy `hcl:"scaling,block"`
Consul *Consul `hcl:"consul,block"`
PreventRescheduleOnLost *bool `hcl:"prevent_reschedule_on_lost,optional"`
Name *string `hcl:"name,label"`
Count *int `hcl:"count,optional"`
Constraints []*Constraint `hcl:"constraint,block"`
Affinities []*Affinity `hcl:"affinity,block"`
Tasks []*Task `hcl:"task,block"`
Spreads []*Spread `hcl:"spread,block"`
Volumes map[string]*VolumeRequest `hcl:"volume,block"`
RestartPolicy *RestartPolicy `hcl:"restart,block"`
Disconnect *DisconnectStrategy `hcl:"disconnect,block"`
ReschedulePolicy *ReschedulePolicy `hcl:"reschedule,block"`
EphemeralDisk *EphemeralDisk `hcl:"ephemeral_disk,block"`
Update *UpdateStrategy `hcl:"update,block"`
Migrate *MigrateStrategy `hcl:"migrate,block"`
Networks []*NetworkResource `hcl:"network,block"`
Meta map[string]string `hcl:"meta,block"`
Services []*Service `hcl:"service,block"`
ShutdownDelay *time.Duration `mapstructure:"shutdown_delay" hcl:"shutdown_delay,optional"`
// Deprecated: StopAfterClientDisconnect is deprecated in Nomad 1.8. Use Disconnect.StopOnClientAfter instead.
StopAfterClientDisconnect *time.Duration `mapstructure:"stop_after_client_disconnect" hcl:"stop_after_client_disconnect,optional"`
// To be deprecated after 1.8.0 in favour of Disconnect.LostAfter
MaxClientDisconnect *time.Duration `mapstructure:"max_client_disconnect" hcl:"max_client_disconnect,optional"`
Scaling *ScalingPolicy `hcl:"scaling,block"`
Consul *Consul `hcl:"consul,block"`
// To be deprecated after 1.8.0 in favour of Disconnect.Replace
PreventRescheduleOnLost *bool `hcl:"prevent_reschedule_on_lost,optional"`
}

// NewTaskGroup creates a new TaskGroup.
@@ -537,6 +590,7 @@ func (g *TaskGroup) Canonicalize(job *Job) {
if g.ReschedulePolicy != nil {
g.ReschedulePolicy.Canonicalize(*job.Type)
}

// Merge the migrate strategy from the job
if jm, tm := job.Migrate != nil, g.Migrate != nil; jm && tm {
jobMigrate := job.Migrate.Copy()
@@ -584,9 +638,14 @@ func (g *TaskGroup) Canonicalize(job *Job) {
for _, s := range g.Services {
s.Canonicalize(nil, g, job)
}

if g.PreventRescheduleOnLost == nil {
g.PreventRescheduleOnLost = pointerOf(false)
}

if g.Disconnect != nil {
g.Disconnect.Canonicalize()
}
}

// These need to be in sync with DefaultServiceJobRestartPolicy in
2 changes: 1 addition & 1 deletion client/client.go
@@ -2668,7 +2668,7 @@ func (c *Client) updateAlloc(update *structs.Allocation) {
// Reconnect unknown allocations if they were updated and are not terminal.
reconnect := update.ClientStatus == structs.AllocClientStatusUnknown &&
update.AllocModifyIndex > alloc.AllocModifyIndex &&
(!update.ServerTerminalStatus() || !alloc.PreventRescheduleOnLost())
(!update.ServerTerminalStatus() || !alloc.PreventRescheduleOnDisconnect())
if reconnect {
err = ar.Reconnect(update)
if err != nil {
12 changes: 7 additions & 5 deletions client/heartbeatstop.go
@@ -47,7 +47,7 @@ func newHeartbeatStop(
// allocation to be stopped if the taskgroup is configured appropriately
func (h *heartbeatStop) allocHook(alloc *structs.Allocation) {
tg := allocTaskGroup(alloc)
if tg.StopAfterClientDisconnect != nil {
if tg.GetDisconnectStopTimeout() != nil {
h.allocHookCh <- alloc
}
}
@@ -56,8 +56,9 @@ func (h *heartbeatStop) allocHook(alloc *structs.Allocation) {
// past that it should be prevented from restarting
func (h *heartbeatStop) shouldStop(alloc *structs.Allocation) bool {
tg := allocTaskGroup(alloc)
if tg.StopAfterClientDisconnect != nil {
return h.shouldStopAfter(time.Now(), *tg.StopAfterClientDisconnect)
timeout := tg.GetDisconnectStopTimeout()
if timeout != nil {
return h.shouldStopAfter(time.Now(), *timeout)
}
return false
}
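The hunks above swap direct reads of StopAfterClientDisconnect for the new GetDisconnectStopTimeout helper, whose body is not part of this file's diff. A self-contained sketch of the precedence it presumably implements (the new block wins, the deprecated field is the fallback); the types and method here are local stand-ins, not the real nomad/structs code.

package main

import (
	"fmt"
	"time"
)

// Local stand-ins for the structs types, for illustration only.
type DisconnectStrategy struct {
	StopOnClientAfter *time.Duration
}

type TaskGroup struct {
	Disconnect                *DisconnectStrategy
	StopAfterClientDisconnect *time.Duration // deprecated field
}

// Assumed precedence: prefer disconnect.stop_on_client_after, fall back
// to the deprecated stop_after_client_disconnect.
func (tg *TaskGroup) GetDisconnectStopTimeout() *time.Duration {
	if tg.Disconnect != nil && tg.Disconnect.StopOnClientAfter != nil {
		return tg.Disconnect.StopOnClientAfter
	}
	return tg.StopAfterClientDisconnect
}

func main() {
	d := 5 * time.Minute
	tg := &TaskGroup{StopAfterClientDisconnect: &d}
	fmt.Println(*tg.GetDisconnectStopTimeout()) // 5m0s
}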
@@ -103,8 +104,9 @@ func (h *heartbeatStop) watch() {

case alloc := <-h.allocHookCh:
tg := allocTaskGroup(alloc)
if tg.StopAfterClientDisconnect != nil {
h.allocInterval[alloc.ID] = *tg.StopAfterClientDisconnect
timeout := tg.GetDisconnectStopTimeout()
if timeout != nil {
h.allocInterval[alloc.ID] = *timeout
}

case <-timeout:
67 changes: 62 additions & 5 deletions client/heartbeatstop_test.go
@@ -12,7 +12,7 @@ import (
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
"github.com/shoenig/test/must"
)

func TestHeartbeatStop_allocHook(t *testing.T) {
@@ -27,6 +27,63 @@ func TestHeartbeatStop_allocHook(t *testing.T) {
})
defer cleanupC1()

// an allocation, with a tiny lease
d := 1 * time.Microsecond
alloc := &structs.Allocation{
ID: uuid.Generate(),
TaskGroup: "foo",
Job: &structs.Job{
TaskGroups: []*structs.TaskGroup{
{
Name: "foo",
Disconnect: &structs.DisconnectStrategy{
StopOnClientAfter: &d,
},
},
},
},
Resources: &structs.Resources{
CPU: 100,
MemoryMB: 100,
DiskMB: 0,
},
}

// alloc added to heartbeatStop.allocs
err := client.addAlloc(alloc, "")
must.NoError(t, err)
testutil.WaitForResult(func() (bool, error) {
_, ok := client.heartbeatStop.allocInterval[alloc.ID]
return ok, nil
}, func(err error) {
must.NoError(t, err)
})

// the tiny lease causes the watch loop to destroy it
testutil.WaitForResult(func() (bool, error) {
_, ok := client.heartbeatStop.allocInterval[alloc.ID]
return !ok, nil
}, func(err error) {
must.NoError(t, err)
})

must.Nil(t, client.allocs[alloc.ID])
}

// Test using stop_after_client_disconnect; remove after it is deprecated in
// favor of Disconnect.StopOnClientAfter, introduced in 1.8.0.
func TestHeartbeatStop_allocHook_Disconnect(t *testing.T) {
ci.Parallel(t)

server, _, cleanupS1 := testServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, server.RPC)

client, cleanupC1 := TestClient(t, func(c *config.Config) {
c.RPCHandler = server
})
defer cleanupC1()

// an allocation, with a tiny lease
d := 1 * time.Microsecond
alloc := &structs.Allocation{
@@ -49,21 +106,21 @@

// alloc added to heartbeatStop.allocs
err := client.addAlloc(alloc, "")
require.NoError(t, err)
must.NoError(t, err)
testutil.WaitForResult(func() (bool, error) {
_, ok := client.heartbeatStop.allocInterval[alloc.ID]
return ok, nil
}, func(err error) {
require.NoError(t, err)
must.NoError(t, err)
})

// the tiny lease causes the watch loop to destroy it
testutil.WaitForResult(func() (bool, error) {
_, ok := client.heartbeatStop.allocInterval[alloc.ID]
return !ok, nil
}, func(err error) {
require.NoError(t, err)
must.NoError(t, err)
})

require.Empty(t, client.allocs[alloc.ID])
must.Nil(t, client.allocs[alloc.ID])
}
15 changes: 15 additions & 0 deletions command/agent/job_endpoint.go
@@ -1164,6 +1164,21 @@ func ApiTgToStructsTG(job *structs.Job, taskGroup *api.TaskGroup, tg *structs.TaskGroup)
}
}

if taskGroup.Disconnect != nil {
tg.Disconnect = &structs.DisconnectStrategy{
StopOnClientAfter: taskGroup.Disconnect.StopOnClientAfter,
Replace: taskGroup.Disconnect.Replace,
}

if taskGroup.Disconnect.Reconcile != nil {
tg.Disconnect.Reconcile = *taskGroup.Disconnect.Reconcile
}

if taskGroup.Disconnect.LostAfter != nil {
tg.Disconnect.LostAfter = *taskGroup.Disconnect.LostAfter
}
}

if taskGroup.Migrate != nil {
tg.Migrate = &structs.MigrateStrategy{
MaxParallel: *taskGroup.Migrate.MaxParallel,
8 changes: 8 additions & 0 deletions command/agent/job_endpoint_test.go
@@ -2752,6 +2752,9 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
},
},
},
Disconnect: &api.DisconnectStrategy{
LostAfter: pointer.Of(30 * time.Second),
},
MaxClientDisconnect: pointer.Of(30 * time.Second),
Tasks: []*api.Task{
{
@@ -3185,6 +3188,11 @@
},
},
},
Disconnect: &structs.DisconnectStrategy{
LostAfter: 30 * time.Second,
Replace: pointer.Of(true),
Reconcile: structs.ReconcileOptionBestScore,
},
MaxClientDisconnect: pointer.Of(30 * time.Second),
Tasks: []*structs.Task{
{
4 changes: 4 additions & 0 deletions command/testdata/example-short-bad.json
@@ -90,6 +90,10 @@
"ShutdownDelay": null,
"StopAfterClientDisconnect": null,
"MaxClientDisconnect": null,
"Disconnect":{
"StopAfterClient": null,
"LostAfter": null
},
"Scaling": null,
"Consul": null
}
1 change: 1 addition & 0 deletions command/testdata/example-short.json
@@ -93,6 +93,7 @@
"ShutdownDelay": null,
"StopAfterClientDisconnect": null,
"MaxClientDisconnect": null,
"Disconnect": null,
"Scaling": null,
"Consul": null
}
36 changes: 36 additions & 0 deletions jobspec/parse.go
@@ -451,6 +451,42 @@ func parseUpdate(result **api.UpdateStrategy, list *ast.ObjectList) error {
return dec.Decode(m)
}

func parseDisconnect(result **api.DisconnectStrategy, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) > 1 {
return fmt.Errorf("only one 'disconnect' block allowed")
}

// Get our resource object
o := list.Items[0]

var m map[string]interface{}
if err := hcl.DecodeObject(&m, o.Val); err != nil {
return err
}

// Check for invalid keys
valid := []string{
"lost_after",
"replace",
"reconcile",
"stop_on_client_after",
}
if err := checkHCLKeys(o.Val, valid); err != nil {
return err
}

dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
WeaklyTypedInput: true,
Result: result,
})
if err != nil {
return err
}
return dec.Decode(m)
}
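Assuming parseDisconnect is wired into the group parser the same way parseUpdate and parseMigrate are, and that jobspec.Parse keeps its io.Reader signature, a jobspec using the new block would round-trip like this (the job and values are made up):

package main

import (
	"fmt"
	"strings"

	"github.com/hashicorp/nomad/jobspec"
)

func main() {
	// Keys match the valid list checked by parseDisconnect above.
	src := `
job "example" {
  group "cache" {
    disconnect {
      lost_after = "30m"
      replace    = true
      reconcile  = "longest_running"
    }
    task "redis" {
      driver = "docker"
    }
  }
}`

	job, err := jobspec.Parse(strings.NewReader(src))
	if err != nil {
		panic(err)
	}

	ds := job.TaskGroups[0].Disconnect
	fmt.Println(*ds.LostAfter, *ds.Replace, *ds.Reconcile) // 30m0s true longest_running
}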

func parseMigrate(result **api.MigrateStrategy, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) > 1 {