Skip to content

Commit 2536910

Browse files
committed
storage: return one entry less in Entries
This works around the bug outlined in: etcd-io/etcd#10063 by matching Raft's internal implementation of commit pagination. Once the above PR lands, we can revert this commit (but I assume that it will take a little bit), and I think we should do that because the code hasn't gotten any nicer to look at. Fixes cockroachdb#28918. Release note: None # # Commit message recommendations: # # --- # <pkg>: <short description> # # <long description> # # Release note (category): <release note description> # --- # # Wrap long lines! 72 columns is best. # # The release note must be present if your commit has # user-facing changes. Leave the default above if not. # # Categories for release notes: # - cli change # - sql change # - admin ui change # - general change (e.g., change of required Go version) # - build change (e.g., compatibility with older CPUs) # - enterprise change (e.g., change to backup/restore) # - backwards-incompatible change # - performance improvement # - bug fix # # Commit message recommendations: # # --- # <pkg>: <short description> # # <long description> # # Release note (category): <release note description> # --- # # Wrap long lines! 72 columns is best. # # The release note must be present if your commit has # user-facing changes. Leave the default above if not. # # Categories for release notes: # - cli change # - sql change # - admin ui change # - general change (e.g., change of required Go version) # - build change (e.g., compatibility with older CPUs) # - enterprise change (e.g., change to backup/restore) # - backwards-incompatible change # - performance improvement # - bug fix
1 parent f28c2de commit 2536910

6 files changed

+72
-39
lines changed

pkg/storage/entry_cache.go

+16-11
Original file line numberDiff line numberDiff line change
@@ -137,15 +137,16 @@ func (rec *raftEntryCache) getTerm(rangeID roachpb.RangeID, index uint64) (uint6
137137
// getEntries returns entries between [lo, hi) for specified range.
138138
// If any entries are returned for the specified indexes, they will
139139
// start with index lo and proceed sequentially without gaps until
140-
// 1) all entries exclusive of hi are fetched, 2) > maxBytes of
141-
// entries data is fetched, or 3) a cache miss occurs.
140+
// 1) all entries exclusive of hi are fetched, 2) fetching another entry
141+
// would add up to more than maxBytes of data, or 3) a cache miss occurs.
142+
// The returned size reflects the size of the returned entries.
142143
func (rec *raftEntryCache) getEntries(
143144
ents []raftpb.Entry, rangeID roachpb.RangeID, lo, hi, maxBytes uint64,
144-
) ([]raftpb.Entry, uint64, uint64) {
145+
) (_ []raftpb.Entry, size uint64, nextIndex uint64, exceededMaxBytes bool) {
145146
rec.Lock()
146147
defer rec.Unlock()
147148
var bytes uint64
148-
nextIndex := lo
149+
nextIndex = lo
149150

150151
rec.fromKey = entryCacheKey{RangeID: rangeID, Index: lo}
151152
rec.toKey = entryCacheKey{RangeID: rangeID, Index: hi}
@@ -155,16 +156,20 @@ func (rec *raftEntryCache) getEntries(
155156
return true
156157
}
157158
ent := v.(*raftpb.Entry)
158-
ents = append(ents, *ent)
159-
bytes += uint64(ent.Size())
160-
nextIndex++
161-
if maxBytes > 0 && bytes > maxBytes {
162-
return true
159+
size := uint64(ent.Size())
160+
if bytes+size > maxBytes {
161+
exceededMaxBytes = true
162+
if len(ents) > 0 {
163+
return true
164+
}
163165
}
164-
return false
166+
nextIndex++
167+
bytes += size
168+
ents = append(ents, *ent)
169+
return exceededMaxBytes
165170
}, &rec.fromKey, &rec.toKey)
166171

167-
return ents, bytes, nextIndex
172+
return ents, bytes, nextIndex, exceededMaxBytes
168173
}
169174

170175
// delEntries deletes entries between [lo, hi) for specified range.

pkg/storage/entry_cache_test.go

+9-6
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package storage
1616

1717
import (
18+
"math"
1819
"reflect"
1920
"testing"
2021

@@ -24,6 +25,8 @@ import (
2425
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
2526
)
2627

28+
const noLimit = math.MaxUint64
29+
2730
func newEntry(index, size uint64) raftpb.Entry {
2831
return raftpb.Entry{
2932
Index: index,
@@ -48,12 +51,12 @@ func verifyGet(
4851
expEnts []raftpb.Entry,
4952
expNextIndex uint64,
5053
) {
51-
ents, _, nextIndex := rec.getEntries(nil, rangeID, lo, hi, 0)
54+
ents, _, nextIndex, _ := rec.getEntries(nil, rangeID, lo, hi, noLimit)
5255
if !(len(expEnts) == 0 && len(ents) == 0) && !reflect.DeepEqual(expEnts, ents) {
5356
t.Fatalf("expected entries %+v; got %+v", expEnts, ents)
5457
}
5558
if nextIndex != expNextIndex {
56-
t.Fatalf("expected next index %d; got %d", nextIndex, expNextIndex)
59+
t.Fatalf("expected next index %d; got %d", expNextIndex, nextIndex)
5760
}
5861
for _, e := range ents {
5962
term, ok := rec.getTerm(rangeID, e.Index)
@@ -115,10 +118,10 @@ func TestEntryCacheClearTo(t *testing.T) {
115118
rec.addEntries(rangeID, []raftpb.Entry{newEntry(2, 1)})
116119
rec.addEntries(rangeID, []raftpb.Entry{newEntry(20, 1), newEntry(21, 1)})
117120
rec.clearTo(rangeID, 21)
118-
if ents, _, _ := rec.getEntries(nil, rangeID, 2, 21, 0); len(ents) != 0 {
121+
if ents, _, _, _ := rec.getEntries(nil, rangeID, 2, 21, noLimit); len(ents) != 0 {
119122
t.Errorf("expected no entries after clearTo")
120123
}
121-
if ents, _, _ := rec.getEntries(nil, rangeID, 21, 22, 0); len(ents) != 1 {
124+
if ents, _, _, _ := rec.getEntries(nil, rangeID, 21, 22, noLimit); len(ents) != 1 {
122125
t.Errorf("expected entry 22 to remain in the cache clearTo")
123126
}
124127
}
@@ -128,13 +131,13 @@ func TestEntryCacheEviction(t *testing.T) {
128131
rangeID := roachpb.RangeID(1)
129132
rec := newRaftEntryCache(100)
130133
rec.addEntries(rangeID, []raftpb.Entry{newEntry(1, 40), newEntry(2, 40)})
131-
ents, _, hi := rec.getEntries(nil, rangeID, 1, 3, 0)
134+
ents, _, hi, _ := rec.getEntries(nil, rangeID, 1, 3, noLimit)
132135
if len(ents) != 2 || hi != 3 {
133136
t.Errorf("expected both entries; got %+v, %d", ents, hi)
134137
}
135138
// Add another entry to evict first.
136139
rec.addEntries(rangeID, []raftpb.Entry{newEntry(3, 40)})
137-
ents, _, hi = rec.getEntries(nil, rangeID, 2, 4, 0)
140+
ents, _, hi, _ = rec.getEntries(nil, rangeID, 2, 4, noLimit)
138141
if len(ents) != 2 || hi != 4 {
139142
t.Errorf("expected only two entries; got %+v, %d", ents, hi)
140143
}

pkg/storage/replica_raftstorage.go

+12-8
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package storage
1717
import (
1818
"context"
1919
"fmt"
20+
"math"
2021
"time"
2122

2223
"github.com/coreos/etcd/raft"
@@ -76,8 +77,7 @@ func (r *replicaRaftStorage) InitialState() (raftpb.HardState, raftpb.ConfState,
7677

7778
// Entries implements the raft.Storage interface. Note that maxBytes is advisory
7879
// and this method will always return at least one entry even if it exceeds
79-
// maxBytes. Passing maxBytes equal to zero disables size checking. Sideloaded
80-
// proposals count towards maxBytes with their payloads inlined.
80+
// maxBytes. Sideloaded proposals count towards maxBytes with their payloads inlined.
8181
func (r *replicaRaftStorage) Entries(lo, hi, maxBytes uint64) ([]raftpb.Entry, error) {
8282
readonly := r.store.Engine().NewReadOnly()
8383
defer readonly.Close()
@@ -114,10 +114,11 @@ func entries(
114114
}
115115
ents := make([]raftpb.Entry, 0, n)
116116

117-
ents, size, hitIndex := eCache.getEntries(ents, rangeID, lo, hi, maxBytes)
117+
ents, size, hitIndex, exceededMaxBytes := eCache.getEntries(ents, rangeID, lo, hi, maxBytes)
118+
118119
// Return results if the correct number of results came back or if
119120
// we ran into the max bytes limit.
120-
if uint64(len(ents)) == hi-lo || (maxBytes > 0 && size > maxBytes) {
121+
if uint64(len(ents)) == hi-lo || exceededMaxBytes {
121122
return ents, nil
122123
}
123124

@@ -130,7 +131,6 @@ func entries(
130131
canCache := true
131132

132133
var ent raftpb.Entry
133-
exceededMaxBytes := false
134134
scanFunc := func(kv roachpb.KeyValue) (bool, error) {
135135
if err := kv.Value.GetProto(&ent); err != nil {
136136
return false, err
@@ -158,9 +158,13 @@ func entries(
158158

159159
// Note that we track the size of proposals with payloads inlined.
160160
size += uint64(ent.Size())
161-
161+
if size > maxBytes {
162+
exceededMaxBytes = true
163+
if len(ents) > 0 {
164+
return exceededMaxBytes, nil
165+
}
166+
}
162167
ents = append(ents, ent)
163-
exceededMaxBytes = maxBytes > 0 && size > maxBytes
164168
return exceededMaxBytes, nil
165169
}
166170

@@ -274,7 +278,7 @@ func term(
274278
) (uint64, error) {
275279
// entries() accepts a `nil` sideloaded storage and will skip inlining of
276280
// sideloaded entries. We only need the term, so this is what we do.
277-
ents, err := entries(ctx, rsl, eng, rangeID, eCache, nil /* sideloaded */, i, i+1, 0)
281+
ents, err := entries(ctx, rsl, eng, rangeID, eCache, nil /* sideloaded */, i, i+1, math.MaxUint64 /* maxBytes */)
278282
if err == raft.ErrCompacted {
279283
ts, err := rsl.LoadTruncatedState(ctx, eng)
280284
if err != nil {

pkg/storage/replica_sideload.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ func maybeInlineSideloadedRaftCommand(
185185
// We could unmarshal this yet again, but if it's committed we
186186
// are very likely to have appended it recently, in which case
187187
// we can save work.
188-
cachedSingleton, _, _ := entryCache.getEntries(
188+
cachedSingleton, _, _, _ := entryCache.getEntries(
189189
nil, rangeID, ent.Index, ent.Index+1, 1<<20,
190190
)
191191

pkg/storage/replica_sideload_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -823,7 +823,7 @@ func TestRaftSSTableSideloadingSnapshot(t *testing.T) {
823823
if len(entries) != 1 {
824824
t.Fatalf("no or too many entries returned from cache: %+v", entries)
825825
}
826-
ents, _, _ := tc.store.raftEntryCache.getEntries(nil, tc.repl.RangeID, sideloadedIndex, sideloadedIndex+1, 1<<20)
826+
ents, _, _, _ := tc.store.raftEntryCache.getEntries(nil, tc.repl.RangeID, sideloadedIndex, sideloadedIndex+1, 1<<20)
827827
if withSS {
828828
// We passed the sideload storage, so we expect to get our
829829
// inlined index back from the cache.

pkg/storage/replica_test.go

+33-12
Original file line numberDiff line numberDiff line change
@@ -6997,8 +6997,8 @@ func TestEntries(t *testing.T) {
69976997
}},
69986998
// Case 5: Get a single entry from cache.
69996999
{lo: indexes[5], hi: indexes[6], expResultCount: 1, expCacheCount: 1, setup: nil},
7000-
// Case 6: Use MaxUint64 instead of 0 for maxBytes.
7001-
{lo: indexes[5], hi: indexes[9], maxBytes: math.MaxUint64, expResultCount: 4, expCacheCount: 4, setup: nil},
7000+
// Case 6: Get range without size limitation. (Like case 4, without truncating).
7001+
{lo: indexes[5], hi: indexes[9], expResultCount: 4, expCacheCount: 4, setup: nil},
70027002
// Case 7: maxBytes is set low so only a single value should be
70037003
// returned.
70047004
{lo: indexes[5], hi: indexes[9], maxBytes: 1, expResultCount: 1, expCacheCount: 1, setup: nil},
@@ -7044,7 +7044,10 @@ func TestEntries(t *testing.T) {
70447044
if tc.setup != nil {
70457045
tc.setup()
70467046
}
7047-
cacheEntries, _, _ := repl.store.raftEntryCache.getEntries(nil, rangeID, tc.lo, tc.hi, tc.maxBytes)
7047+
if tc.maxBytes == 0 {
7048+
tc.maxBytes = math.MaxUint64
7049+
}
7050+
cacheEntries, _, _, hitLimit := repl.store.raftEntryCache.getEntries(nil, rangeID, tc.lo, tc.hi, tc.maxBytes)
70487051
if len(cacheEntries) != tc.expCacheCount {
70497052
t.Errorf("%d: expected cache count %d, got %d", i, tc.expCacheCount, len(cacheEntries))
70507053
}
@@ -7060,12 +7063,17 @@ func TestEntries(t *testing.T) {
70607063
}
70617064
if len(ents) != tc.expResultCount {
70627065
t.Errorf("%d: expected %d entries, got %d", i, tc.expResultCount, len(ents))
7066+
} else if tc.expResultCount > 0 {
7067+
expHitLimit := ents[len(ents)-1].Index < tc.hi-1
7068+
if hitLimit != expHitLimit {
7069+
t.Errorf("%d: unexpected hit limit: %t", i, hitLimit)
7070+
}
70637071
}
70647072
}
70657073

70667074
// Case 23: Lo must be less than or equal to hi.
70677075
repl.mu.Lock()
7068-
if _, err := repl.raftEntriesLocked(indexes[9], indexes[5], 0); err == nil {
7076+
if _, err := repl.raftEntriesLocked(indexes[9], indexes[5], math.MaxUint64); err == nil {
70697077
t.Errorf("23: error expected, got none")
70707078
}
70717079
repl.mu.Unlock()
@@ -7078,21 +7086,34 @@ func TestEntries(t *testing.T) {
70787086

70797087
repl.mu.Lock()
70807088
defer repl.mu.Unlock()
7081-
if _, err := repl.raftEntriesLocked(indexes[5], indexes[9], 0); err == nil {
7089+
if _, err := repl.raftEntriesLocked(indexes[5], indexes[9], math.MaxUint64); err == nil {
70827090
t.Errorf("24: error expected, got none")
70837091
}
70847092

7085-
// Case 25: don't hit the gap due to maxBytes.
7086-
ents, err := repl.raftEntriesLocked(indexes[5], indexes[9], 1)
7087-
if err != nil {
7088-
t.Errorf("25: expected no error, got %s", err)
7093+
// Case 25a: don't hit the gap due to maxBytes, cache populated.
7094+
{
7095+
ents, err := repl.raftEntriesLocked(indexes[5], indexes[9], 1)
7096+
if err != nil {
7097+
t.Errorf("25: expected no error, got %s", err)
7098+
}
7099+
if len(ents) != 1 {
7100+
t.Errorf("25: expected 1 entry, got %d", len(ents))
7101+
}
70897102
}
7090-
if len(ents) != 1 {
7091-
t.Errorf("25: expected 1 entry, got %d", len(ents))
7103+
// Case 25b: don't hit the gap due to maxBytes, cache cleared.
7104+
{
7105+
repl.store.raftEntryCache.delEntries(rangeID, indexes[5], indexes[5]+1)
7106+
ents, err := repl.raftEntriesLocked(indexes[5], indexes[9], 1)
7107+
if err != nil {
7108+
t.Errorf("25: expected no error, got %s", err)
7109+
}
7110+
if len(ents) != 1 {
7111+
t.Errorf("25: expected 1 entry, got %d", len(ents))
7112+
}
70927113
}
70937114

70947115
// Case 26: don't hit the gap due to truncation.
7095-
if _, err := repl.raftEntriesLocked(indexes[4], indexes[9], 0); err != raft.ErrCompacted {
7116+
if _, err := repl.raftEntriesLocked(indexes[4], indexes[9], math.MaxUint64); err != raft.ErrCompacted {
70967117
t.Errorf("26: expected error %s , got %s", raft.ErrCompacted, err)
70977118
}
70987119
}

0 commit comments

Comments
 (0)