Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: include Raft application on leaseholder in request trace #72738

Merged
merged 3 commits into from
Nov 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions pkg/kv/kvserver/apply/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ type Command interface {
// that were locally proposed typically have a client waiting on a
// response, so there is additional urgency to apply them quickly.
IsLocal() bool
// Ctx returns the Context in which operations on this Command should be
// performed.
//
// A Command does the unusual thing of capturing a Context because commands
// are generally processed in batches, but different commands might want their
// events going to different places. In particular, commands that have been
// proposed locally get a tracing span tied to the local proposal.
Ctx() context.Context
// AckErrAndFinish signals that the application of the command has been
// rejected due to the provided error. It also relays this rejection of
// the command to its client if it was proposed locally. An error will
Expand Down Expand Up @@ -167,12 +175,13 @@ func takeWhileCmdIter(iter CommandIterator, pred func(Command) bool) CommandIter
// responsible for converting Commands into CheckedCommand. The function
// closes the provided iterator.
func mapCmdIter(
iter CommandIterator, fn func(Command) (CheckedCommand, error),
iter CommandIterator, fn func(context.Context, Command) (CheckedCommand, error),
) (CheckedCommandIterator, error) {
defer iter.Close()
ret := iter.NewCheckedList()
for iter.Valid() {
checked, err := fn(iter.Cur())
cur := iter.Cur()
checked, err := fn(cur.Ctx(), cur)
if err != nil {
ret.Close()
return nil, err
Expand All @@ -188,12 +197,13 @@ func mapCmdIter(
// is responsible for converting CheckedCommand into AppliedCommand. The
// function closes the provided iterator.
func mapCheckedCmdIter(
iter CheckedCommandIterator, fn func(CheckedCommand) (AppliedCommand, error),
iter CheckedCommandIterator, fn func(context.Context, CheckedCommand) (AppliedCommand, error),
) (AppliedCommandIterator, error) {
defer iter.Close()
ret := iter.NewAppliedList()
for iter.Valid() {
applied, err := fn(iter.CurChecked())
curChecked := iter.CurChecked()
applied, err := fn(curChecked.Ctx(), curChecked)
if err != nil {
ret.Close()
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/apply/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type StateMachine interface {
// an untimely crash. This means that applying these side-effects will
// typically update the in-memory representation of the state machine
// to the same state that it would be in if the process restarted.
ApplySideEffects(CheckedCommand) (AppliedCommand, error)
ApplySideEffects(context.Context, CheckedCommand) (AppliedCommand, error)
}

// ErrRemoved can be returned from ApplySideEffects which will stop the task
Expand All @@ -67,7 +67,7 @@ var ErrRemoved = errors.New("replica removed")
type Batch interface {
// Stage inserts a Command into the Batch. In doing so, the Command is
// checked for rejection and a CheckedCommand is returned.
Stage(Command) (CheckedCommand, error)
Stage(context.Context, Command) (CheckedCommand, error)
// ApplyToStateMachine applies the persistent state transitions staged
// in the Batch to the StateMachine, atomically.
ApplyToStateMachine(context.Context) error
Expand Down Expand Up @@ -225,7 +225,7 @@ func (t *Task) AckCommittedEntriesBeforeApplication(ctx context.Context, maxInde
// want to retry the command instead of returning the error to the client.
return forEachCheckedCmdIter(ctx, stagedIter, func(cmd CheckedCommand, ctx context.Context) error {
if !cmd.Rejected() && cmd.IsLocal() && cmd.CanAckBeforeApplication() {
return cmd.AckSuccess(ctx)
return cmd.AckSuccess(cmd.Ctx())
}
return nil
})
Expand Down
11 changes: 6 additions & 5 deletions pkg/kv/kvserver/apply/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ type appliedCmd struct {
*checkedCmd
}

func (c *cmd) Index() uint64 { return c.index }
func (c *cmd) IsTrivial() bool { return !c.nonTrivial }
func (c *cmd) IsLocal() bool { return !c.nonLocal }
func (c *cmd) Index() uint64 { return c.index }
func (c *cmd) IsTrivial() bool { return !c.nonTrivial }
func (c *cmd) IsLocal() bool { return !c.nonLocal }
func (c *cmd) Ctx() context.Context { return context.Background() }
func (c *cmd) AckErrAndFinish(_ context.Context, err error) error {
c.acked = true
c.finished = true
Expand Down Expand Up @@ -138,7 +139,7 @@ func (sm *testStateMachine) NewBatch(ephemeral bool) apply.Batch {
return &testBatch{sm: sm, ephemeral: ephemeral}
}
func (sm *testStateMachine) ApplySideEffects(
cmdI apply.CheckedCommand,
_ context.Context, cmdI apply.CheckedCommand,
) (apply.AppliedCommand, error) {
cmd := cmdI.(*checkedCmd)
sm.appliedSideEffects = append(sm.appliedSideEffects, cmd.index)
Expand All @@ -160,7 +161,7 @@ type testBatch struct {
staged []uint64
}

func (b *testBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error) {
func (b *testBatch) Stage(_ context.Context, cmdI apply.Command) (apply.CheckedCommand, error) {
cmd := cmdI.(*cmd)
b.staged = append(b.staged, cmd.index)
ccmd := checkedCmd{cmd: cmd, rejected: cmd.shouldReject}
Expand Down
8 changes: 7 additions & 1 deletion pkg/kv/kvserver/replica_application_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ func (c *replicatedCmd) IsLocal() bool {
return c.proposal != nil
}

// Ctx implements the apply.Command interface.
//
// It returns the context stored on the command in c.ctx, so that code which
// processes commands in batches can perform operations on this command (and
// emit its log/trace events) using the command's own context.
func (c *replicatedCmd) Ctx() context.Context {
return c.ctx
}

// AckErrAndFinish implements the apply.Command interface.
func (c *replicatedCmd) AckErrAndFinish(ctx context.Context, err error) error {
if c.IsLocal() {
Expand Down Expand Up @@ -143,7 +148,7 @@ func (c *replicatedCmd) CanAckBeforeApplication() bool {
}

// AckSuccess implements the apply.CheckedCommand interface.
func (c *replicatedCmd) AckSuccess(_ context.Context) error {
func (c *replicatedCmd) AckSuccess(ctx context.Context) error {
if !c.IsLocal() {
return nil
}
Expand All @@ -158,6 +163,7 @@ func (c *replicatedCmd) AckSuccess(_ context.Context) error {
resp.Reply = &reply
resp.EncounteredIntents = c.proposal.Local.DetachEncounteredIntents()
resp.EndTxns = c.proposal.Local.DetachEndTxns(false /* alwaysOnly */)
log.Event(ctx, "ack-ing replication success to the client; application will continue async w.r.t. the client")
c.proposal.signalProposalResult(resp)
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_application_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (d *replicaDecoder) createTracingSpans(ctx context.Context) {
for it.init(&d.cmdBuf); it.Valid(); it.Next() {
cmd := it.cur()
if cmd.IsLocal() {
cmd.ctx, cmd.sp = tracing.ForkSpan(cmd.proposal.ctx, opName)
cmd.ctx, cmd.sp = tracing.ChildSpan(cmd.proposal.ctx, opName)
} else if cmd.raftCmd.TraceData != nil {
// The proposal isn't local, and trace data is available. Extract
// the remote span and start a server-side span that follows from it.
Expand All @@ -159,7 +159,7 @@ func (d *replicaDecoder) createTracingSpans(ctx context.Context) {
)
}
} else {
cmd.ctx, cmd.sp = tracing.ForkSpan(ctx, opName)
cmd.ctx, cmd.sp = tracing.ChildSpan(ctx, opName)
}
}
}
Expand Down
21 changes: 12 additions & 9 deletions pkg/kv/kvserver/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,9 +428,10 @@ type replicaAppBatch struct {
// the batch. This allows the batch to make an accurate determination about
// whether to accept or reject the next command that is staged without needing
// to actually update the replica state machine in between.
func (b *replicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error) {
func (b *replicaAppBatch) Stage(
ctx context.Context, cmdI apply.Command,
) (apply.CheckedCommand, error) {
cmd := cmdI.(*replicatedCmd)
ctx := cmd.ctx
if cmd.ent.Index == 0 {
return nil, makeNonDeterministicFailure("processRaftCommand requires a non-zero index")
}
Expand All @@ -457,7 +458,7 @@ func (b *replicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error
cmd.raftCmd.LogicalOpLog = nil
cmd.raftCmd.ClosedTimestamp = nil
} else {
if err := b.assertNoCmdClosedTimestampRegression(cmd); err != nil {
if err := b.assertNoCmdClosedTimestampRegression(ctx, cmd); err != nil {
return nil, err
}
if err := b.assertNoWriteBelowClosedTimestamp(cmd); err != nil {
Expand Down Expand Up @@ -992,7 +993,9 @@ func (b *replicaAppBatch) assertNoWriteBelowClosedTimestamp(cmd *replicatedCmd)

// Assert that the closed timestamp carried by the command is not below one from
// previous commands.
func (b *replicaAppBatch) assertNoCmdClosedTimestampRegression(cmd *replicatedCmd) error {
func (b *replicaAppBatch) assertNoCmdClosedTimestampRegression(
ctx context.Context, cmd *replicatedCmd,
) error {
if !raftClosedTimestampAssertionsEnabled {
return nil
}
Expand All @@ -1012,7 +1015,7 @@ func (b *replicaAppBatch) assertNoCmdClosedTimestampRegression(cmd *replicatedCm
prevReq.SafeString("<unknown; not leaseholder or not lease request>")
}

logTail, err := b.r.printRaftTail(cmd.ctx, 100 /* maxEntries */, 2000 /* maxCharsPerEntry */)
logTail, err := b.r.printRaftTail(ctx, 100 /* maxEntries */, 2000 /* maxCharsPerEntry */)
if err != nil {
if logTail != "" {
logTail = logTail + "\n; error printing log: " + err.Error()
Expand Down Expand Up @@ -1043,9 +1046,10 @@ type ephemeralReplicaAppBatch struct {
}

// Stage implements the apply.Batch interface.
func (mb *ephemeralReplicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error) {
func (mb *ephemeralReplicaAppBatch) Stage(
ctx context.Context, cmdI apply.Command,
) (apply.CheckedCommand, error) {
cmd := cmdI.(*replicatedCmd)
ctx := cmd.ctx

mb.r.shouldApplyCommand(ctx, cmd, &mb.state)
mb.state.LeaseAppliedIndex = cmd.leaseIndex
Expand All @@ -1071,10 +1075,9 @@ func (mb *ephemeralReplicaAppBatch) Close() {
// side effects of commands, such as finalizing splits/merges and informing
// raft about applied config changes.
func (sm *replicaStateMachine) ApplySideEffects(
cmdI apply.CheckedCommand,
ctx context.Context, cmdI apply.CheckedCommand,
) (apply.AppliedCommand, error) {
cmd := cmdI.(*replicatedCmd)
ctx := cmd.ctx

// Deal with locking during side-effect handling, which is sometimes
// associated with complex commands such as splits and merges.
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_application_state_machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestReplicaStateMachineChangeReplicas(t *testing.T) {
},
}

checkedCmd, err := b.Stage(cmd)
checkedCmd, err := b.Stage(cmd.ctx, cmd)
require.NoError(t, err)
require.Equal(t, !add, b.changeRemovesReplica)
require.Equal(t, b.state.RaftAppliedIndex, cmd.ent.Index)
Expand All @@ -129,7 +129,7 @@ func TestReplicaStateMachineChangeReplicas(t *testing.T) {
require.NoError(t, err)

// Apply the side effects of the command to the StateMachine.
_, err = sm.ApplySideEffects(checkedCmd)
_, err = sm.ApplySideEffects(checkedCmd.Ctx(), checkedCmd)
if add {
require.NoError(t, err)
} else {
Expand Down
1 change: 0 additions & 1 deletion pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8359,7 +8359,6 @@ func TestFailureToProcessCommandClearsLocalResult(t *testing.T) {
r.mu.Unlock()

tr := tc.store.cfg.AmbientCtx.Tracer
tr.TestingRecordAsyncSpans() // we assert on async span traces in this test
opCtx, getRecAndFinish := tracing.ContextWithRecordingSpan(ctx, tr, "test-recording")
defer getRecAndFinish()

Expand Down