kvserver: document raft Storage mental model #131041

Merged 5 commits on Sep 24, 2024
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/logstore/sideload.go
@@ -56,8 +56,8 @@ type SideloadStorage interface {
// files that remain, or an error.
TruncateTo(_ context.Context, index kvpb.RaftIndex) (freed, retained int64, _ error)
// BytesIfTruncatedFromTo returns the number of bytes that would be freed,
// if one were to truncate [from, to). Additionally, it returns the the
// number of bytes that would be retained >= to.
// if one were to truncate [from, to). Additionally, it returns the number
// of bytes that would be retained >= to.
BytesIfTruncatedFromTo(_ context.Context, from kvpb.RaftIndex, to kvpb.RaftIndex) (freed, retained int64, _ error)
// Returns an absolute path to the file that Get() would return the contents
// of. Does not check whether the file actually exists.
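To make the [from, to) / >= to contract above concrete, here is a minimal, hypothetical in-memory sketch; the type and helper names are invented for illustration and are not part of SideloadStorage. It also shows why a [0, 0) truncation reports the total sideloaded size, which ComputeRaftLogSize in replica_raft.go below relies on.

```go
package main

import "fmt"

// toySideloaded maps raft log index -> size of the sideloaded payload at
// that index. It is a stand-in for illustration only, not the real
// SideloadStorage implementation.
type toySideloaded map[uint64]int64

// bytesIfTruncatedFromTo mirrors the contract described above: "freed" is
// the size of entries with index in [from, to), and "retained" is the size
// of entries with index >= to.
func (s toySideloaded) bytesIfTruncatedFromTo(from, to uint64) (freed, retained int64) {
	for idx, size := range s {
		switch {
		case idx >= from && idx < to:
			freed += size // would be removed by truncating [from, to)
		case idx >= to:
			retained += size // kept: index >= to
		}
	}
	return freed, retained
}

func main() {
	s := toySideloaded{10: 100, 11: 200, 12: 300}

	freed, retained := s.bytesIfTruncatedFromTo(10, 12)
	fmt.Println(freed, retained) // 300 300

	// Truncating the empty interval [0, 0) frees nothing, so "retained" is
	// the total size of all sideloaded files -- the trick ComputeRaftLogSize
	// uses below when summing up the raft log size.
	freed, retained = s.bytesIfTruncatedFromTo(0, 0)
	fmt.Println(freed, retained) // 0 600
}
```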
1 change: 0 additions & 1 deletion pkg/kv/kvserver/replica.go
@@ -316,7 +316,6 @@ type Replica struct {
// depending on which lock is being held.
stateLoader stateloader.StateLoader
// on-disk storage for sideloaded SSTables. Always non-nil.
// TODO(pavelkalinnikov): remove sideloaded == nil checks.
sideloaded logstore.SideloadStorage
// stateMachine is used to apply committed raft entries.
stateMachine replicaStateMachine
10 changes: 2 additions & 8 deletions pkg/kv/kvserver/replica_destroy.go
@@ -71,10 +71,6 @@ func (s destroyStatus) Removed() bool {
const mergedTombstoneReplicaID roachpb.ReplicaID = math.MaxInt32

func (r *Replica) postDestroyRaftMuLocked(ctx context.Context, ms enginepb.MVCCStats) error {
// NB: we need the nil check below because it's possible that we're GC'ing a
// Replica without a replicaID, in which case it does not have a sideloaded
// storage.
//
// TODO(tschottdorf): at node startup, we should remove all on-disk
// directories belonging to replicas which aren't present. A crash before a
// call to postDestroyRaftMuLocked will currently leave the files around
@@ -88,10 +84,8 @@ func (r *Replica) postDestroyRaftMuLocked(ctx context.Context, ms enginepb.MVCCS
// TODO(pavelkalinnikov): coming back in 2023, the above may still happen if:
// (1) state machine syncs, (2) OS crashes before (3) sideloaded was able to
// sync the files removal. The files should be cleaned up on restart.
if r.raftMu.sideloaded != nil {
if err := r.raftMu.sideloaded.Clear(ctx); err != nil {
return err
}
if err := r.raftMu.sideloaded.Clear(ctx); err != nil {
return err
}

// Release the reference to this tenant in metrics, we know the tenant ID is
16 changes: 5 additions & 11 deletions pkg/kv/kvserver/replica_raft.go
@@ -2841,8 +2841,6 @@ func handleTruncatedStateBelowRaftPreApply(
// storage engine. This will iterate over the Raft log and sideloaded files, so
// depending on the size of these it can be mildly to extremely expensive and
// thus should not be called frequently.
//
// The sideloaded storage may be nil, in which case it is treated as empty.
func ComputeRaftLogSize(
ctx context.Context,
rangeID roachpb.RangeID,
@@ -2855,15 +2853,11 @@ func ComputeRaftLogSize(
if err != nil {
return 0, err
}
var totalSideloaded int64
if sideloaded != nil {
var err error
// The remaining bytes if one were to truncate [0, 0) gives us the total
// number of bytes in sideloaded files.
_, totalSideloaded, err = sideloaded.BytesIfTruncatedFromTo(ctx, 0, 0)
if err != nil {
return 0, err
}
// The remaining bytes if one were to truncate [0, 0) gives us the total
// number of bytes in sideloaded files.
_, totalSideloaded, err := sideloaded.BytesIfTruncatedFromTo(ctx, 0, 0)
if err != nil {
return 0, err
}
return ms.SysBytes + totalSideloaded, nil
}
170 changes: 128 additions & 42 deletions pkg/kv/kvserver/replica_raftstorage.go
@@ -57,50 +57,107 @@ var snapshotIngestAsWriteThreshold = settings.RegisterByteSizeSetting(
}())

// replicaRaftStorage implements the raft.Storage interface.
//
// All mutating calls to raft.RawNode require that r.mu is held. All read-only
// calls to raft.RawNode require that r.mu is held at least for reads.
//
// All methods implementing raft.Storage are called from within, or on behalf of
// a RawNode. When called from within RawNode, r.mu is held necessarily (and
// maybe r.raftMu). Conceptually, r.mu only needs to be locked for reading, but
// implementation details may require an exclusive lock (see method comments).
// When called from outside RawNode (on behalf of a RawNode "snapshot"), the
// caller must hold r.raftMu and/or r.mu.
//
// RawNode has the in-memory "unstable" state which services most of its needs.
// Most RawNode.Step updates are completed in memory, while only holding r.mu.
//
// RawNode falls back to reading from Storage when it does not have the needed
// state in memory. For example, the leader may need to read log entries from
// storage to construct a log append request for a follower, or a follower may
// need to interact with its storage upon receiving such a request to check
// whether the appended log slice is consistent with raft rules.
//
// (1) RawNode guarantees that everything it reads from Storage has no in-flight
// writes. Raft always reads state that it knows to be stable (meaning it does
// not have pending writes) and, in some cases, also synced / durable. Storage
// acknowledges completed writes / syncs back to RawNode, under r.mu, so that
// RawNode can correctly implement this guarantee.
//
// (2) The content of raft.Storage is always mutated while holding r.raftMu,
// which is an un-contended "IO" mutex and is allowed to be held longer. Most
// writes are extracted from RawNode while holding r.raftMu and r.mu (in the
// Ready() loop), and handed over to storage under r.raftMu. There are a few
// cases when CRDB synthesizes the writes (e.g. during a range split / merge, or
// raft log truncations) under r.raftMu.
//
// The guarantees explain why holding only r.mu is sufficient for RawNode or its
// snapshot to be in a consistent state. Under r.mu, new writes are blocked,
// because of (2), and by (1) reads never conflict with the in-flight writes.
//
// However, r.mu is a widely used mutex, and not recommended for IO. When doing
// work on behalf of RawNode that involves IO (like constructing log appends for a
// follower), we would like to release r.mu. The two guarantees make it possible
// to observe a consistent RawNode snapshot while only holding r.raftMu.
//
// While both r.raftMu and r.mu are held, we can take a shallow / COW copy of
// the RawNode or its relevant subset (e.g. the raft log; the Ready struct is
// also considered such). A subsequent release of r.mu allows RawNode to resume
// making progress. The raft.Storage does not observe any new writes while
// r.raftMu is still held, by the guarantee (2). Combined with guarantee (1), it
// means that both the original and the snapshot RawNode remain consistent. The
// shallow copy represents a valid past state of the RawNode.
//
// TODO(pav-kv): the snapshotting with only r.raftMu held is not implemented,
// but should be done soon.
//
// All the implementation methods assume that the required locks are held, and
// don't acquire them. The specific locking requirements are noted in each
// method's comment. The method names do not follow our "Locked" naming
// conventions, due to being an implementation of raft.Storage interface from a
// different package.
//
// Many of the methods defined in this file are wrappers around static
// functions. This is done to facilitate their use from Replica.Snapshot(),
// where it is important that all the data that goes into the snapshot comes
// from a consistent view of the database, and not the replica's in-memory state
// or via a reference to Replica.store.Engine().
type replicaRaftStorage Replica

var _ raft.Storage = (*replicaRaftStorage)(nil)
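To make the two guarantees in the comment above more tangible, here is a self-contained sketch of the locking pattern it describes; every type and method name below is invented for illustration and does not correspond to the actual Replica / RawNode API.

```go
package main

import "sync"

// ready is a shallow copy of the state a RawNode wants written to storage.
type ready struct{ entries []string }

// rawNode stands in for raft.RawNode: its in-memory state is guarded by mu.
type rawNode struct{ pending []string }

func (rn *rawNode) readyCopy() ready { return ready{entries: rn.pending} } // shallow / COW copy
func (rn *rawNode) ackWritten()      { rn.pending = nil }                  // completion reported back under mu

type replica struct {
	raftMu sync.Mutex // "IO" mutex: all storage writes happen under it (guarantee 2)
	mu     sync.Mutex // in-memory mutex: guards the RawNode
	rn     rawNode
}

// handleRaftReady sketches the Ready() loop described above: extract a
// shallow copy under both mutexes, release mu so the RawNode can keep
// making in-memory progress, and do the storage IO under raftMu only.
func (r *replica) handleRaftReady(write func([]string)) {
	r.raftMu.Lock() // nobody else can mutate raft storage from here on
	defer r.raftMu.Unlock()

	r.mu.Lock()
	rd := r.rn.readyCopy()
	r.mu.Unlock() // RawNode may resume; storage still cannot change under raftMu.

	// Guarantee (1): raft never reads state with in-flight writes, so the
	// copy in rd stays a consistent past state of the RawNode while we do IO.
	write(rd.entries)

	r.mu.Lock()
	r.rn.ackWritten() // acknowledge the completed write back to the RawNode under mu
	r.mu.Unlock()
}

func main() {
	r := &replica{rn: rawNode{pending: []string{"entry-1", "entry-2"}}}
	r.handleRaftReady(func(ents []string) { _ = ents /* append + sync to the log engine */ })
}
```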

// All calls to raft.RawNode require that both Replica.raftMu and
// Replica.mu are held. All of the functions exposed via the
// raft.Storage interface will in turn be called from RawNode, so none
// of these methods may acquire either lock, but they may require
// their caller to hold one or both locks (even though they do not
// follow our "Locked" naming convention). Specific locking
// requirements (e.g. whether r.mu must be held for reading or writing)
// are noted in each method's comments.
//
// Many of the methods defined in this file are wrappers around static
// functions. This is done to facilitate their use from
// Replica.Snapshot(), where it is important that all the data that
// goes into the snapshot comes from a consistent view of the
// database, and not the replica's in-memory state or via a reference
// to Replica.store.Engine().

// InitialState implements the raft.Storage interface.
// InitialState requires that r.mu is held for writing because it requires
// exclusive access to r.mu.stateLoader.
func (r *replicaRaftStorage) InitialState() (raftpb.HardState, raftpb.ConfState, error) {
// The call must synchronize with raft IO. Called when raft is initialized
// under both r.raftMu and r.mu. We don't technically need r.mu here, but we
// know it is held.
r.raftMu.AssertHeld()
r.mu.AssertHeld()

ctx := r.AnnotateCtx(context.TODO())
hs, err := r.mu.stateLoader.LoadHardState(ctx, r.store.TODOEngine())
// For uninitialized ranges, membership is unknown at this point.
if raft.IsEmptyHardState(hs) || err != nil {
if err != nil {
r.reportRaftStorageError(err)
}
hs, err := r.raftMu.stateLoader.LoadHardState(ctx, r.store.TODOEngine())
if err != nil {
r.reportRaftStorageError(err)
return raftpb.HardState{}, raftpb.ConfState{}, err
}
// For uninitialized ranges, membership is unknown at this point.
if raft.IsEmptyHardState(hs) {
return raftpb.HardState{}, raftpb.ConfState{}, nil
}
// NB: r.mu.state is guarded by both r.raftMu and r.mu.
cs := r.mu.state.Desc.Replicas().ConfState()
return hs, cs, nil
}

// Entries implements the raft.Storage interface. Note that maxBytes is advisory
// and this method will always return at least one entry even if it exceeds
// maxBytes. Sideloaded proposals count towards maxBytes with their payloads inlined.
// Entries requires that r.mu is held for writing because it requires exclusive
// access to r.mu.stateLoader.
// Entries implements the raft.Storage interface.
//
// NB: maxBytes is advisory, and this method returns at least one entry (unless
// there are none in the requested interval), even if its size exceeds maxBytes.
// Sideloaded entries count towards maxBytes with their payloads inlined.
//
// Entries can return log entries that are not yet stable in durable storage.
// Entries can return log entries that are not yet durable / synced in storage.
//
// Requires that r.mu is held for writing.
// TODO(pav-kv): make it possible to call with only raftMu held.
func (r *replicaRaftStorage) Entries(lo, hi uint64, maxBytes uint64) ([]raftpb.Entry, error) {
entries, err := r.TypedEntries(kvpb.RaftIndex(lo), kvpb.RaftIndex(hi), maxBytes)
if err != nil {
@@ -112,14 +169,37 @@ func (r *replicaRaftStorage) Entries(lo, hi uint64, maxBytes uint64) ([]raftpb.E
func (r *replicaRaftStorage) TypedEntries(
lo, hi kvpb.RaftIndex, maxBytes uint64,
) ([]raftpb.Entry, error) {
ctx := r.AnnotateCtx(context.TODO())
if r.raftMu.sideloaded == nil {
return nil, errors.New("sideloaded storage is uninitialized")
}
ents, _, loadedSize, err := logstore.LoadEntries(ctx, r.mu.stateLoader.StateLoader, r.store.TODOEngine(), r.RangeID,
r.store.raftEntryCache, r.raftMu.sideloaded, lo, hi, maxBytes, &r.raftMu.bytesAccount)
// The call is always initiated by RawNode, under r.mu. Need it locked for
// writes, for r.mu.stateLoader.
//
// TODO(pav-kv): we have a large class of cases when we would rather only hold
// raftMu while reading the entries. The r.mu lock should be narrow.
r.mu.AssertHeld()
// Writes to the storage engine and the sideloaded storage are made under
// raftMu only. Since we are holding r.mu, but may or may not be holding
// raftMu, this read could be racing with a write.
//
// Such races are prevented at a higher level, in RawNode. Raft never reads at
// a log index for which there is at least one in-flight entry (possibly
// multiple, issued at different leader terms) to storage. It always reads
// "stable" entries.
//
// NB: without this guarantee, there would be a concern with the sideloaded
// storage: it doesn't provide a consistent snapshot to the reader, unlike the
// storage engine. Its Put method writes / syncs a file sequentially, so a
// racing reader would be able to read partial entries.
//
// TODO(pav-kv): we need better safety guardrails here. The log storage type
// can remember the readable bounds, and assert that reads do not cross them.
// TODO(pav-kv): r.raftMu.bytesAccount is broken - can't rely on raftMu here.
entries, _, loadedSize, err := logstore.LoadEntries(
r.AnnotateCtx(context.TODO()),
r.mu.stateLoader.StateLoader, r.store.TODOEngine(), r.RangeID,
r.store.raftEntryCache, r.raftMu.sideloaded, lo, hi, maxBytes,
&r.raftMu.bytesAccount,
)
r.store.metrics.RaftStorageReadBytes.Inc(int64(loadedSize))
return ents, err
return entries, err
}

// raftEntriesLocked requires that r.mu is held for writing.
Expand All @@ -143,17 +223,23 @@ func (r *replicaRaftStorage) Term(i uint64) (uint64, error) {
return uint64(term), err
}

// TypedTerm requires that r.mu is held for writing because it requires exclusive
// access to r.mu.stateLoader.
// TypedTerm requires that r.mu is held for writing because it requires
// exclusive access to r.mu.stateLoader.
//
// TODO(pav-kv): make it possible to read with only raftMu held.
func (r *replicaRaftStorage) TypedTerm(i kvpb.RaftIndex) (kvpb.RaftTerm, error) {
r.mu.AssertHeld()
// TODO(nvanbenschoten): should we set r.mu.lastTermNotDurable when
// r.mu.lastIndexNotDurable == i && r.mu.lastTermNotDurable == invalidLastTerm?
// TODO(pav-kv): we should rather always remember the last entry term, and
// remove invalidLastTerm special case.
if r.mu.lastIndexNotDurable == i && r.mu.lastTermNotDurable != invalidLastTerm {
return r.mu.lastTermNotDurable, nil
}
ctx := r.AnnotateCtx(context.TODO())
return logstore.LoadTerm(ctx, r.mu.stateLoader.StateLoader, r.store.TODOEngine(), r.RangeID,
r.store.raftEntryCache, i)
return logstore.LoadTerm(r.AnnotateCtx(context.TODO()),
r.mu.stateLoader.StateLoader, r.store.TODOEngine(), r.RangeID,
r.store.raftEntryCache, i,
)
}

// raftTermLocked requires that r.mu is locked for writing.
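As a rough illustration of the guardrail suggested by the TODO in TypedEntries ("the log storage type can remember the readable bounds, and assert that reads do not cross them"), one could imagine something like the following; all names here are hypothetical and nothing like this exists in the code yet.

```go
package main

import "fmt"

// boundedLog is a hypothetical wrapper that remembers which log indices
// are safe to read, i.e. have no in-flight writes, and rejects reads that
// stray outside of them.
type boundedLog struct {
	readableLo, readableHi uint64 // safe-to-read interval [readableLo, readableHi)
}

// setReadableBounds would be called under raftMu once writes below hi have
// completed, so reads in [lo, hi) can no longer race with writers.
func (l *boundedLog) setReadableBounds(lo, hi uint64) {
	l.readableLo, l.readableHi = lo, hi
}

// entries checks the requested interval against the readable bounds before
// loading anything from the engine or the sideloaded storage.
func (l *boundedLog) entries(lo, hi uint64) error {
	if lo < l.readableLo || hi > l.readableHi {
		return fmt.Errorf("read [%d,%d) crosses readable bounds [%d,%d)",
			lo, hi, l.readableLo, l.readableHi)
	}
	// ...load the entries here...
	return nil
}

func main() {
	var l boundedLog
	l.setReadableBounds(5, 20)
	fmt.Println(l.entries(10, 15)) // <nil>
	fmt.Println(l.entries(18, 25)) // error: read crosses readable bounds
}
```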