Set an upper limit of how large we will let txn-queues grow. #463

Open

wants to merge 5 commits into v2
Changes from 1 commit
12 changes: 12 additions & 0 deletions txn/flusher.go
@@ -212,6 +212,8 @@ var txnFields = bson.D{{"txn-queue", 1}, {"txn-revno", 1}, {"txn-remove", 1}, {"

var errPreReqs = fmt.Errorf("transaction has pre-requisites and force is false")

const maxTxnQueueLength = 1000
Contributor:
This seems like a sensible default, but it might be good to add support for overriding it on a per-session basis in case some application requires it.
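
A minimal sketch of what such an override could look like, assuming a hypothetical MaxTxnQueueLength field on Runner and a small accessor used by prepare; none of this is in the PR and the names are illustrative:

// Sketch only, not part of this change: a per-runner override for the
// txn-queue cap. Runner here is a stand-in that shows just the added knob.
package txnsketch

const defaultMaxTxnQueueLength = 1000

type Runner struct {
	// MaxTxnQueueLength, when positive, caps how many pending transaction
	// tokens a single document's txn-queue may hold for this runner;
	// zero means fall back to the package default.
	MaxTxnQueueLength int
}

// maxQueueLength returns the effective limit that prepare would compare
// len(info.Queue) against instead of the package constant.
func (r *Runner) maxQueueLength() int {
	if r.MaxTxnQueueLength > 0 {
		return r.MaxTxnQueueLength
	}
	return defaultMaxTxnQueueLength
}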


// prepare injects t's id onto txn-queue for all affected documents
// and collects the current txn-queue and txn-revno values during
// the process. If the prepared txn-queue indicates that there are
@@ -244,6 +246,16 @@ NextDoc:
change.Upsert = false
chaos("")
if _, err := cquery.Apply(change, &info); err == nil {
	if len(info.Queue) > maxTxnQueueLength {
		// abort with TXN Queue too long, but remove the entry we just added
Contributor:
s/TXN Queue/txn-queue/ ?

		innerErr := c.UpdateId(dkey.Id,
			bson.D{{"$pullAll", bson.D{{"txn-queue", []token{tt}}}}})
		if innerErr != nil {
			f.debugf("error while backing out of queue-too-long: %v", innerErr)
		}
		return nil, fmt.Errorf("txn-queue for %v in %q has too many transactions (%d)",
			dkey.Id, dkey.C, len(info.Queue))
	}
	if info.Remove == "" {
		// Fast path, unless workload is insert/remove heavy.
		revno[dkey] = info.Revno
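
Since the check surfaces only as a formatted error from Run rather than an exported sentinel, a caller that wants to treat an over-long txn-queue as back-pressure would have to match the message. A rough sketch under that assumption; the import path and helper name are illustrative:

package app

import (
	"fmt"
	"strings"

	"gopkg.in/mgo.v2/txn"
)

// runWithBackpressure recognises the queue-too-long failure by its message,
// which is an assumption since this change does not export a sentinel error.
func runWithBackpressure(r *txn.Runner, ops []txn.Op) error {
	err := r.Run(ops, "", nil)
	if err != nil && strings.Contains(err.Error(), "has too many transactions") {
		// The flusher pulled the just-added token back out, so the document's
		// txn-queue is no larger than before this attempt; back off and retry.
		return fmt.Errorf("transaction backlog, retry later: %v", err)
	}
	return err
}
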
33 changes: 33 additions & 0 deletions txn/txn_test.go
@@ -621,6 +621,39 @@ func (s *S) TestTxnQueueStashStressTest(c *C) {
}
}

func (s *S) TestTxnQueueMaxSize(c *C) {
	txn.SetDebug(false)
	txn.SetChaos(txn.Chaos{
		KillChance: 1,
		Breakpoint: "set-applying",
	})
	defer txn.SetChaos(txn.Chaos{})
	err := s.accounts.Insert(M{"_id": 0, "balance": 100})
	c.Assert(err, IsNil)
	ops := []txn.Op{{
		C:      "accounts",
		Id:     0,
		Update: M{"$inc": M{"balance": 100}},
	}}
	for i := 0; i < 1000; i++ {
		err := s.runner.Run(ops, "", nil)
		c.Assert(err, Equals, txn.ErrChaos)
	}
	txn.SetDebug(true)
	// Now that we've filled up the queue, we should see that there are 1000
	// items in the queue, and the error applying a new one will change.
	var doc bson.M
	err = s.accounts.FindId(0).One(&doc)
	c.Assert(err, IsNil)
	c.Check(len(doc["txn-queue"].([]interface{})), Equals, 1000)
	err = s.runner.Run(ops, "", nil)
	c.Check(err, ErrorMatches, `txn-queue for 0 in "accounts" has too many transactions \(1001\)`)
	// The txn-queue should not have grown
	err = s.accounts.FindId(0).One(&doc)
	c.Assert(err, IsNil)
	c.Check(len(doc["txn-queue"].([]interface{})), Equals, 1000)
}

func (s *S) TestPurgeMissingPipelineSizeLimit(c *C) {
	// This test ensures that PurgeMissing can handle very large
	// txn-queue fields. Previous iterations of PurgeMissing would