Skip to content

Commit

Permalink
Merge #93266
Browse files Browse the repository at this point in the history
93266: kvserver: refactor replicaAppBatch for standalone log application r=pavelkalinnikov a=tbg

This long (but individually small) sequence of commits moves `(*replicaAppBatch).Stage` close to the structure that was prototyped in #93265, where it has the following steps:

- command checks (standalone)
- testing interceptors (replica)
- pre-add triggers (standalone)
- pre-add triggers (replica)
- add (to pebble batch, standalone)
- post-add triggers (standalone)
- post-add triggers (replica)

In standalone application (e.g. for #93244) we'd use an `apply.Batch` that is an `appBatch`, instead of a `replicaAppBatch`, i.e. skip all of the (replica) steps above.

This PR doesn't get us all the way there - we still need to tease apart the `post-add triggers (replica)` step, which currently contains code that should be in `post-add triggers (standalone)`; this is best tackled in a separate PR since it's going to be quite a bit of work.

Touches #75729.

Epic: CRDB-220
Release note: None

Co-authored-by: Tobias Grieger <[email protected]>
  • Loading branch information
craig[bot] and tbg committed Dec 21, 2022
2 parents f61c55a + edf6976 commit e23ef86
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 146 deletions.
68 changes: 68 additions & 0 deletions pkg/kv/kvserver/app_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,37 @@ package kvserver
import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

// appBatchStats holds counters that are accumulated while commands are added
// to an appBatch (which embeds this struct). Two sets of stats are combined
// via merge.
type appBatchStats struct {
// TODO(sep-raft-log):
// numEntries
// numEntriesBytes
// numEntriesEmpty
// NOTE(review): the three names above look like the intended future names
// for the numEntriesProcessed* / numEmptyEntries fields below -- confirm.

// numMutations is incremented in addWriteBatch by the count that
// storage.PebbleBatchCount reports for each command's WriteBatch.
numMutations int
// numEntriesProcessed, numEntriesProcessedBytes and numEmptyEntries are not
// written anywhere in the visible part of this file; presumably they count
// applied entries, their size in bytes, and entries without a payload --
// verify against the callers.
numEntriesProcessed int
numEntriesProcessedBytes int64
numEmptyEntries int
// followerStoreWriteBytes is combined through its own Merge method.
followerStoreWriteBytes kvadmission.FollowerStoreWriteBytes
// NB: update `merge` when adding a new field.
}

// merge folds the counters in ss into the receiver s: every numeric field of
// ss is added to the corresponding field of s, and followerStoreWriteBytes is
// combined via its own Merge method.
//
// ss is passed by value, so it is never modified from the caller's point of
// view; only s changes.
func (s *appBatchStats) merge(ss appBatchStats) {
	s.numMutations += ss.numMutations
	s.numEntriesProcessed += ss.numEntriesProcessed
	s.numEntriesProcessedBytes += ss.numEntriesProcessedBytes
	// NB: the destination must be s. Writing to ss would only touch the local
	// copy and silently drop the empty-entry count.
	s.numEmptyEntries += ss.numEmptyEntries
	s.followerStoreWriteBytes.Merge(ss.followerStoreWriteBytes)
}

// appBatch is the in-progress foundation for standalone log entry
// application[^1], i.e. the act of applying raft log entries to the state
// machine in a library-style fashion, without a running CockroachDB server.
Expand All @@ -44,6 +68,7 @@ import (
//
// [^1]: https://github.com/cockroachdb/cockroach/issues/75729
type appBatch struct {
appBatchStats
// TODO(tbg): this will absorb the following fields from replicaAppBatch:
//
// - batch
Expand Down Expand Up @@ -88,6 +113,49 @@ func (b *appBatch) toCheckedCmd(
cmd.Cmd.LogicalOpLog = nil
cmd.Cmd.ClosedTimestamp = nil
} else {
// If the command was using the deprecated version of the MVCCStats proto,
// migrate it to the new version and clear out the field.
res := cmd.ReplicatedResult()
if deprecatedDelta := res.DeprecatedDelta; deprecatedDelta != nil {
if res.Delta != (enginepb.MVCCStatsDelta{}) {
log.Fatalf(ctx, "stats delta not empty but deprecated delta provided: %+v", cmd)
}
res.Delta = deprecatedDelta.ToStatsDelta()
res.DeprecatedDelta = nil
}
log.Event(ctx, "applying command")
}
}

// runPreAddTriggers runs any triggers that must fire before the command is
// added to the appBatch's pebble batch. That is, the pebble batch at this point
// will have materialized the raft log up to but excluding the current command.
//
// There are no such triggers at present, so this is a no-op that always
// returns nil; ctx and cmd are unused.
func (b *appBatch) runPreAddTriggers(ctx context.Context, cmd *raftlog.ReplicatedCmd) error {
// None currently.
return nil
}

// addWriteBatch applies the WriteBatch carried by cmd, if there is one, to the
// supplied storage batch and accounts for its mutations in b.numMutations.
//
// A command without a WriteBatch is a no-op. Failing to read the mutation
// count from the batch header is only logged and does not prevent the batch
// from being applied; a failure to apply the batch is returned to the caller.
func (b *appBatch) addWriteBatch(
	ctx context.Context, batch storage.Batch, cmd *replicatedCmd,
) error {
	writeBatch := cmd.Cmd.WriteBatch
	if writeBatch == nil {
		// Nothing to add for this command.
		return nil
	}
	repr := writeBatch.Data

	// Stats are best-effort: a header we cannot parse only costs us the count.
	count, countErr := storage.PebbleBatchCount(repr)
	if countErr == nil {
		b.numMutations += count
	} else {
		log.Errorf(ctx, "unable to read header of committed WriteBatch: %+v", countErr)
	}

	applyErr := batch.ApplyBatchRepr(repr, false)
	if applyErr != nil {
		return errors.Wrapf(applyErr, "unable to apply WriteBatch")
	}
	return nil
}

// runPostAddTriggers is the counterpart to runPreAddTriggers; presumably it is
// meant to run triggers once the command has been added to the pebble batch
// (confirm against the caller). It currently does nothing and always returns
// nil; ctx and cmd are unused.
func (b *appBatch) runPostAddTriggers(ctx context.Context, cmd *raftlog.ReplicatedCmd) error {
// TODO(sep-raft-log): currently they are commingled in runPostAddTriggersReplicaOnly,
// extract them from that method.
return nil
}
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/apply/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
// All state transitions performed by the state machine are expected to be
// deterministic, which ensures that if each instance is driven from the
// same consistent shared log, they will all stay in sync.
//
// Implementations need not be, and commonly are not, thread-safe.
type StateMachine interface {
// NewEphemeralBatch creates an EphemeralBatch. This kind of batch is not able
// to make changes to the StateMachine, but can be used for the purpose of
Expand All @@ -39,6 +41,9 @@ type StateMachine interface {
// that a group of Commands will have on the replicated state machine.
// Commands are staged in the batch one-by-one and then the entire batch is
// applied to the StateMachine at once via its ApplyToStateMachine method.
//
// There must only be a single EphemeralBatch *or* Batch open at any given
// point in time.
NewBatch() Batch
// ApplySideEffects applies the in-memory side-effects of a Command to
// the replicated state machine. The method will be called in the order
Expand Down Expand Up @@ -66,6 +71,10 @@ var ErrRemoved = errors.New("replica removed")
type EphemeralBatch interface {
// Stage inserts a Command into the Batch. In doing so, the Command is
// checked for rejection and a CheckedCommand is returned.
//
// TODO(tbg): consider renaming this to Add, so that in implementations
// of this we can less ambiguously refer to "staging" commands into the
// pebble batch.
Stage(context.Context, Command) (CheckedCommand, error)
// Close closes the batch and releases any resources that it holds.
Close()
Expand Down
Loading

0 comments on commit e23ef86

Please sign in to comment.