Skip to content

Commit 9d46451

Browse files
committed
storage: return one entry less in Entries
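This works around the bug outlined in etcd-io/etcd#10063 by matching Raft's internal implementation of commit pagination: Entries now stops before the entry that would push the total size past maxBytes (while still always returning at least one entry), rather than returning the entry that crosses the limit. Once the above PR lands, we can revert this commit (though I assume that will take a little while), and I think we should do that, because this change has not made the code any nicer to look at. Fixes cockroachdb#28918. Release note: None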
1 parent 3e62ce0 commit 9d46451

6 files changed

+71
-39
lines changed

pkg/storage/entry_cache.go

+16-11
Original file line numberDiff line numberDiff line change
@@ -173,15 +173,16 @@ func (rec *raftEntryCache) getTerm(rangeID roachpb.RangeID, index uint64) (uint6
173173
// getEntries returns entries between [lo, hi) for specified range.
174174
// If any entries are returned for the specified indexes, they will
175175
// start with index lo and proceed sequentially without gaps until
176-
// 1) all entries exclusive of hi are fetched, 2) > maxBytes of
177-
// entries data is fetched, or 3) a cache miss occurs.
176+
// 1) all entries exclusive of hi are fetched, 2) fetching another entry
177+
// would add up to more than maxBytes of data, or 3) a cache miss occurs.
178+
// The returned size reflects the size of the returned entries.
178179
func (rec *raftEntryCache) getEntries(
179180
ents []raftpb.Entry, rangeID roachpb.RangeID, lo, hi, maxBytes uint64,
180-
) ([]raftpb.Entry, uint64, uint64) {
181+
) (_ []raftpb.Entry, size uint64, nextIndex uint64, exceededMaxBytes bool) {
181182
rec.RLock()
182183
defer rec.RUnlock()
183184
var bytes uint64
184-
nextIndex := lo
185+
nextIndex = lo
185186

186187
fromKey := entryCacheKey{RangeID: rangeID, Index: lo}
187188
toKey := entryCacheKey{RangeID: rangeID, Index: hi}
@@ -191,16 +192,20 @@ func (rec *raftEntryCache) getEntries(
191192
return true
192193
}
193194
ent := v.(*raftpb.Entry)
194-
ents = append(ents, *ent)
195-
bytes += uint64(ent.Size())
196-
nextIndex++
197-
if maxBytes > 0 && bytes > maxBytes {
198-
return true
195+
size := uint64(ent.Size())
196+
if bytes+size > maxBytes {
197+
exceededMaxBytes = true
198+
if len(ents) > 0 {
199+
return true
200+
}
199201
}
200-
return false
202+
nextIndex++
203+
bytes += size
204+
ents = append(ents, *ent)
205+
return exceededMaxBytes
201206
}, &fromKey, &toKey)
202207

203-
return ents, bytes, nextIndex
208+
return ents, bytes, nextIndex, exceededMaxBytes
204209
}
205210

206211
// delEntries deletes entries between [lo, hi) for specified range.

pkg/storage/entry_cache_test.go

+9-6
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package storage
1616

1717
import (
18+
"math"
1819
"reflect"
1920
"testing"
2021

@@ -24,6 +25,8 @@ import (
2425
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
2526
)
2627

28+
const noLimit = math.MaxUint64
29+
2730
func newEntry(index, size uint64) raftpb.Entry {
2831
return raftpb.Entry{
2932
Index: index,
@@ -48,12 +51,12 @@ func verifyGet(
4851
expEnts []raftpb.Entry,
4952
expNextIndex uint64,
5053
) {
51-
ents, _, nextIndex := rec.getEntries(nil, rangeID, lo, hi, 0)
54+
ents, _, nextIndex, _ := rec.getEntries(nil, rangeID, lo, hi, noLimit)
5255
if !(len(expEnts) == 0 && len(ents) == 0) && !reflect.DeepEqual(expEnts, ents) {
5356
t.Fatalf("expected entries %+v; got %+v", expEnts, ents)
5457
}
5558
if nextIndex != expNextIndex {
56-
t.Fatalf("expected next index %d; got %d", nextIndex, expNextIndex)
59+
t.Fatalf("expected next index %d; got %d", expNextIndex, nextIndex)
5760
}
5861
for _, e := range ents {
5962
term, ok := rec.getTerm(rangeID, e.Index)
@@ -115,10 +118,10 @@ func TestEntryCacheClearTo(t *testing.T) {
115118
rec.addEntries(rangeID, []raftpb.Entry{newEntry(2, 1)})
116119
rec.addEntries(rangeID, []raftpb.Entry{newEntry(20, 1), newEntry(21, 1)})
117120
rec.clearTo(rangeID, 21)
118-
if ents, _, _ := rec.getEntries(nil, rangeID, 2, 21, 0); len(ents) != 0 {
121+
if ents, _, _, _ := rec.getEntries(nil, rangeID, 2, 21, noLimit); len(ents) != 0 {
119122
t.Errorf("expected no entries after clearTo")
120123
}
121-
if ents, _, _ := rec.getEntries(nil, rangeID, 21, 22, 0); len(ents) != 1 {
124+
if ents, _, _, _ := rec.getEntries(nil, rangeID, 21, 22, noLimit); len(ents) != 1 {
122125
t.Errorf("expected entry 22 to remain in the cache clearTo")
123126
}
124127
}
@@ -128,13 +131,13 @@ func TestEntryCacheEviction(t *testing.T) {
128131
rangeID := roachpb.RangeID(1)
129132
rec := newRaftEntryCache(100)
130133
rec.addEntries(rangeID, []raftpb.Entry{newEntry(1, 40), newEntry(2, 40)})
131-
ents, _, hi := rec.getEntries(nil, rangeID, 1, 3, 0)
134+
ents, _, hi, _ := rec.getEntries(nil, rangeID, 1, 3, noLimit)
132135
if len(ents) != 2 || hi != 3 {
133136
t.Errorf("expected both entries; got %+v, %d", ents, hi)
134137
}
135138
// Add another entry to evict first.
136139
rec.addEntries(rangeID, []raftpb.Entry{newEntry(3, 40)})
137-
ents, _, hi = rec.getEntries(nil, rangeID, 2, 4, 0)
140+
ents, _, hi, _ = rec.getEntries(nil, rangeID, 2, 4, noLimit)
138141
if len(ents) != 2 || hi != 4 {
139142
t.Errorf("expected only two entries; got %+v, %d", ents, hi)
140143
}

pkg/storage/replica_raftstorage.go

+11-8
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,7 @@ func (r *replicaRaftStorage) InitialState() (raftpb.HardState, raftpb.ConfState,
7777

7878
// Entries implements the raft.Storage interface. Note that maxBytes is advisory
7979
// and this method will always return at least one entry even if it exceeds
80-
// maxBytes. Passing maxBytes equal to zero disables size checking. Sideloaded
81-
// proposals count towards maxBytes with their payloads inlined.
80+
// maxBytes. Sideloaded proposals count towards maxBytes with their payloads inlined.
8281
func (r *replicaRaftStorage) Entries(lo, hi, maxBytes uint64) ([]raftpb.Entry, error) {
8382
readonly := r.store.Engine().NewReadOnly()
8483
defer readonly.Close()
@@ -115,10 +114,11 @@ func entries(
115114
}
116115
ents := make([]raftpb.Entry, 0, n)
117116

118-
ents, size, hitIndex := eCache.getEntries(ents, rangeID, lo, hi, maxBytes)
117+
ents, size, hitIndex, exceededMaxBytes := eCache.getEntries(ents, rangeID, lo, hi, maxBytes)
118+
119119
// Return results if the correct number of results came back or if
120120
// we ran into the max bytes limit.
121-
if uint64(len(ents)) == hi-lo || (maxBytes > 0 && size > maxBytes) {
121+
if uint64(len(ents)) == hi-lo || exceededMaxBytes {
122122
return ents, nil
123123
}
124124

@@ -131,7 +131,6 @@ func entries(
131131
canCache := true
132132

133133
var ent raftpb.Entry
134-
exceededMaxBytes := false
135134
scanFunc := func(kv roachpb.KeyValue) (bool, error) {
136135
if err := kv.Value.GetProto(&ent); err != nil {
137136
return false, err
@@ -159,9 +158,13 @@ func entries(
159158

160159
// Note that we track the size of proposals with payloads inlined.
161160
size += uint64(ent.Size())
162-
161+
if size > maxBytes {
162+
exceededMaxBytes = true
163+
if len(ents) > 0 {
164+
return exceededMaxBytes, nil
165+
}
166+
}
163167
ents = append(ents, ent)
164-
exceededMaxBytes = maxBytes > 0 && size > maxBytes
165168
return exceededMaxBytes, nil
166169
}
167170

@@ -275,7 +278,7 @@ func term(
275278
) (uint64, error) {
276279
// entries() accepts a `nil` sideloaded storage and will skip inlining of
277280
// sideloaded entries. We only need the term, so this is what we do.
278-
ents, err := entries(ctx, rsl, eng, rangeID, eCache, nil /* sideloaded */, i, i+1, 0)
281+
ents, err := entries(ctx, rsl, eng, rangeID, eCache, nil /* sideloaded */, i, i+1, math.MaxUint64 /* maxBytes */)
279282
if err == raft.ErrCompacted {
280283
ts, err := rsl.LoadTruncatedState(ctx, eng)
281284
if err != nil {

pkg/storage/replica_sideload.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ func maybeInlineSideloadedRaftCommand(
185185
// We could unmarshal this yet again, but if it's committed we
186186
// are very likely to have appended it recently, in which case
187187
// we can save work.
188-
cachedSingleton, _, _ := entryCache.getEntries(
188+
cachedSingleton, _, _, _ := entryCache.getEntries(
189189
nil, rangeID, ent.Index, ent.Index+1, 1<<20,
190190
)
191191

pkg/storage/replica_sideload_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -849,7 +849,7 @@ func TestRaftSSTableSideloadingSnapshot(t *testing.T) {
849849
if len(entries) != 1 {
850850
t.Fatalf("no or too many entries returned from cache: %+v", entries)
851851
}
852-
ents, _, _ := tc.store.raftEntryCache.getEntries(nil, tc.repl.RangeID, sideloadedIndex, sideloadedIndex+1, 1<<20)
852+
ents, _, _, _ := tc.store.raftEntryCache.getEntries(nil, tc.repl.RangeID, sideloadedIndex, sideloadedIndex+1, 1<<20)
853853
if withSS {
854854
// We passed the sideload storage, so we expect to get our
855855
// inlined index back from the cache.

pkg/storage/replica_test.go

+33-12
Original file line numberDiff line numberDiff line change
@@ -7195,8 +7195,8 @@ func TestEntries(t *testing.T) {
71957195
}},
71967196
// Case 5: Get a single entry from cache.
71977197
{lo: indexes[5], hi: indexes[6], expResultCount: 1, expCacheCount: 1, setup: nil},
7198-
// Case 6: Use MaxUint64 instead of 0 for maxBytes.
7199-
{lo: indexes[5], hi: indexes[9], maxBytes: math.MaxUint64, expResultCount: 4, expCacheCount: 4, setup: nil},
7198+
// Case 6: Get range without size limitation. (Like case 4, without truncating).
7199+
{lo: indexes[5], hi: indexes[9], expResultCount: 4, expCacheCount: 4, setup: nil},
72007200
// Case 7: maxBytes is set low so only a single value should be
72017201
// returned.
72027202
{lo: indexes[5], hi: indexes[9], maxBytes: 1, expResultCount: 1, expCacheCount: 1, setup: nil},
@@ -7242,7 +7242,10 @@ func TestEntries(t *testing.T) {
72427242
if tc.setup != nil {
72437243
tc.setup()
72447244
}
7245-
cacheEntries, _, _ := repl.store.raftEntryCache.getEntries(nil, rangeID, tc.lo, tc.hi, tc.maxBytes)
7245+
if tc.maxBytes == 0 {
7246+
tc.maxBytes = math.MaxUint64
7247+
}
7248+
cacheEntries, _, _, hitLimit := repl.store.raftEntryCache.getEntries(nil, rangeID, tc.lo, tc.hi, tc.maxBytes)
72467249
if len(cacheEntries) != tc.expCacheCount {
72477250
t.Errorf("%d: expected cache count %d, got %d", i, tc.expCacheCount, len(cacheEntries))
72487251
}
@@ -7258,12 +7261,17 @@ func TestEntries(t *testing.T) {
72587261
}
72597262
if len(ents) != tc.expResultCount {
72607263
t.Errorf("%d: expected %d entries, got %d", i, tc.expResultCount, len(ents))
7264+
} else if tc.expResultCount > 0 {
7265+
expHitLimit := ents[len(ents)-1].Index < tc.hi-1
7266+
if hitLimit != expHitLimit {
7267+
t.Errorf("%d: unexpected hit limit: %t", i, hitLimit)
7268+
}
72617269
}
72627270
}
72637271

72647272
// Case 23: Lo must be less than or equal to hi.
72657273
repl.mu.Lock()
7266-
if _, err := repl.raftEntriesLocked(indexes[9], indexes[5], 0); err == nil {
7274+
if _, err := repl.raftEntriesLocked(indexes[9], indexes[5], math.MaxUint64); err == nil {
72677275
t.Errorf("23: error expected, got none")
72687276
}
72697277
repl.mu.Unlock()
@@ -7276,21 +7284,34 @@ func TestEntries(t *testing.T) {
72767284

72777285
repl.mu.Lock()
72787286
defer repl.mu.Unlock()
7279-
if _, err := repl.raftEntriesLocked(indexes[5], indexes[9], 0); err == nil {
7287+
if _, err := repl.raftEntriesLocked(indexes[5], indexes[9], math.MaxUint64); err == nil {
72807288
t.Errorf("24: error expected, got none")
72817289
}
72827290

7283-
// Case 25: don't hit the gap due to maxBytes.
7284-
ents, err := repl.raftEntriesLocked(indexes[5], indexes[9], 1)
7285-
if err != nil {
7286-
t.Errorf("25: expected no error, got %s", err)
7291+
// Case 25a: don't hit the gap due to maxBytes, cache populated.
7292+
{
7293+
ents, err := repl.raftEntriesLocked(indexes[5], indexes[9], 1)
7294+
if err != nil {
7295+
t.Errorf("25: expected no error, got %s", err)
7296+
}
7297+
if len(ents) != 1 {
7298+
t.Errorf("25: expected 1 entry, got %d", len(ents))
7299+
}
72877300
}
7288-
if len(ents) != 1 {
7289-
t.Errorf("25: expected 1 entry, got %d", len(ents))
7301+
// Case 25b: don't hit the gap due to maxBytes, cache cleared.
7302+
{
7303+
repl.store.raftEntryCache.delEntries(rangeID, indexes[5], indexes[5]+1)
7304+
ents, err := repl.raftEntriesLocked(indexes[5], indexes[9], 1)
7305+
if err != nil {
7306+
t.Errorf("25: expected no error, got %s", err)
7307+
}
7308+
if len(ents) != 1 {
7309+
t.Errorf("25: expected 1 entry, got %d", len(ents))
7310+
}
72907311
}
72917312

72927313
// Case 26: don't hit the gap due to truncation.
7293-
if _, err := repl.raftEntriesLocked(indexes[4], indexes[9], 0); err != raft.ErrCompacted {
7314+
if _, err := repl.raftEntriesLocked(indexes[4], indexes[9], math.MaxUint64); err != raft.ErrCompacted {
72947315
t.Errorf("26: expected error %s , got %s", raft.ErrCompacted, err)
72957316
}
72967317
}

0 commit comments

Comments
 (0)