Skip to content

Commit 27af1f2

Browse files
authored
Merge pull request #7772 from hashicorp/b-7768-remove-policies-for-stopped-jobs
delete/create autoscaling policies as job is stopped/started
2 parents 8808b2b + 6f5610c commit 27af1f2

File tree

2 files changed

+113
-9
lines changed

2 files changed

+113
-9
lines changed

nomad/state/state_store.go

+23-8
Original file line numberDiff line numberDiff line change
@@ -1478,16 +1478,10 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn
14781478
return fmt.Errorf("index update failed: %v", err)
14791479
}
14801480

1481-
// Delete any job scaling policies
1482-
numDeletedScalingPolicies, err := txn.DeleteAll("scaling_policy", "target_prefix", namespace, jobID)
1483-
if err != nil {
1481+
// Delete any remaining job scaling policies
1482+
if err := s.deleteJobScalingPolicies(index, job, txn); err != nil {
14841483
return fmt.Errorf("deleting job scaling policies failed: %v", err)
14851484
}
1486-
if numDeletedScalingPolicies > 0 {
1487-
if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil {
1488-
return fmt.Errorf("index update failed: %v", err)
1489-
}
1490-
}
14911485

14921486
// Delete the scaling events
14931487
if _, err = txn.DeleteAll("scaling_event", "id", namespace, jobID); err != nil {
@@ -1506,6 +1500,20 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn
15061500
return nil
15071501
}
15081502

1503+
// deleteJobScalingPolicies deletes any scaling policies associated with the job
1504+
func (s *StateStore) deleteJobScalingPolicies(index uint64, job *structs.Job, txn *memdb.Txn) error {
1505+
numDeletedScalingPolicies, err := txn.DeleteAll("scaling_policy", "target_prefix", job.Namespace, job.ID)
1506+
if err != nil {
1507+
return fmt.Errorf("deleting job scaling policies failed: %v", err)
1508+
}
1509+
if numDeletedScalingPolicies > 0 {
1510+
if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil {
1511+
return fmt.Errorf("index update failed: %v", err)
1512+
}
1513+
}
1514+
return nil
1515+
}
1516+
15091517
// deleteJobVersions deletes all versions of the given job.
15101518
func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *memdb.Txn) error {
15111519
iter, err := txn.Get("job_version", "id_prefix", job.Namespace, job.ID)
@@ -4247,6 +4255,13 @@ func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, tx
42474255

42484256
ws := memdb.NewWatchSet()
42494257

4258+
if job.Stop {
4259+
if err := s.deleteJobScalingPolicies(index, job, txn); err != nil {
4260+
return fmt.Errorf("deleting job scaling policies failed: %v", err)
4261+
}
4262+
return nil
4263+
}
4264+
42504265
scalingPolicies := job.GetScalingPolicies()
42514266
newTargets := map[string]struct{}{}
42524267
for _, p := range scalingPolicies {

nomad/state/state_store_test.go

+90-1
Original file line numberDiff line numberDiff line change
@@ -8427,7 +8427,96 @@ func TestStateStore_DeleteScalingPolicies(t *testing.T) {
84278427
require.False(watchFired(ws))
84288428
}
84298429

8430-
func TestStateStore_DeleteJob_ChildScalingPolicies(t *testing.T) {
8430+
func TestStateStore_StopJob_DeleteScalingPolicies(t *testing.T) {
8431+
t.Parallel()
8432+
8433+
require := require.New(t)
8434+
8435+
state := testStateStore(t)
8436+
8437+
job := mock.Job()
8438+
8439+
err := state.UpsertJob(1000, job)
8440+
require.NoError(err)
8441+
8442+
policy := mock.ScalingPolicy()
8443+
policy.Target[structs.ScalingTargetJob] = job.ID
8444+
err = state.UpsertScalingPolicies(1100, []*structs.ScalingPolicy{policy})
8445+
require.NoError(err)
8446+
8447+
// Ensure the scaling policy is present and start some watches
8448+
wsGet := memdb.NewWatchSet()
8449+
out, err := state.ScalingPolicyByTarget(wsGet, policy.Target)
8450+
require.NoError(err)
8451+
require.NotNil(out)
8452+
wsList := memdb.NewWatchSet()
8453+
_, err = state.ScalingPolicies(wsList)
8454+
require.NoError(err)
8455+
8456+
// Stop the job
8457+
job, err = state.JobByID(nil, job.Namespace, job.ID)
8458+
require.NoError(err)
8459+
job.Stop = true
8460+
err = state.UpsertJob(1200, job)
8461+
require.NoError(err)
8462+
8463+
// Ensure:
8464+
// * the scaling policy was deleted
8465+
// * the watches were fired
8466+
// * the table index was advanced
8467+
require.True(watchFired(wsGet))
8468+
require.True(watchFired(wsList))
8469+
out, err = state.ScalingPolicyByTarget(nil, policy.Target)
8470+
require.NoError(err)
8471+
require.Nil(out)
8472+
index, err := state.Index("scaling_policy")
8473+
require.GreaterOrEqual(index, uint64(1200))
8474+
}
8475+
8476+
func TestStateStore_UnstopJob_UpsertScalingPolicies(t *testing.T) {
8477+
t.Parallel()
8478+
8479+
require := require.New(t)
8480+
8481+
state := testStateStore(t)
8482+
8483+
job, policy := mock.JobWithScalingPolicy()
8484+
job.Stop = true
8485+
8486+
// establish watcher, verify there are no scaling policies yet
8487+
ws := memdb.NewWatchSet()
8488+
list, err := state.ScalingPolicies(ws)
8489+
require.NoError(err)
8490+
require.Nil(list.Next())
8491+
8492+
// upsert a stopped job, verify that we don't fire the watcher or add any scaling policies
8493+
err = state.UpsertJob(1000, job)
8494+
require.NoError(err)
8495+
require.False(watchFired(ws))
8496+
// stopped job should have no scaling policies, watcher doesn't fire
8497+
list, err = state.ScalingPolicies(ws)
8498+
require.NoError(err)
8499+
require.Nil(list.Next())
8500+
8501+
// Establish a new watcher
8502+
ws = memdb.NewWatchSet()
8503+
_, err = state.ScalingPolicies(ws)
8504+
require.NoError(err)
8505+
// Unstop this job, say you'll run it again...
8506+
job.Stop = false
8507+
err = state.UpsertJob(1100, job)
8508+
require.NoError(err)
8509+
8510+
// Ensure the scaling policy was added, watch was fired, index was advanced
8511+
require.True(watchFired(ws))
8512+
out, err := state.ScalingPolicyByTarget(nil, policy.Target)
8513+
require.NoError(err)
8514+
require.NotNil(out)
8515+
index, err := state.Index("scaling_policy")
8516+
require.GreaterOrEqual(index, uint64(1100))
8517+
}
8518+
8519+
func TestStateStore_DeleteJob_DeleteScalingPolicies(t *testing.T) {
84318520
t.Parallel()
84328521

84338522
require := require.New(t)

0 commit comments

Comments
 (0)