diff --git a/pkg/storage/minprop/doc.go b/pkg/storage/minprop/doc.go
new file mode 100644
index 000000000000..d4abf86070aa
--- /dev/null
+++ b/pkg/storage/minprop/doc.go
@@ -0,0 +1,28 @@
+// Copyright 2018 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License.
+
+// Package minprop exports a main data structure, Tracker, which checkpoints
+// closed timestamps and the associated Raft Lease Applied Index positions for
+// which (under additional conditions) it is legal to serve follower reads. It
+// does so by maintaining a 'next' timestamp above which new command evaluations
+// are forced, and by tracking when all in-flight evaluations below this
+// timestamp have completed (at which point a call to the Close method succeeds:
+// 'next' becomes closed, and a new 'next' is initialized with a future
+// timestamp).
+//
+// In-flight command evaluations are tracked via the Track method which acquires
+// a reference with the tracker, returns a minimum timestamp to be used for the
+// proposal evaluation, and provides a closure that releases the reference with
+// a lease applied index used for the proposal.
+package minprop
diff --git a/pkg/storage/minprop/doc_test.go b/pkg/storage/minprop/doc_test.go
new file mode 100644
index 000000000000..e61161355799
--- /dev/null
+++ b/pkg/storage/minprop/doc_test.go
@@ -0,0 +1,192 @@
+// Copyright 2018 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License.
+
+package minprop
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+)
+
+// Example walks a Tracker through a complete cycle and prints its state after
+// every step: four commands are tracked and released with a range ID and a
+// Lease Applied Index, and Close is called three times. The second Close is a
+// no-op because the command tracked first is still in flight at that point.
+// Everything printed here is listed again in the Output comment at the end of
+// the function, so the narrative strings and the comment must stay in sync.
+func Example() {
+	ctx := context.TODO()
+
+	tracker := NewTracker()
+
+	fmt.Println("The newly initialized tracker has a zero closed timestamp:")
+	fmt.Println(tracker)
+
+	// done1 is deliberately held back until near the end of the example; it
+	// plays the role of the slow evaluation.
+	fmt.Println("A first command arrives on range 12 (though the range isn't known yet to the Tracker).")
+	ts, done1 := tracker.Track(ctx)
+	fmt.Println("All commands initially start out on the right. The command has its timestamp forwarded to", ts, ".")
+	fmt.Println(tracker)
+
+	fmt.Println("Two more commands arrive, on r1 and r12.")
+	_, done2 := tracker.Track(ctx)
+	_, done3 := tracker.Track(ctx)
+	fmt.Println(tracker)
+
+	fmt.Println("The command on r1 finishes evaluating at Lease Applied Index 10 and lets the Tracker know.")
+	done2(ctx, 1, 10)
+	fmt.Println(tracker)
+
+	fmt.Println("The command on r12 also finishes quickly, at LAI 77.")
+	done3(ctx, 12, 77)
+	fmt.Println(tracker)
+
+	// NOTE(review): the message below says "1000", while the timestamp handed
+	// to Close has WallTime 1E9 and is printed as 1.000000000,0 further down.
+	fmt.Println("The system closes out a timestamp (registering 1000 as the next timestamp to close out).")
+	closed1, mlai1 := tracker.Close(hlc.Timestamp{WallTime: 1E9})
+	fmt.Println("No problem: nothing is tracked on the left side; returns:", closed1, "and", mlai1)
+	fmt.Println("Note how the items on the right have moved to the left, as they are relevant for the")
+	fmt.Println("next call to Close.")
+	fmt.Println(tracker)
+
+	// This Close is expected to change nothing: done1 has not been called yet.
+	fmt.Println("Nothing happens for a while until the system tries to close out the next timestamp.")
+	fmt.Println("However, the very first proposal is still tracked and blocks progress.")
+	closed2, mlai2 := tracker.Close(hlc.Timestamp{WallTime: 2E9})
+	fmt.Println("The call returns a no-op in the form", closed2, mlai2, ".")
+	fmt.Println(tracker)
+
+	// NOTE(review): the parenthesis opened before "and is forwarded to" in the
+	// message below is never closed; the Output comment mirrors that.
+	ts4, done4 := tracker.Track(ctx)
+	fmt.Println("A new command gets tracked on r12 (and is forwarded to", ts4, "(if necessary).")
+	fmt.Println("It terminates quickly, leaving an MLAI entry of 78 behind.")
+	done4(ctx, 12, 78)
+	fmt.Println(tracker)
+
+	// Releasing the first command is what allows the final Close to succeed.
+	fmt.Println("Finally! The slow evaluation finishes and the command gets proposed at index 79.")
+	fmt.Println("Note that the right now tracks a smaller value of 78. Consumers have to keep the")
+	fmt.Println("maximum they've seen.")
+	done1(ctx, 12, 79)
+	fmt.Println(tracker)
+
+	closed3, mlai3 := tracker.Close(hlc.Timestamp{WallTime: 3E9})
+	fmt.Println("The next call to Close() is successful and returns:", closed3, "and", mlai3)
+	fmt.Println(tracker)
+
+	// Output:
+	// The newly initialized tracker has a zero closed timestamp:
+	//
+	// closed=0.000000000,0
+	// | next=0.000000000,1
+	// | left | right
+	// | 0 # 0
+	// v v
+	// ---------------------------------------------------------> time
+	//
+	// A first command arrives on range 12 (though the range isn't known yet to the Tracker).
+	// All commands initially start out on the right. The command has its timestamp forwarded to 0.000000000,2 .
+	//
+	// closed=0.000000000,0
+	// | next=0.000000000,1
+	// | left | right
+	// | 0 # 1
+	// v v
+	// ---------------------------------------------------------> time
+	//
+	// Two more commands arrive, on r1 and r12.
+	//
+	// closed=0.000000000,0
+	// | next=0.000000000,1
+	// | left | right
+	// | 0 # 3
+	// v v
+	// ---------------------------------------------------------> time
+	//
+	// The command on r1 finishes evaluating at Lease Applied Index 10 and lets the Tracker know.
+	//
+	// closed=0.000000000,0
+	// | next=0.000000000,1
+	// | left | right
+	// | 0 # 2
+	// | @ 10 (r1)
+	// v v
+	// ---------------------------------------------------------> time
+	//
+	// The command on r12 also finishes quickly, at LAI 77.
+	//
+	// closed=0.000000000,0
+	// | next=0.000000000,1
+	// | left | right
+	// | 0 # 1
+	// | @ 10 (r1)
+	// | @ 77 (r12)
+	// v v
+	// ---------------------------------------------------------> time
+	//
+	// The system closes out a timestamp (registering 1000 as the next timestamp to close out).
+	// No problem: nothing is tracked on the left side; returns: 0.000000000,1 and map[]
+	// Note how the items on the right have moved to the left, as they are relevant for the
+	// next call to Close.
+	//
+	// closed=0.000000000,1
+	// | next=1.000000000,0
+	// | left | right
+	// | 1 # 0
+	// | 10 @ (r1)
+	// | 77 @ (r12)
+	// v v
+	// ---------------------------------------------------------> time
+	//
+	// Nothing happens for a while until the system tries to close out the next timestamp.
+	// However, the very first proposal is still tracked and blocks progress.
+	// The call returns a no-op in the form 0.000000000,1 map[] .
+	//
+	// closed=0.000000000,1
+	// | next=1.000000000,0
+	// | left | right
+	// | 1 # 0
+	// | 10 @ (r1)
+	// | 77 @ (r12)
+	// v v
+	// ---------------------------------------------------------> time
+	//
+	// A new command gets tracked on r12 (and is forwarded to 1.000000000,1 (if necessary).
+	// It terminates quickly, leaving an MLAI entry of 78 behind.
+	//
+	// closed=0.000000000,1
+	// | next=1.000000000,0
+	// | left | right
+	// | 1 # 0
+	// | 10 @ (r1)
+	// | 77 @ (r12)
+	// | @ 78 (r12)
+	// v v
+	// ---------------------------------------------------------> time
+	//
+	// Finally! The slow evaluation finishes and the command gets proposed at index 79.
+	// Note that the right now tracks a smaller value of 78. Consumers have to keep the
+	// maximum they've seen.
+	//
+	// closed=0.000000000,1
+	// | next=1.000000000,0
+	// | left | right
+	// | 0 # 0
+	// | 10 @ (r1)
+	// | @ 78 (r12)
+	// | 79 @ (r12)
+	// v v
+	// ---------------------------------------------------------> time
+	//
+	// The next call to Close() is successful and returns: 1.000000000,0 and map[1:10 12:79]
+	//
+	// closed=1.000000000,0
+	// | next=3.000000000,0
+	// | left | right
+	// | 0 # 0
+	// | 78 @ (r12)
+	// v v
+	// ---------------------------------------------------------> time
+}
diff --git a/pkg/storage/minprop/tracker.go b/pkg/storage/minprop/tracker.go
new file mode 100644
index 000000000000..78d0b4b30059
--- /dev/null
+++ b/pkg/storage/minprop/tracker.go
@@ -0,0 +1,267 @@
+// Copyright 2018 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License.
+
+package minprop
+
+import (
+	"context"
+	"fmt"
+	"sort"
+
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+)
+
+// Tracker is one building block of follower reads, i.e. consistent reads
+// served by a replica that does not hold the lease for the requested
+// timestamp. It keeps count of the command evaluations currently in flight,
+// pushes each of them above a steadily advancing timestamp, and hands out
+// closed timestamp updates. Every update comes with a per-range delta of
+// minimum Lease Applied Indexes which a replica has to reach before it can
+// correctly serve a follower read.
+//
+// See https://github.com/cockroachdb/cockroach/pull/26362 for more information.
+//
+// All methods of Tracker may be called concurrently.
+type Tracker struct {
+	mu struct {
+		syncutil.Mutex
+
+		// closed holds the timestamp that was closed out most recently.
+		closed hlc.Timestamp
+
+		// Everything below describes the state needed to produce the next
+		// closed timestamp and the one after it.
+		//
+		// next is the candidate that replaces closed on the next successful
+		// Close. In-flight evaluations are split by how their timestamps
+		// relate to next:
+		//
+		//   closed           next
+		//     |     left       |     right
+		//     v                v
+		//   --------------------------------------------> time
+		//
+		// - "left" proposals may still write at timestamps <= next and
+		//   therefore have to be accounted for before next can be closed.
+		// - "right" proposals only touch MVCC timestamps > next. They start
+		//   to matter once next has been closed out, at which point the
+		//   right-hand state takes the place of the left-hand state.
+		next hlc.Timestamp
+
+		// leftMLAI and rightMLAI record, per range, the Lease Applied Index a
+		// follower has to catch up to so that it has seen every write at or
+		// below the respective timestamp. Closing out next emits leftMLAI.
+		leftMLAI  map[roachpb.RangeID]int64
+		rightMLAI map[roachpb.RangeID]int64
+
+		// leftRef and rightRef count the proposals that are still being
+		// evaluated on each side. next can be closed out only after leftRef
+		// has dropped to zero; newly tracked proposals are always forced
+		// above next and hence increment rightRef.
+		leftRef  int
+		rightRef int
+	}
+}
+
+// NewTracker constructs a Tracker whose closed timestamp is the zero timestamp
+// and whose next timestamp to be closed is a single logical tick above zero.
+// Both MLAI maps start out empty (but allocated).
+func NewTracker() *Tracker {
+	var tracker Tracker
+	tracker.mu.next = hlc.Timestamp{Logical: 1}
+	tracker.mu.leftMLAI = make(map[roachpb.RangeID]int64)
+	tracker.mu.rightMLAI = make(map[roachpb.RangeID]int64)
+	return &tracker
+}
+
+// String prints a string representation of the Tracker's state.
+func (t *Tracker) String() string {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+
+	// entry is one MLAI map entry together with the side it was found on.
+	type entry struct {
+		rangeID roachpb.RangeID
+		mlai    int64
+		onLeft  bool
+	}
+
+	// Collect the entries of both maps into a single slice.
+	entries := make([]entry, 0, len(t.mu.leftMLAI)+len(t.mu.rightMLAI))
+	for id, idx := range t.mu.leftMLAI {
+		entries = append(entries, entry{rangeID: id, mlai: idx, onLeft: true})
+	}
+	for id, idx := range t.mu.rightMLAI {
+		entries = append(entries, entry{rangeID: id, mlai: idx, onLeft: false})
+	}
+
+	// Map iteration order is random; order by range ID first and by index
+	// second so that the rendering does not depend on it.
+	sort.Slice(entries, func(i, j int) bool {
+		a, b := entries[i], entries[j]
+		if a.rangeID == b.rangeID {
+			return a.mlai < b.mlai
+		}
+		return a.rangeID < b.rangeID
+	})
+
+	// One row per entry; left-hand entries put the index before the '@',
+	// right-hand entries put it after.
+	var rows string
+	for _, e := range entries {
+		format := ` | @ %d (r%d)
+`
+		if e.onLeft {
+			format = ` | %11d @ (r%d)
+`
+		}
+		rows += fmt.Sprintf(format, e.mlai, e.rangeID)
+	}
+
+	// The rows consist of digits and fixed text only, so rendering the header
+	// on its own and appending rows and footer afterwards yields the same
+	// string as formatting all three pieces at once.
+	header := fmt.Sprintf(`
+ closed=%s
+ | next=%s
+ | left | right
+ | %3d # %d
+`, t.mu.closed, t.mu.next, t.mu.leftRef, t.mu.rightRef)
+
+	const footer = ` v v
+---------------------------------------------------------> time
+`
+	return header + rows + footer
+}
+
+// Close attempts to close out the current candidate timestamp (replacing it
+// with the provided one). This is possible only if tracked proposals that were
+// evaluating when Close was previously called have since completed. On success,
+// all subsequent proposals will be forced to evaluate strictly above the
+// provided timestamp, and the timestamp previously passed to Close is returned
+// as a closed timestamp along with a map of minimum Lease Applied Indexes
+// reflecting the updates for the past period. On failure, the previous closed
+// timestamp is returned along with a nil map (which can be treated by callers
+// like a successful call that happens to not return any new information).
+// Similarly, failure to provide a timestamp strictly larger than that to be
+// closed out next results in the same "idempotent" return values.
+func (t *Tracker) Close(next hlc.Timestamp) (hlc.Timestamp, map[roachpb.RangeID]int64) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+
+	if log.V(3) {
+		log.Infof(context.TODO(), "close: leftRef=%d rightRef=%d next=%s closed=%s new=%s", t.mu.leftRef, t.mu.rightRef, t.mu.next, t.mu.closed, next)
+	}
+
+	// Nothing can be closed out while proposals on the left are still in
+	// flight. Likewise, `t.mu.next` must never regress (proposals violating
+	// an earlier closed timestamp would be accepted) nor stay put (the
+	// closure handed out by Track tells left from right by comparing against
+	// distinct timestamps). In both cases report the unchanged closed
+	// timestamp and no MLAIs.
+	if t.mu.leftRef != 0 || !t.mu.next.Less(next) {
+		return t.mu.closed, nil
+	}
+
+	// NB: with rightRef at zero as well, nothing at all is in flight and the
+	// provided timestamp could in theory be closed out right away. That would
+	// require merging both MLAI maps and would ask followers to catch up on
+	// more commands much faster than can be expected of them. Should this
+	// optimization ever be wanted, two closed timestamp updates ought to be
+	// emitted for this case instead.
+
+	// The left MLAIs belong to the timestamp being closed out; they are handed
+	// to the caller.
+	emitted := t.mu.leftMLAI
+
+	// The old candidate becomes closed and the provided timestamp becomes the
+	// new candidate.
+	t.mu.closed, t.mu.next = t.mu.next, next
+
+	// The left side takes over the right side's MLAIs and refcount, so that
+	// it now accounts for everything in flight; the right side starts afresh.
+	t.mu.leftMLAI, t.mu.rightMLAI = t.mu.rightMLAI, map[roachpb.RangeID]int64{}
+	t.mu.leftRef, t.mu.rightRef = t.mu.rightRef, 0
+
+	return t.mu.closed, emitted
+}
+
+// Track is called before evaluating a proposal. It returns the minimum
+// timestamp at which the proposal can be evaluated (i.e. the request timestamp
+// needs to be forwarded if necessary), and acquires a reference with the
+// Tracker.
+// This reference is released by calling the returned closure either
+// a) before proposing the command, supplying the Lease Applied Index at which
+// the proposal will be carried out, or
+// b) with a zero range ID and a zero lease applied index if the command won't
+// end up being proposed (i.e. hit an error during evaluation).
+//
+// The closure is not thread safe. For convenience, it may be called once more
+// with a zero range ID and a zero lease applied index after a regular call;
+// any other repeated call is reported via log.Fatal.
+func (t *Tracker) Track(
+	ctx context.Context,
+) (hlc.Timestamp, func(context.Context, roachpb.RangeID, int64)) {
+	// The verbosity check is done once and its result is reused by the closure.
+	shouldLog := log.V(3)
+
+	// A newly tracked proposal always starts out on the right: its minimum
+	// timestamp is the tick following the current `next`.
+	t.mu.Lock()
+	minProp := t.mu.next.Next()
+	t.mu.rightRef++
+	t.mu.Unlock()
+
+	if shouldLog {
+		log.Infof(ctx, "track: proposal on the right at minProp %s", minProp)
+	}
+
+	// calls counts invocations of the closure; it is not synchronized, which is
+	// why the closure must not be used concurrently.
+	var calls int
+	release := func(ctx context.Context, rangeID roachpb.RangeID, lai int64) {
+		calls++
+		if calls != 1 {
+			// Only a single follow-up call with all-zero arguments is tolerated.
+			if lai != 0 || rangeID != 0 || calls > 2 {
+				log.Fatal(ctx, log.Safe(fmt.Sprintf("command released %d times, this time with arguments (%d, %d)", calls, rangeID, lai)))
+			}
+			return
+		}
+
+		t.mu.Lock()
+		defer t.mu.Unlock()
+		// The side is derived from minProp: if the `next` this proposal was
+		// tracked under has since become `closed`, a successful Close has
+		// moved the proposal to the left; if it is still `next`, the proposal
+		// remains on the right.
+		if minProp == t.mu.closed.Next() {
+			if shouldLog {
+				log.Infof(ctx, "release: minprop %s on r%d@%d tracked on the left", minProp, rangeID, lai)
+			}
+			t.mu.leftRef--
+			if t.mu.leftRef < 0 {
+				log.Fatalf(ctx, "min proposal left ref count < 0")
+			}
+			// A zero range ID only drops the reference; no MLAI is recorded.
+			if rangeID == 0 {
+				return
+			}
+
+			// Keep the largest index seen for the range.
+			if curLAI := t.mu.leftMLAI[rangeID]; curLAI < lai {
+				t.mu.leftMLAI[rangeID] = lai
+			}
+		} else if minProp == t.mu.next.Next() {
+			if shouldLog {
+				log.Infof(ctx, "release: minprop %s on r%d@%d tracked on the right", minProp, rangeID, lai)
+			}
+			t.mu.rightRef--
+			if t.mu.rightRef < 0 {
+				log.Fatalf(ctx, "min proposal right ref count < 0")
+			}
+			// A zero range ID only drops the reference; no MLAI is recorded.
+			if rangeID == 0 {
+				return
+			}
+
+			// Keep the largest index seen for the range.
+			if curLAI := t.mu.rightMLAI[rangeID]; curLAI < lai {
+				t.mu.rightMLAI[rangeID] = lai
+			}
+		} else {
+			// The proposal matches neither side.
+			log.Fatalf(ctx, "min proposal %s not tracked under closed (%s) or next (%s) timestamp", minProp, t.mu.closed, t.mu.next)
+		}
+	}
+
+	return minProp, release
+}
diff --git a/pkg/storage/minprop/tracker_test.go b/pkg/storage/minprop/tracker_test.go
new file mode 100644
index 000000000000..087cefe8cf48
--- /dev/null
+++ b/pkg/storage/minprop/tracker_test.go
@@ -0,0 +1,293 @@
+// Copyright 2018 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License.
+
+package minprop
+
+import (
+	"context"
+	"fmt"
+	"runtime"
+	"sync/atomic"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+
+	"github.com/kr/pretty"
+	"github.com/pkg/errors"
+	"golang.org/x/sync/errgroup"
+)
+
+// TestTrackerClosure exercises the tolerated calling sequence of the closure
+// returned by Track: a regular release followed by one all-zero call.
+func TestTrackerClosure(t *testing.T) {
+	ctx := context.Background()
+	tracker := NewTracker()
+	_, done := tracker.Track(ctx)
+
+	done(ctx, 100, 200)
+	done(ctx, 0, 0)
+}
+
+// ExampleTracker_Close shows that the MLAI emitted for a range by one
+// successful Close can be smaller than the one emitted by the previous
+// successful Close: the proposal tracked first is released with LAI 2, the
+// proposal tracked after the first Close is released with LAI 1.
+func ExampleTracker_Close() {
+	ctx := context.Background()
+	tracker := NewTracker()
+	_, slow := tracker.Track(ctx)
+	_, _ = tracker.Close(hlc.Timestamp{WallTime: 1E9})
+	_, fast := tracker.Track(ctx)
+
+	fmt.Println("Slow proposal finishes at LAI 2")
+	slow(ctx, 99, 2)
+	closed, m := tracker.Close(hlc.Timestamp{WallTime: 2E9})
+	fmt.Println("Closed:", closed, m)
+
+	fmt.Println("Fast proposal finishes at LAI 1")
+	fast(ctx, 99, 1)
+	fmt.Println(tracker)
+
+	closed, m = tracker.Close(hlc.Timestamp{WallTime: 3E9})
+	fmt.Println("Closed:", closed, m)
+	fmt.Println("Note how the MLAI has 'regressed' from 2 to 1. The consumer")
+	fmt.Println("needs to track the maximum over all deltas received.")
+
+	// Output:
+	// Slow proposal finishes at LAI 2
+	// Closed: 1.000000000,0 map[99:2]
+	// Fast proposal finishes at LAI 1
+	//
+	// closed=1.000000000,0
+	// | next=2.000000000,0
+	// | left | right
+	// | 0 # 0
+	// | 1 @ (r99)
+	// v v
+	// ---------------------------------------------------------> time
+	//
+	// Closed: 2.000000000,0 map[99:1]
+	// Note how the MLAI has 'regressed' from 2 to 1. The consumer
+	// needs to track the maximum over all deltas received.
+}
+
+// TestTrackerDoubleRelease verifies that a second call of the release closure
+// with non-zero arguments is treated as fatal. The log package's exit function
+// is replaced for the duration of the test so that the fatal error sets a flag
+// instead of terminating the process.
+func TestTrackerDoubleRelease(t *testing.T) {
+	var exited bool
+	log.SetExitFunc(true /* hideStack */, func(int) { exited = true })
+	defer log.ResetExitFunc()
+
+	ctx := context.Background()
+	tracker := NewTracker()
+
+	_, release := tracker.Track(ctx)
+	release(ctx, 0, 0)
+	release(ctx, 4, 10)
+
+	if !exited {
+		t.Fatal("expected fatal error")
+	}
+}
+
+// modelClient is the bookkeeping used by TestTrackerConcurrentUse: it hands out
+// lease applied indexes per range and records what Close has returned so far.
+type modelClient struct {
+	lai map[roachpb.RangeID]*int64 // read-only map, values accessed atomically
+	mu  struct {
+		syncutil.Mutex
+		closed   []hlc.Timestamp             // closed timestamps
+		released []map[roachpb.RangeID]int64 // known released LAIs, rotated on Close
+		m        map[roachpb.RangeID]int64   // max over all maps returned from Close()
+	}
+}
+
+// Operate a Tracker concurrently and verify that closed timestamps don't regress
+// and that the emitted MLAIs are not obviously inconsistent with commands we know
+// finished.
+func TestTrackerConcurrentUse(t *testing.T) {
+	ctx := context.Background()
+	tracker := NewTracker()
+
+	const (
+		numCmds    = 1000 // operations to carry out in total
+		closeEvery = 20   // turn every i'th operation into a Close
+		numRanges  = 5
+	)
+
+	// The model starts with a single zero closed timestamp and three empty
+	// released buckets, so that indexing the third bucket from the end is valid
+	// from the first successful Close on.
+	var mc modelClient
+	mc.mu.m = map[roachpb.RangeID]int64{}
+	mc.mu.closed = make([]hlc.Timestamp, 1)
+	mc.mu.released = []map[roachpb.RangeID]int64{{}, {}, {}}
+
+	// One atomically incremented LAI counter per range, for ranges 1..numRanges.
+	mc.lai = map[roachpb.RangeID]*int64{}
+	for i := roachpb.RangeID(1); i <= numRanges; i++ {
+		mc.lai[i] = new(int64)
+	}
+
+	// get maps operation i onto a range and draws the next LAI for that range.
+	get := func(i int) (roachpb.RangeID, int64) {
+		rangeID := roachpb.RangeID(1 + (i % numRanges))
+		return rangeID, atomic.AddInt64(mc.lai[rangeID], 1)
+	}
+
+	// It becomes a lot more complicated to collect the released indexes
+	// correctly when multiple calls to Close are in-flight at any given time.
+	// The intended use case is for Close to be called from a single goroutine,
+	// so the test specializes to that situation.
+	//
+	// NB: The `mc.mu` sections are intentionally kept small to allow for more
+	// interleaving between tracked commands and close operations.
+	//
+	// NOTE(review): the local name `close` shadows the predeclared close
+	// builtin for the rest of this function.
+	var closeMU syncutil.Mutex
+	close := func(newNext hlc.Timestamp) error {
+		closeMU.Lock()
+		defer closeMU.Unlock()
+
+		mc.mu.Lock()
+		// Note last closed timestamp.
+		prevClosed := mc.mu.closed[len(mc.mu.closed)-1]
+
+		mc.mu.Unlock()
+
+		t.Log("before closing:", tracker)
+		closed, m := tracker.Close(newNext)
+		if closed.Less(prevClosed) {
+			return errors.Errorf("closed timestamp regressed from %s to %s", prevClosed, closed)
+		} else if prevClosed == closed && len(m) != 0 {
+			return errors.Errorf("closed timestamp %s not incremented, but MLAIs %v emitted", prevClosed, m)
+		}
+
+		mc.mu.Lock()
+		defer mc.mu.Unlock()
+
+		if closed != prevClosed {
+			// The released bucket is rotated after each call to Close (we can't
+			// really do it before because we only want to rotate when a new
+			// closed timestamp was established).
+			//
+			// Taking into account the call to Close we just performed, the
+			// - current bucket contains: commands that could be on the left
+			// (expected) or the right: A command could start after our call to
+			// Close but make it into the pre-rotation bucket.
+			// - previous bucket contains commands that could be on the left
+			// or emitted
+			// - bucket before that contains commands that definitely must have
+			// been emitted.
+			//
+			// So we check the latter bucket. Trying to close the synchronization
+			// gap would allow checking the middle bucket instead, but this would
+			// weaken the test overall.
+			released := mc.mu.released[len(mc.mu.released)-3]
+			// Rotate released commands bucket.
+			mc.mu.released = append(mc.mu.released, map[roachpb.RangeID]int64{})
+
+			for rangeID, mlai := range m {
+				// Intuitively you expect mc.mu.m[rangeID] < mlai, but this
+				// doesn't always hold. A slow proposal could get assigned a
+				// higher lease index on the left side than a "newer"
+				// proposal on the right. The client really has to track the
+				// maximum.
+				//
+				if mc.mu.m[rangeID] < mlai {
+					mc.mu.m[rangeID] = mlai
+				}
+
+				// The running maximum must cover everything recorded in the
+				// bucket selected above.
+				if trackedMLAI, rMLAI := mc.mu.m[rangeID], released[rangeID]; rMLAI > trackedMLAI {
+					return errors.Errorf(
+						"incorrect MLAI %d for r%d does not reflect %d:\nemitted: %+v\n%s\nreleased: %s\naggregate: %s",
+						trackedMLAI, rangeID, rMLAI, m, tracker, pretty.Sprint(mc.mu.released), pretty.Sprint(mc.mu.m),
+					)
+				}
+			}
+		}
+
+		// Store latest closed timestamp.
+		mc.mu.closed = append(mc.mu.closed, closed)
+		return nil
+	}
+
+	// newNext derives the timestamp passed to Close from the operation index.
+	newNext := func(i int) hlc.Timestamp {
+		return hlc.Timestamp{WallTime: int64(i) * 1E9}
+	}
+
+	// run carries out operation i: either a Close or a tracked command that is
+	// released in one of three ways depending on i%3.
+	run := func(i int) error {
+		if i%closeEvery == 1 {
+			return close(newNext(i))
+		}
+
+		mc.mu.Lock()
+		prevClosed := mc.mu.closed[len(mc.mu.closed)-1]
+		mc.mu.Unlock()
+
+		ts, done := tracker.Track(ctx)
+		if ts.Less(prevClosed) {
+			return errors.Errorf("%d: proposal forwarded to %s, but closed %s", i, ts, prevClosed)
+		}
+
+		// Yield so that other goroutines get a chance to interleave between
+		// tracking and releasing.
+		runtime.Gosched()
+
+		var rangeID roachpb.RangeID
+		var lai int64
+		switch i % 3 {
+		case 0:
+			// Successful evaluation.
+			rangeID, lai = get(i)
+			done(ctx, rangeID, lai)
+		case 1:
+			// Successful evaluation followed by deferred zero call.
+			rangeID, lai = get(i)
+			done(ctx, rangeID, lai)
+			done(ctx, 0, 0)
+		case 2:
+			// Failed evaluation. Burns a LAI.
+			// NOTE(review): get is not called in this branch, so no LAI is
+			// actually drawn here; the remark above may be stale.
+			done(ctx, 0, 0)
+		default:
+			panic("the impossible happened")
+		}
+
+		// Only commands that were released with an index are recorded, in the
+		// current (last) bucket.
+		mc.mu.Lock()
+		if lai != 0 {
+			mc.mu.released[len(mc.mu.released)-1][rangeID] = lai
+		}
+		mc.mu.Unlock()
+
+		return nil
+	}
+
+	// Every operation runs in its own goroutine; i is re-declared per iteration
+	// so that each closure sees its own value.
+	var g errgroup.Group
+	for i := 0; i < numCmds; i++ {
+		i := i
+		g.Go(func() error {
+			return run(i)
+		})
+	}
+
+	if err := g.Wait(); err != nil {
+		t.Fatal(err)
+	}
+
+	// We'd like to at least assert something about the MLAIs below, namely that
+	// the final view of the client state is equivalent to the MLAIs that were
+	// actually used by the proposals. To get there, we need to close out twice:
+	// once to flush the right side to the left, and another time to force it
+	// to be output.
+	for i := 0; i < 2; i++ {
+		if err := close(newNext(numCmds + i)); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	t.Log(tracker)
+
+	// Compare the last index handed out per range with the maximum the model
+	// client has aggregated from the maps returned by Close.
+	for rangeID, addr := range mc.lai {
+		assignedMLAI := atomic.LoadInt64(addr)
+		mlai := mc.mu.m[rangeID]
+
+		if assignedMLAI > mlai {
+			t.Errorf("r%d: assigned %d, but only %d reflected in final MLAI map", rangeID, assignedMLAI, mlai)
+		}
+	}
+}