Skip to content

Commit

Permalink
(wip)raft: add comments, update logging
Browse files Browse the repository at this point in the history
References: #136296
Epic: None
Release note: None
  • Loading branch information
hakuuww committed Mar 1, 2025
1 parent 9876826 commit 32382cc
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 70 deletions.
10 changes: 4 additions & 6 deletions pkg/kv/kvserver/logstore/logstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,8 +386,8 @@ func (s *LogStore) storeEntriesAndCommitBatch(
}
}

// Update raft log entry cache. We clear any older, uncommitted log entries
// and cache the latest ones.
// Update both the term cache and raft log entry cache.
// We clear any older, uncommitted log entries and cache the latest ones.
//
// In the blocking log sync case, these entries are already durable. In the
// non-blocking case, these entries have been written to the pebble engine (so
Expand All @@ -397,10 +397,8 @@ func (s *LogStore) storeEntriesAndCommitBatch(
// splitting its log into an unstable portion for entries that are not known
// to be durable and a stable portion for entries that are known to be
// durable.
s.TermCache.ScanAppend(ctx, m.Entries)
s.EntryCache.Add(s.RangeID, m.Entries, true /* truncate */)
// lock Replica.mu here
_ = s.TermCache.ScanAppend(m.Entries, true /* truncate */)
// unlock it

return state, nil
}
Expand Down Expand Up @@ -630,7 +628,7 @@ func LoadTerm(
metrics Metrics,
) (kvpb.RaftTerm, error) {
metrics.TermCacheAccesses.Inc(1)
term, err := tc.Term(uint64(index))
term, err := tc.Term(ctx, uint64(index))
if err == nil {
// found
metrics.TermCacheHits.Inc(1)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_application_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ func (r *Replica) handleTruncatedStateResult(
// Clear any entries in the Raft log entry cache for this range up
// to and including the most recently truncated index.
r.store.raftEntryCache.Clear(r.RangeID, t.Index+1)
_ = r.raftMu.logStorage.TermCache.ClearTo(uint64(t.Index + 1))
r.raftMu.logStorage.TermCache.ClearTo(uint64(t.Index + 1))

// Truncate the sideloaded storage. This is safe only if the new truncated
// state is durably stored on disk, i.e. synced.
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,8 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
return stats, errors.Wrap(err, "while applying snapshot")
}

// The snapshot was applied successfully; update the term cache to reflect
// the new state.
r.raftMu.logStorage.TermCache.ResetWithFirst(snap.Metadata.Term, snap.Metadata.Index)

for _, msg := range app.Responses {
Expand Down
2 changes: 2 additions & 0 deletions pkg/raft/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/kv/kvpb",
"//pkg/raft/confchange",
"//pkg/raft/quorum",
"//pkg/raft/raftlogger",
"//pkg/raft/raftpb",
"//pkg/raft/raftstoreliveness",
"//pkg/raft/tracker",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
Expand Down
131 changes: 68 additions & 63 deletions pkg/raft/termCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,64 @@
package raft

import (
"fmt"
"context"

pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)

type TermCache struct {
cache []entryID
// lastIndex is the last index known to the TermCache that is in the raftLog
// lastIndex is the last index known to the TermCache that is in the raftLog.
// the entry at lastIndex has the same term as TermCache.cache's
// last element's term
// lastIndex == 0 means TermCache is empty
// last entry's term.
//
// lastIndex is set to 0 when TermCache is created.
//
// lastIndex is first updated with the storage engine's
// term: r.shMu.lastTermNotDurable and index: r.shMu.lastIndexNotDurable
// as the first entry. This entry is the last entry in the raftLog's
// persisted part.
//
// In the special case where the raftLog is completely empty
// (which can happen when the database is deployed for the first time with no data),
// TruncatedState == LastEntryID
// (TruncatedState represents the highest entry applied to the state machine).
//
//
// lastIndex is updated when entries are appended to the TermCache.
//
// Invariants (except for on the creation of the TermCache):
//
// For callers, lastIndex is always populated (lastIndex >= 0),
// and there is always at least one entry being covered in the termCache.
//
// lastIndex mirrors the last index in the storage part of raftLog
lastIndex uint64
// max size of the term cache slice
maxSize uint64
// mu protects the term cache from concurrent access, which can happen,
// for example, when the following are called concurrently:
// logstore.LoadTerm() in replicaLogStorage.termLocked(),
// and ScanAppend() in logstore/storeEntriesAndCommitBatch() concurrently.
// - logstore.LoadTerm() in replicaLogStorage.termLocked(),
// - ScanAppend() in logstore/storeEntriesAndCommitBatch().
// TODO(hakuuww): using an RWMutex is one way to guarantee safety.
// But maybe we can just lock replica.mu externally when calling ScanAppend()
// and avoid using locks when calling LoadTerm().
mu syncutil.RWMutex
}

// ErrInvalidEntryID is returned when the supplied entryID
// is invalid for the operation.
var ErrInvalidEntryID = errors.New("invalid entry ID")

// ErrUnavailableInTermCache is returned when the term is unavailable in cache.
// It can potentially still be found in a lower level cache (raft entry cache)
var ErrUnavailableInTermCache = errors.New("term not available")

// ErrTermCacheEmpty is returned when the term cache is empty
var ErrTermCacheEmpty = errors.New("termCache is empty")
var ErrTermCacheEmpty = errors.New("term cache is empty")

// errInvalidEntryID is returned when the supplied entryID
// is invalid for the operation. Indicates inconsistent log entries.
var errInvalidEntryID = errors.New("invalid entry ID")

// NewTermCache initializes a TermCache with a fixed maxSize.
func NewTermCache(size uint64) *TermCache {
Expand All @@ -62,12 +87,13 @@ func NewTermCache(size uint64) *TermCache {
}

// Term returns the entry term based on the given entry index.
// Returns error if not in the termCache.
func (tc *TermCache) Term(index uint64) (term uint64, err error) {
// Returns error if not in the term cache.
func (tc *TermCache) Term(ctx context.Context, index uint64) (term uint64, err error) {
tc.mu.RLock()
defer tc.mu.RUnlock()

if len(tc.cache) == 0 {
log.Fatalf(ctx, "term cache is empty when calling Term(), should not happen")
return 0, ErrTermCacheEmpty
}

Expand All @@ -76,7 +102,7 @@ func (tc *TermCache) Term(index uint64) (term uint64, err error) {
return 0, ErrUnavailableInTermCache
}

// in last term of termCache, index <= tc.lastIndex
// in last term of term cache, index <= tc.lastIndex
if index >= tc.lastEntry().index {
return tc.lastEntry().term, nil
}
Expand All @@ -93,97 +119,95 @@ func (tc *TermCache) Term(index uint64) (term uint64, err error) {
// ClearTo clears entries from the TermCache with index strictly less than hi.
// If hi is above the lastIndex, the whole term cache is cleared.
// Mirrors the clearTo function in raftentry/cache.go
func (tc *TermCache) ClearTo(hi uint64) error {
func (tc *TermCache) ClearTo(hi uint64) {
tc.mu.Lock()
defer tc.mu.Unlock()

if len(tc.cache) == 0 || hi <= tc.firstEntry().index {
return nil
return
}

// special cases:
// keep the last entry in storage
if hi > tc.lastIndex {
tc.reset()
return nil
tc.resetWithFirstNoLock(tc.lastEntry().term, tc.lastIndex)
}

// hi is above last entry's index, but lower or equal to lastIndex
if hi > tc.lastEntry().index {
tc.resetWithFirstNoLock(tc.lastEntry().term, hi)
return nil
}

// only keep the last entry
if hi == tc.lastEntry().index {
tc.cache = tc.cache[len(tc.cache)-1:]
return nil
}

// general cases
for i := 0; i < len(tc.cache)-1; i++ {
// hi matches a term flip index
if hi == tc.cache[i].index {
tc.cache = tc.cache[i:]
return nil
return
}

// Allow the first entry in the termCache to not represent a term flip point
// Allow the first entry in the term cache to not represent a term flip point
// cache[0] only tells us entries are in term cache[0].term
// starting from cache[0].index up to
// min(cache[i+1].index-1 /* if not nil*/, lastIndex)
if hi > tc.cache[i].index && hi < tc.cache[i+1].index {
tc.cache[i].index = hi
tc.cache = tc.cache[i:]
return nil
return
}
}
return nil
return
}

// ScanAppend appends a list of raft entries to the TermCache
// It is the caller's responsibility to ensure entries is a valid raftLog.
func (tc *TermCache) ScanAppend(entries []pb.Entry, truncate bool) error {
func (tc *TermCache) ScanAppend(ctx context.Context, entries []pb.Entry) {
tc.mu.Lock()
defer tc.mu.Unlock()

if len(entries) == 0 {
return nil
log.Fatalf(ctx, "Term cache should not be empty when appending entries in batch: \n %v", entries)
return
}

if truncate {
truncIdx := entries[0].Index
_ = tc.truncateFrom(truncIdx)
}
// Reset the term cache accordingly if needed before appending.
truncIdx := entries[0].Index
tc.truncateFrom(truncIdx)

for _, ent := range entries {
if err := tc.append(entryID{ent.Term, ent.Index}); errors.Is(err, ErrInvalidEntryID) {
// This should never happen
panic("invalid entry ID in ScanAppend")
if err := tc.append(entryID{ent.Term, ent.Index}); errors.Is(err, errInvalidEntryID) {
// This should never happen.
log.Fatalf(ctx, "Invalid raftLog detected when trying to append entry %v to TermCache", ent.String())
}
}
return nil
}

// truncateFrom clears all entries from the termCache with index equal to or
// truncateFrom clears all entries from the term cache with index equal to or
// greater than lo. Note that lo itself may or may not be in the cache.
// If lo is lower than the first entry index, the whole term cache is cleared.
// No overwrite if lo is above the lastIndex.
// Mirrors the truncateFrom function in raftentry/cache.go
func (tc *TermCache) truncateFrom(lo uint64) error {
func (tc *TermCache) truncateFrom(lo uint64) {
if len(tc.cache) == 0 || lo > tc.lastIndex {
return nil
return
}

if lo <= tc.firstEntry().index {
tc.reset()
return nil
return
}

for i := len(tc.cache) - 1; i >= 0; i-- {
// lo is in between tc.cache[i].index and tc.cache[i+1].index
if lo > tc.cache[i].index {
tc.cache = tc.cache[:i+1]
tc.lastIndex = lo - 1
return nil
return
}
// lo matches a term flip index
if lo == tc.cache[i].index {
Expand All @@ -197,10 +221,10 @@ func (tc *TermCache) truncateFrom(lo uint64) error {
// invariant after above assignment:
// tc.lastIndex >= tc.cache[i-1].index
}
return nil
return
}
}
return nil
return
}

// append adds a new entryID to the cache.
Expand All @@ -216,7 +240,7 @@ func (tc *TermCache) append(newEntry entryID) error {
// the entry term should be increasing
if newEntry.index <= tc.lastIndex ||
newEntry.term < tc.lastEntry().term {
return ErrInvalidEntryID
return errInvalidEntryID
}

defer func() {
Expand All @@ -236,6 +260,7 @@ func (tc *TermCache) append(newEntry entryID) error {
}

tc.cache = append(tc.cache, newEntry)

return nil
}

Expand All @@ -261,7 +286,7 @@ func (tc *TermCache) ResetWithFirst(term uint64, index uint64) {
tc.append(entryID{term, index})
}

// resetWithFirstNoLock is like ResetWithFirst but does not hold the lock
// resetWithFirstNoLock is like ResetWithFirst but does not hold the lock.
func (tc *TermCache) resetWithFirstNoLock(term uint64, index uint64) {
tc.reset()
tc.append(entryID{term, index})
Expand All @@ -272,23 +297,3 @@ func (tc *TermCache) reset() {
tc.cache = tc.cache[:0]
tc.lastIndex = 0
}

// printTermCache dumps the cache to standard output on a single line: the
// literal "printTermCache", every cached entry rendered as "(index term)"
// inside square brackets, and finally the current lastIndex.
// It reads tc.cache and tc.lastIndex directly and does not acquire tc.mu.
func (tc *TermCache) printTermCache() {
	fmt.Print("printTermCache[")
	for i := range tc.cache {
		// fmt.Print separates two non-string operands with a single space,
		// which "%v %v" reproduces exactly.
		fmt.Printf(" (%v %v)", tc.cache[i].index, tc.cache[i].term)
	}
	fmt.Printf("] lastIndex:%v\n", tc.lastIndex)
}

// PrintEntries writes one line per entry to standard output, showing the
// entry's Term, Index and Type. The Data field is deliberately left out.
func PrintEntries(entries []pb.Entry) {
	for i := range entries {
		e := &entries[i]
		fmt.Printf("Term: %d, Index: %d, Type: %v\n", e.Term, e.Index, e.Type)
	}
}

0 comments on commit 32382cc

Please sign in to comment.