Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bound TXN queue lengths #16

Merged
merged 5 commits into from
Jul 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ Further PR's (with tests) are welcome, but please maintain backwards compatibili
* Support majority read concerns ([details](https://github.com/globalsign/mgo/pull/2))
* Improved connection handling ([details](https://github.com/globalsign/mgo/pull/5))
* Hides SASL warnings ([details](https://github.com/globalsign/mgo/pull/7))
* Improved multi-document transaction performance ([details](https://github.com/globalsign/mgo/pull/10), [more](https://github.com/globalsign/mgo/pull/11))
* Fixes timezone handling ([details](https://github.com/go-mgo/mgo/pull/464))
* Improved multi-document transaction performance ([details](https://github.com/globalsign/mgo/pull/10), [more](https://github.com/globalsign/mgo/pull/11), [more](https://github.com/globalsign/mgo/pull/16))
* Fixes cursor timeouts ([detials](https://jira.mongodb.org/browse/SERVER-24899))

---
Expand Down
10 changes: 10 additions & 0 deletions txn/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,16 @@ NextDoc:
change.Upsert = false
chaos("")
if _, err := cquery.Apply(change, &info); err == nil {
if f.opts.MaxTxnQueueLength > 0 && len(info.Queue) > f.opts.MaxTxnQueueLength {
// abort with TXN Queue too long, but remove the entry we just added
innerErr := c.UpdateId(dkey.Id,
bson.D{{"$pullAll", bson.D{{"txn-queue", []token{tt}}}}})
if innerErr != nil {
f.debugf("error while backing out of queue-too-long: %v", innerErr)
}
return nil, fmt.Errorf("txn-queue for %v in %q has too many transactions (%d)",
dkey.Id, dkey.C, len(info.Queue))
}
if info.Remove == "" {
// Fast path, unless workload is insert/remove heavy.
revno[dkey] = info.Revno
Expand Down
40 changes: 36 additions & 4 deletions txn/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,14 @@ const (
// A Runner applies operations as part of a transaction onto any number
// of collections within a database. See the Run method for details.
type Runner struct {
tc *mgo.Collection // txns
sc *mgo.Collection // stash
lc *mgo.Collection // log
tc *mgo.Collection // txns
sc *mgo.Collection // stash
lc *mgo.Collection // log
opts RunnerOptions // runtime options
}

const defaultMaxTxnQueueLength = 1000

// NewRunner returns a new transaction runner that uses tc to hold its
// transactions.
//
Expand All @@ -233,7 +236,36 @@ type Runner struct {
// will be used for implementing the transactional behavior of insert
// and remove operations.
func NewRunner(tc *mgo.Collection) *Runner {
return &Runner{tc, tc.Database.C(tc.Name + ".stash"), nil}
return &Runner{
tc: tc,
sc: tc.Database.C(tc.Name + ".stash"),
lc: nil,
opts: DefaultRunnerOptions(),
}
}

// RunnerOptions encapsulates ways you can tweak transaction Runner behavior.
type RunnerOptions struct {
// MaxTxnQueueLength is a way to limit bad behavior. Many operations on
// transaction queues are O(N^2), and transaction queues growing too large
// are usually indicative of a bug in behavior. This should be larger
// than the maximum number of concurrent operations to a single document.
// Normal operations are likely to only ever hit 10 or so, we use a default
// maximum length of 1000.
MaxTxnQueueLength int
}

// SetOptions allows people to change some of the internal behavior of a Runner.
func (r *Runner) SetOptions(opts RunnerOptions) {
r.opts = opts
}

// DefaultRunnerOptions defines default behavior for a Runner.
// Users can use the DefaultRunnerOptions to only override specific behavior.
func DefaultRunnerOptions() RunnerOptions {
return RunnerOptions{
MaxTxnQueueLength: defaultMaxTxnQueueLength,
}
}

var ErrAborted = fmt.Errorf("transaction aborted")
Expand Down
84 changes: 84 additions & 0 deletions txn/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,90 @@ func (s *S) TestTxnQueueStashStressTest(c *C) {
}
}

func (s *S) checkTxnQueueLength(c *C, expectedQueueLength int) {
txn.SetDebug(false)
txn.SetChaos(txn.Chaos{
KillChance: 1,
Breakpoint: "set-applying",
})
defer txn.SetChaos(txn.Chaos{})
err := s.accounts.Insert(M{"_id": 0, "balance": 100})
c.Assert(err, IsNil)
ops := []txn.Op{{
C: "accounts",
Id: 0,
Update: M{"$inc": M{"balance": 100}},
}}
for i := 0; i < expectedQueueLength; i++ {
err := s.runner.Run(ops, "", nil)
c.Assert(err, Equals, txn.ErrChaos)
}
txn.SetDebug(true)
// Now that we've filled up the queue, we should see that there are 1000
// items in the queue, and the error applying a new one will change.
var doc bson.M
err = s.accounts.FindId(0).One(&doc)
c.Assert(err, IsNil)
c.Check(len(doc["txn-queue"].([]interface{})), Equals, expectedQueueLength)
err = s.runner.Run(ops, "", nil)
c.Check(err, ErrorMatches, `txn-queue for 0 in "accounts" has too many transactions \(\d+\)`)
// The txn-queue should not have grown
err = s.accounts.FindId(0).One(&doc)
c.Assert(err, IsNil)
c.Check(len(doc["txn-queue"].([]interface{})), Equals, expectedQueueLength)
}

func (s *S) TestTxnQueueDefaultMaxSize(c *C) {
s.runner.SetOptions(txn.DefaultRunnerOptions())
s.checkTxnQueueLength(c, 1000)
}

func (s *S) TestTxnQueueCustomMaxSize(c *C) {
opts := txn.DefaultRunnerOptions()
opts.MaxTxnQueueLength = 100
s.runner.SetOptions(opts)
s.checkTxnQueueLength(c, 100)
}

func (s *S) TestTxnQueueUnlimited(c *C) {
opts := txn.DefaultRunnerOptions()
// A value of 0 should mean 'unlimited'
opts.MaxTxnQueueLength = 0
s.runner.SetOptions(opts)
// it isn't possible to actually prove 'unlimited' but we can prove that
// we at least can insert more than the default number of transactions
// without getting a 'too many transactions' failure.
txn.SetDebug(false)
txn.SetChaos(txn.Chaos{
KillChance: 1,
// Use set-prepared because we are adding more transactions than
// other tests, and this speeds up setup time a bit
Breakpoint: "set-prepared",
})
defer txn.SetChaos(txn.Chaos{})
err := s.accounts.Insert(M{"_id": 0, "balance": 100})
c.Assert(err, IsNil)
ops := []txn.Op{{
C: "accounts",
Id: 0,
Update: M{"$inc": M{"balance": 100}},
}}
for i := 0; i < 1100; i++ {
err := s.runner.Run(ops, "", nil)
c.Assert(err, Equals, txn.ErrChaos)
}
txn.SetDebug(true)
var doc bson.M
err = s.accounts.FindId(0).One(&doc)
c.Assert(err, IsNil)
c.Check(len(doc["txn-queue"].([]interface{})), Equals, 1100)
err = s.runner.Run(ops, "", nil)
c.Check(err, Equals, txn.ErrChaos)
err = s.accounts.FindId(0).One(&doc)
c.Assert(err, IsNil)
c.Check(len(doc["txn-queue"].([]interface{})), Equals, 1101)
}

func (s *S) TestPurgeMissingPipelineSizeLimit(c *C) {
// This test ensures that PurgeMissing can handle very large
// txn-queue fields. Previous iterations of PurgeMissing would
Expand Down