diff --git a/README.md b/README.md index ade8e2538..6d8cf8cc1 100644 --- a/README.md +++ b/README.md @@ -15,8 +15,8 @@ Further PR's (with tests) are welcome, but please maintain backwards compatibili * Support majority read concerns ([details](https://github.com/globalsign/mgo/pull/2)) * Improved connection handling ([details](https://github.com/globalsign/mgo/pull/5)) * Hides SASL warnings ([details](https://github.com/globalsign/mgo/pull/7)) -* Improved multi-document transaction performance ([details](https://github.com/globalsign/mgo/pull/10), [more](https://github.com/globalsign/mgo/pull/11)) * Fixes timezone handling ([details](https://github.com/go-mgo/mgo/pull/464)) +* Improved multi-document transaction performance ([details](https://github.com/globalsign/mgo/pull/10), [more](https://github.com/globalsign/mgo/pull/11), [more](https://github.com/globalsign/mgo/pull/16)) * Fixes cursor timeouts ([detials](https://jira.mongodb.org/browse/SERVER-24899)) --- diff --git a/txn/flusher.go b/txn/flusher.go index 63b03e51c..852a1696e 100644 --- a/txn/flusher.go +++ b/txn/flusher.go @@ -271,6 +271,16 @@ NextDoc: change.Upsert = false chaos("") if _, err := cquery.Apply(change, &info); err == nil { + if f.opts.MaxTxnQueueLength > 0 && len(info.Queue) > f.opts.MaxTxnQueueLength { + // abort with TXN Queue too long, but remove the entry we just added + innerErr := c.UpdateId(dkey.Id, + bson.D{{"$pullAll", bson.D{{"txn-queue", []token{tt}}}}}) + if innerErr != nil { + f.debugf("error while backing out of queue-too-long: %v", innerErr) + } + return nil, fmt.Errorf("txn-queue for %v in %q has too many transactions (%d)", + dkey.Id, dkey.C, len(info.Queue)) + } if info.Remove == "" { // Fast path, unless workload is insert/remove heavy. revno[dkey] = info.Revno diff --git a/txn/txn.go b/txn/txn.go index d9a9f9657..140c7a6c2 100644 --- a/txn/txn.go +++ b/txn/txn.go @@ -217,11 +217,14 @@ const ( // A Runner applies operations as part of a transaction onto any number // of collections within a database. See the Run method for details. type Runner struct { - tc *mgo.Collection // txns - sc *mgo.Collection // stash - lc *mgo.Collection // log + tc *mgo.Collection // txns + sc *mgo.Collection // stash + lc *mgo.Collection // log + opts RunnerOptions // runtime options } +const defaultMaxTxnQueueLength = 1000 + // NewRunner returns a new transaction runner that uses tc to hold its // transactions. // @@ -233,7 +236,36 @@ type Runner struct { // will be used for implementing the transactional behavior of insert // and remove operations. func NewRunner(tc *mgo.Collection) *Runner { - return &Runner{tc, tc.Database.C(tc.Name + ".stash"), nil} + return &Runner{ + tc: tc, + sc: tc.Database.C(tc.Name + ".stash"), + lc: nil, + opts: DefaultRunnerOptions(), + } +} + +// RunnerOptions encapsulates ways you can tweak transaction Runner behavior. +type RunnerOptions struct { + // MaxTxnQueueLength is a way to limit bad behavior. Many operations on + // transaction queues are O(N^2), and transaction queues growing too large + // are usually indicative of a bug in behavior. This should be larger + // than the maximum number of concurrent operations to a single document. + // Normal operations are likely to only ever hit 10 or so, we use a default + // maximum length of 1000. + MaxTxnQueueLength int +} + +// SetOptions allows people to change some of the internal behavior of a Runner. +func (r *Runner) SetOptions(opts RunnerOptions) { + r.opts = opts +} + +// DefaultRunnerOptions defines default behavior for a Runner. +// Users can use the DefaultRunnerOptions to only override specific behavior. +func DefaultRunnerOptions() RunnerOptions { + return RunnerOptions{ + MaxTxnQueueLength: defaultMaxTxnQueueLength, + } } var ErrAborted = fmt.Errorf("transaction aborted") diff --git a/txn/txn_test.go b/txn/txn_test.go index 291537802..8b85986b5 100644 --- a/txn/txn_test.go +++ b/txn/txn_test.go @@ -621,6 +621,90 @@ func (s *S) TestTxnQueueStashStressTest(c *C) { } } +func (s *S) checkTxnQueueLength(c *C, expectedQueueLength int) { + txn.SetDebug(false) + txn.SetChaos(txn.Chaos{ + KillChance: 1, + Breakpoint: "set-applying", + }) + defer txn.SetChaos(txn.Chaos{}) + err := s.accounts.Insert(M{"_id": 0, "balance": 100}) + c.Assert(err, IsNil) + ops := []txn.Op{{ + C: "accounts", + Id: 0, + Update: M{"$inc": M{"balance": 100}}, + }} + for i := 0; i < expectedQueueLength; i++ { + err := s.runner.Run(ops, "", nil) + c.Assert(err, Equals, txn.ErrChaos) + } + txn.SetDebug(true) + // Now that we've filled up the queue, we should see that there are 1000 + // items in the queue, and the error applying a new one will change. + var doc bson.M + err = s.accounts.FindId(0).One(&doc) + c.Assert(err, IsNil) + c.Check(len(doc["txn-queue"].([]interface{})), Equals, expectedQueueLength) + err = s.runner.Run(ops, "", nil) + c.Check(err, ErrorMatches, `txn-queue for 0 in "accounts" has too many transactions \(\d+\)`) + // The txn-queue should not have grown + err = s.accounts.FindId(0).One(&doc) + c.Assert(err, IsNil) + c.Check(len(doc["txn-queue"].([]interface{})), Equals, expectedQueueLength) +} + +func (s *S) TestTxnQueueDefaultMaxSize(c *C) { + s.runner.SetOptions(txn.DefaultRunnerOptions()) + s.checkTxnQueueLength(c, 1000) +} + +func (s *S) TestTxnQueueCustomMaxSize(c *C) { + opts := txn.DefaultRunnerOptions() + opts.MaxTxnQueueLength = 100 + s.runner.SetOptions(opts) + s.checkTxnQueueLength(c, 100) +} + +func (s *S) TestTxnQueueUnlimited(c *C) { + opts := txn.DefaultRunnerOptions() + // A value of 0 should mean 'unlimited' + opts.MaxTxnQueueLength = 0 + s.runner.SetOptions(opts) + // it isn't possible to actually prove 'unlimited' but we can prove that + // we at least can insert more than the default number of transactions + // without getting a 'too many transactions' failure. + txn.SetDebug(false) + txn.SetChaos(txn.Chaos{ + KillChance: 1, + // Use set-prepared because we are adding more transactions than + // other tests, and this speeds up setup time a bit + Breakpoint: "set-prepared", + }) + defer txn.SetChaos(txn.Chaos{}) + err := s.accounts.Insert(M{"_id": 0, "balance": 100}) + c.Assert(err, IsNil) + ops := []txn.Op{{ + C: "accounts", + Id: 0, + Update: M{"$inc": M{"balance": 100}}, + }} + for i := 0; i < 1100; i++ { + err := s.runner.Run(ops, "", nil) + c.Assert(err, Equals, txn.ErrChaos) + } + txn.SetDebug(true) + var doc bson.M + err = s.accounts.FindId(0).One(&doc) + c.Assert(err, IsNil) + c.Check(len(doc["txn-queue"].([]interface{})), Equals, 1100) + err = s.runner.Run(ops, "", nil) + c.Check(err, Equals, txn.ErrChaos) + err = s.accounts.FindId(0).One(&doc) + c.Assert(err, IsNil) + c.Check(len(doc["txn-queue"].([]interface{})), Equals, 1101) +} + func (s *S) TestPurgeMissingPipelineSizeLimit(c *C) { // This test ensures that PurgeMissing can handle very large // txn-queue fields. Previous iterations of PurgeMissing would