Skip to content

Commit

Permalink
Merge #142016
Browse files Browse the repository at this point in the history
142016: logstore: load last entry term r=tbg a=pav-kv

We already [read](https://github.com/cockroachdb/cockroach/blob/19290ae07afa1a296fca9eee9f77d2e9c60eb804/pkg/raft/log.go#L102-L106) the last entry term when initializing `RawNode`, so there is no additional overhead, except the fact that this entry is now not stashed into the raft entry cache. There appears to be low benefit in putting it into the cache though. Typically, we wouldn't need exactly the `LastIndex` entry post startup, it would more likely be an earlier entry or a future one. So putting `LastIndex` into the cache early on actually makes it ineffective because the cache [drops](https://github.com/cockroachdb/cockroach/blob/19290ae07afa1a296fca9eee9f77d2e9c60eb804/pkg/kv/kvserver/raftentry/ring_buffer.go#L207-L209) entries below the cached slice.

Eventually, on startup we will load the term cache instead of the last entry.

As a nice side-effect, this PR removes the `invalidLastTerm` oddity. It also introduces an invariant that `Replica` always knows the last raft log entry index/term. A panic [here](https://github.com/cockroachdb/cockroach/blob/19290ae07afa1a296fca9eee9f77d2e9c60eb804/pkg/raft/log.go#L102-L106) can be removed too, with an API tweak (not doing here).

Resolves #142013

Co-authored-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
craig[bot] and pav-kv committed Feb 28, 2025
2 parents 154f10a + 5853627 commit 56b4bff
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 54 deletions.
7 changes: 5 additions & 2 deletions pkg/kv/kvserver/client_manual_proposal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,12 @@ LIMIT
require.NoError(t, err)
defer it.Close()
rsl := logstore.NewStateLoader(rangeID)
lastIndex, err := rsl.LoadLastIndex(ctx, eng)
ts, err := rsl.LoadRaftTruncatedState(ctx, eng)
require.NoError(t, err)
t.Logf("loaded LastIndex: %d", lastIndex)
lastEntryID, err := rsl.LoadLastEntryID(ctx, eng, ts)
require.NoError(t, err)
t.Logf("loaded LastEntryID: %+v", lastEntryID)
lastIndex := lastEntryID.Index
ok, err := it.SeekGE(lastIndex)
require.NoError(t, err)
require.True(t, ok)
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/kvstorage/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,10 +405,10 @@ func (r Replica) Load(
}
sl := stateloader.Make(r.Desc.RangeID)
var err error
if ls.LastIndex, err = sl.LoadLastIndex(ctx, eng); err != nil {
if ls.TruncState, err = sl.LoadRaftTruncatedState(ctx, eng); err != nil {
return LoadedReplicaState{}, err
}
if ls.TruncState, err = sl.LoadRaftTruncatedState(ctx, eng); err != nil {
if ls.LastEntryID, err = sl.LoadLastEntryID(ctx, eng, ls.TruncState); err != nil {
return LoadedReplicaState{}, err
}
if ls.ReplState, err = sl.Load(ctx, eng, r.Desc); err != nil {
Expand Down
11 changes: 6 additions & 5 deletions pkg/kv/kvserver/kvstorage/replica_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -23,10 +24,10 @@ import (
// is used to initialize the in-memory Replica instance.
// TODO(pavelkalinnikov): integrate with kvstorage.Replica.
type LoadedReplicaState struct {
ReplicaID roachpb.ReplicaID
LastIndex kvpb.RaftIndex
ReplState kvserverpb.ReplicaState
TruncState kvserverpb.RaftTruncatedState
ReplicaID roachpb.ReplicaID
LastEntryID logstore.EntryID
ReplState kvserverpb.ReplicaState
TruncState kvserverpb.RaftTruncatedState

hardState raftpb.HardState
}
Expand Down Expand Up @@ -59,7 +60,7 @@ func LoadReplicaState(
if ls.TruncState, err = sl.LoadRaftTruncatedState(ctx, eng); err != nil {
return LoadedReplicaState{}, err
}
if ls.LastIndex, err = sl.LoadLastIndex(ctx, eng); err != nil {
if ls.LastEntryID, err = sl.LoadLastEntryID(ctx, eng, ls.TruncState); err != nil {
return LoadedReplicaState{}, err
}
if ls.ReplState, err = sl.Load(ctx, eng, desc); err != nil {
Expand Down
43 changes: 29 additions & 14 deletions pkg/kv/kvserver/logstore/stateloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
Expand Down Expand Up @@ -47,21 +48,30 @@ func NewStateLoader(rangeID roachpb.RangeID) StateLoader {
}
}

// LoadLastIndex loads the last index.
func (sl StateLoader) LoadLastIndex(
ctx context.Context, reader storage.Reader,
) (kvpb.RaftIndex, error) {
// EntryID is an (index, term) pair identifying a raft log entry.
//
// TODO(pav-kv): should be the other way around - RaftTruncatedState is an
// EntryID.
type EntryID = kvserverpb.RaftTruncatedState

// LoadLastEntryID loads the ID of the last entry in the raft log. Returns the
// passed in RaftTruncatedState if the log has no entries. RaftTruncatedState
// must have been just read, or otherwise exist in memory and be consistent with
// the content of the log.
func (sl StateLoader) LoadLastEntryID(
ctx context.Context, reader storage.Reader, ts kvserverpb.RaftTruncatedState,
) (EntryID, error) {
prefix := sl.RaftLogPrefix()
// NB: raft log has no intents.
iter, err := reader.NewMVCCIterator(
ctx, storage.MVCCKeyIterKind, storage.IterOptions{
LowerBound: prefix, ReadCategory: fs.ReplicationReadCategory})
if err != nil {
return 0, err
return EntryID{}, err
}
defer iter.Close()

var lastIndex kvpb.RaftIndex
var last EntryID
iter.SeekLT(storage.MakeMVCCMetadataKey(keys.RaftLogKeyFromPrefix(prefix, math.MaxUint64)))
if ok, _ := iter.Valid(); ok {
key := iter.UnsafeKey().Key
Expand All @@ -70,22 +80,27 @@ func (sl StateLoader) LoadLastIndex(
}
suffix := key[len(prefix):]
var err error
lastIndex, err = keys.DecodeRaftLogKeyFromSuffix(suffix)
last.Index, err = keys.DecodeRaftLogKeyFromSuffix(suffix)
if err != nil {
log.Fatalf(ctx, "unable to decode Raft log index key: %s; %v", key.String(), err)
}
v, err := iter.UnsafeValue()
if err != nil {
log.Fatalf(ctx, "unable to read Raft log entry %d (%s): %v", last.Index, key.String(), err)
}
entry, err := raftlog.RaftEntryFromRawValue(v)
if err != nil {
log.Fatalf(ctx, "unable to decode Raft log entry %d (%s): %v", last.Index, key.String(), err)
}
last.Term = kvpb.RaftTerm(entry.Term)
}

if lastIndex == 0 {
if last.Index == 0 {
// The log is empty, which means we are either starting from scratch
// or the entire log has been truncated away.
lastEnt, err := sl.LoadRaftTruncatedState(ctx, reader)
if err != nil {
return 0, err
}
lastIndex = lastEnt.Index
return ts, nil
}
return lastIndex, nil
return last, nil
}

// LoadRaftTruncatedState loads the truncated state.
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/raftlog/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,11 @@ func NewEntryFromRawValue(b []byte) (*Entry, error) {
return e, nil
}

// raftEntryFromRawValue decodes a raft.Entry from a raw MVCC value.
// RaftEntryFromRawValue decodes a raft.Entry from a raw MVCC value.
//
// Same as NewEntryFromRawValue, but doesn't decode the command and doesn't use
// the pool of entries.
func raftEntryFromRawValue(b []byte) (raftpb.Entry, error) {
func RaftEntryFromRawValue(b []byte) (raftpb.Entry, error) {
var meta enginepb.MVCCMetadata
if err := protoutil.Unmarshal(b, &meta); err != nil {
return raftpb.Entry{}, errors.Wrap(err, "decoding raft log MVCCMetadata")
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/raftlog/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (it *Iterator) load() (bool, error) {
if err != nil {
return false, err
}
if it.entry, err = raftEntryFromRawValue(v); err != nil {
if it.entry, err = RaftEntryFromRawValue(v); err != nil {
return false, err
}
return true, nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,8 @@ func (r *Replica) initRaftMuLockedReplicaMuLocked(s kvstorage.LoadedReplicaState
r.flowControlV2.ForceFlushIndexChangedLocked(context.TODO(), r.shMu.state.ForceFlushIndex.Index)
}
r.shMu.raftTruncState = s.TruncState
r.shMu.lastIndexNotDurable = s.LastIndex
r.shMu.lastTermNotDurable = invalidLastTerm
r.shMu.lastIndexNotDurable = s.LastEntryID.Index
r.shMu.lastTermNotDurable = s.LastEntryID.Term

// Initialize the Raft group. This may replace a Raft group that was installed
// for the uninitialized replica to process Raft requests or snapshots.
Expand Down
8 changes: 2 additions & 6 deletions pkg/kv/kvserver/replica_raftlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,7 @@ func (r *replicaLogStorage) Term(i uint64) (uint64, error) {
func (r *replicaLogStorage) termLocked(i kvpb.RaftIndex) (kvpb.RaftTerm, error) {
// TODO(pav-kv): make it possible to read with only raftMu held.
r.mu.AssertHeld()
// TODO(nvanbenschoten): should we set r.mu.lastTermNotDurable when
// r.mu.lastIndexNotDurable == i && r.mu.lastTermNotDurable == invalidLastTerm?
// TODO(pav-kv): we should rather always remember the last entry term, and
// remove invalidLastTerm special case.
if r.shMu.lastIndexNotDurable == i && r.shMu.lastTermNotDurable != invalidLastTerm {
if r.shMu.lastIndexNotDurable == i {
return r.shMu.lastTermNotDurable, nil
}
return logstore.LoadTerm(r.AnnotateCtx(context.TODO()),
Expand Down Expand Up @@ -248,7 +244,7 @@ func (r *replicaRaftMuLogSnap) termRaftMuLocked(i kvpb.RaftIndex) (kvpb.RaftTerm
r.raftMu.AssertHeld()
// NB: the r.mu fields accessed here are always written under both r.raftMu
// and r.mu, and the reads are safe under r.raftMu.
if r.shMu.lastIndexNotDurable == i && r.shMu.lastTermNotDurable != invalidLastTerm {
if r.shMu.lastIndexNotDurable == i {
return r.shMu.lastTermNotDurable, nil
}
return logstore.LoadTerm(r.AnnotateCtx(context.TODO()),
Expand Down
24 changes: 4 additions & 20 deletions pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@ import (
"github.com/cockroachdb/redact"
)

// invalidLastTerm is an out-of-band value for r.mu.lastTermNotDurable that
// invalidates lastTermNotDurable caching and forces retrieval of
// Term(lastIndexNotDurable) from the raftEntryCache/Pebble.
const invalidLastTerm = 0

var snapshotIngestAsWriteThreshold = settings.RegisterByteSizeSetting(
settings.SystemOnly,
"kv.snapshot.ingest_as_write_threshold",
Expand Down Expand Up @@ -733,22 +728,11 @@ func (r *Replica) applySnapshot(
// without risking a lock-ordering deadlock.
r.store.mu.Unlock()

// We set the persisted last index to the last applied index. This is
// not a correctness issue, but means that we may have just transferred
// some entries we're about to re-request from the leader and overwrite.
// However, raft.MultiNode currently expects this behavior, and the
// performance implications are not likely to be drastic. If our
// feelings about this ever change, we can add a LastIndex field to
// raftpb.SnapshotMetadata.
// TODO(pav-kv): the above comment seems stale, and needs an update.
//
// TODO(sumeer): We should be able to set the last term to
// nonemptySnap.Metadata.Term. See
// https://github.com/cockroachdb/cockroach/pull/75675#pullrequestreview-867926687
// for a discussion regarding this.
// The log has been cleared and reset to start at the snapshot's applied
// index/term. Update the in-memory metadata accordingly.
r.asLogStorage().updateStateRaftMuLockedMuLocked(logstore.RaftState{
LastIndex: state.RaftAppliedIndex,
LastTerm: invalidLastTerm,
LastIndex: truncState.Index,
LastTerm: truncState.Term,
ByteSize: 0, // the log is empty now
})
r.shMu.raftTruncState = truncState
Expand Down

0 comments on commit 56b4bff

Please sign in to comment.