Skip to content

Commit 9405473

Browse files
author
Mahmood Ali
committed
Comment public functions and batch write txn
1 parent 7548800 commit 9405473

File tree

2 files changed

+19
-0
lines changed

2 files changed

+19
-0
lines changed

nomad/fsm.go

+4
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,9 @@ func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{}
520520
panic(fmt.Errorf("failed to decode request: %v", err))
521521
}
522522

523+
// Perform all store updates atomically to ensure a consistent views for store readers.
524+
// A partial update may increment the snapshot index, allowing eval brokers to process
525+
// evals for jobs whose deregistering didn't get committed yet.
523526
err := n.state.WithWriteTransaction(func(tx state.Txn) error {
524527
for jobNS, options := range req.Jobs {
525528
if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, tx); err != nil {
@@ -540,6 +543,7 @@ func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{}
540543
return err
541544
}
542545

546+
// perform the side effects outside the transactions
543547
n.handleUpsertedEvals(req.Evals)
544548
return nil
545549
}

nomad/state/state_store.go

+15
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import (
1414
"github.com/hashicorp/nomad/nomad/structs"
1515
)
1616

17+
// Txn is a transaction against a state store.
18+
// This can be a read or write transaction.
1719
type Txn = *memdb.Txn
1820

1921
const (
@@ -925,6 +927,8 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
925927
return nil
926928
}
927929

930+
// UpsertJobTxn is used to register a job or update a job definition, like UpsertJob,
931+
// but in a transcation. Useful for when making multiple modifications atomically
928932
func (s *StateStore) UpsertJobTxn(index uint64, job *structs.Job, txn Txn) error {
929933
return s.upsertJobImpl(index, job, false, txn)
930934
}
@@ -1019,6 +1023,8 @@ func (s *StateStore) DeleteJob(index uint64, namespace, jobID string) error {
10191023
return err
10201024
}
10211025

1026+
// DeleteJobTxn is used to deregister a job, like DeleteJob,
1027+
// but in a transcation. Useful for when making multiple modifications atomically
10221028
func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn) error {
10231029
// COMPAT 0.7: Upgrade old objects that do not have namespaces
10241030
if namespace == "" {
@@ -1206,6 +1212,8 @@ func (s *StateStore) JobByID(ws memdb.WatchSet, namespace, id string) (*structs.
12061212
return s.JobByIDTxn(ws, namespace, id, txn)
12071213
}
12081214

1215+
// JobByIDTxn is used to lookup a job by its ID, like JobByID. JobByID returns the job version
1216+
// accessable through in the transaction
12091217
func (s *StateStore) JobByIDTxn(ws memdb.WatchSet, namespace, id string, txn Txn) (*structs.Job, error) {
12101218
// COMPAT 0.7: Upgrade old objects that do not have namespaces
12111219
if namespace == "" {
@@ -1534,6 +1542,8 @@ func (s *StateStore) DeletePeriodicLaunch(index uint64, namespace, jobID string)
15341542
return err
15351543
}
15361544

1545+
// DeletePeriodicLaunchTxn is used to delete the periodic launch, like DeletePeriodicLaunch
1546+
// but in a transcation. Useful for when making multiple modifications atomically
15371547
func (s *StateStore) DeletePeriodicLaunchTxn(index uint64, namespace, jobID string, txn Txn) error {
15381548
// COMPAT 0.7: Upgrade old objects that do not have namespaces
15391549
if namespace == "" {
@@ -1610,6 +1620,8 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro
16101620
return err
16111621
}
16121622

1623+
// UpsertEvals is used to upsert a set of evaluations, like UpsertEvals
1624+
// but in a transcation. Useful for when making multiple modifications atomically
16131625
func (s *StateStore) UpsertEvalsTxn(index uint64, evals []*structs.Evaluation, txn Txn) error {
16141626
// Do a nested upsert
16151627
jobs := make(map[structs.NamespacedID]string, len(evals))
@@ -3919,6 +3931,9 @@ func (s *StateStore) SchedulerSetConfig(idx uint64, config *structs.SchedulerCon
39193931
return nil
39203932
}
39213933

3934+
// WithWriteTransaction executes the passed function within a write transaction,
3935+
// and returns its result. If the invocation returns no error, the transaction
3936+
// is committed; otherwise, it's aborted.
39223937
func (s *StateStore) WithWriteTransaction(fn func(Txn) error) error {
39233938
tx := s.db.Txn(true)
39243939
defer tx.Abort()

0 commit comments

Comments
 (0)