Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: reject requests when quiescing #51566

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,8 @@ func extractIDs(ids []kvserverbase.CmdIDKey, ents []raftpb.Entry) []kvserverbase
return ids
}

// traceLocalProposals logs a trace event with the provided string for each
// locally proposed command which corresponds to an id in ids.
// traceProposals logs a trace event with the provided string for each proposed
// command which corresponds to an id in ids.
func traceProposals(r *Replica, ids []kvserverbase.CmdIDKey, event string) {
ctxs := make([]context.Context, 0, len(ids))
r.mu.RLock()
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/replica_application_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,11 @@ func (d *replicaDecoder) createTracingSpans(ctx context.Context) {
for it.init(&d.cmdBuf); it.Valid(); it.Next() {
cmd := it.cur()
if cmd.IsLocal() {
cmd.ctx, cmd.sp = tracing.ForkCtxSpan(cmd.proposal.ctx, opName)
cmd.sp = tracing.ForkSpan(cmd.proposal.ctx, opName)
// Wipe the cancelation from cmd.proposal.ctx, if any. Command application
// is not cancelable; nobody should be checking for this ctx's
// cancelation, but let's not tempt the users.
cmd.ctx = d.r.AnnotateCtx(opentracing.ContextWithSpan(context.Background(), cmd.sp))
} else if cmd.raftCmd.TraceData != nil {
// The proposal isn't local, and trace data is available. Extract
// the span context and start a server-side span.
Expand Down
8 changes: 5 additions & 3 deletions pkg/kv/kvserver/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,9 @@ func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.Co
limiter := limit.NewLimiter(rate.Limit(consistencyCheckRate.Get(&r.store.ClusterSettings().SV)))

// Compute SHA asynchronously and store it in a map by UUID.
if err := stopper.RunAsyncTask(ctx, "storage.Replica: computing checksum", func(ctx context.Context) {

// Don't inherit the ctx cancelation.
if err := stopper.RunAsyncTaskNoCancelation(ctx, "storage.Replica: computing checksum", func(ctx context.Context) {
func() {
defer snap.Close()
var snapshot *roachpb.RaftSnapshotData
Expand All @@ -241,7 +243,7 @@ func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.Co

result, err := r.sha512(ctx, desc, snap, snapshot, cc.Mode, limiter)
if err != nil {
log.Errorf(ctx, "%v", err)
log.Errorf(ctx, "error computing checksum: %s", err)
result = nil
}
r.computeChecksumDone(ctx, cc.ChecksumID, result, snapshot)
Expand Down Expand Up @@ -642,7 +644,7 @@ func (r *Replica) handleReadWriteLocalEvalResult(ctx context.Context, lResult re
// blocks waiting for the lease acquisition to finish but it can't finish
// because we're not processing raft messages due to holding
// processRaftMu (and running on the processRaft goroutine).
if err := r.store.Stopper().RunAsyncTask(
if err := r.store.Stopper().RunAsyncTaskNoCancelation(
ctx, "storage.Replica: gossipping first range",
func(ctx context.Context) {
hasLease, pErr := r.getLeaseForGossip(ctx)
Expand Down
21 changes: 21 additions & 0 deletions pkg/kv/kvserver/store_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,27 @@ import (
// be used to update the client transaction object.
func (s *Store) Send(
	ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
	// Tie the request's context to the stopper: WithCancelOnQuiesce() hands
	// back a context that is canceled on quiescence. Because of this, any
	// async operation that needs to outlive this request must take care not
	// to inherit this context's cancelation.
	ctx, cancel := s.stopper.WithCancelOnQuiesce(ctx)
	defer cancel()

	// The request is run as a stopper task so that it blocks the closing of
	// the stopper (e.g. we can't allow the storage engine to be closed while
	// we're evaluating requests). The closure stores its results in the two
	// variables below.
	var (
		resp    *roachpb.BatchResponse
		respErr *roachpb.Error
	)
	evaluate := func(ctx context.Context) {
		resp, respErr = s.sendInner(ctx, ba)
	}
	if err := s.stopper.RunTask(ctx, "BatchRequest", evaluate); err != nil {
		// RunTask reported an error; hand it back to the caller wrapped as a
		// roachpb.Error, with no response.
		return nil, roachpb.NewError(err)
	}
	return resp, respErr
}

func (s *Store) sendInner(
ctx context.Context, ba roachpb.BatchRequest,
) (br *roachpb.BatchResponse, pErr *roachpb.Error) {
// Attach any log tags from the store to the context (which normally
// comes from gRPC).
Expand Down
19 changes: 19 additions & 0 deletions pkg/util/stop/stopper.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
opentracing "github.com/opentracing/opentracing-go"
)

Expand Down Expand Up @@ -304,6 +305,10 @@ func (s *Stopper) RunTaskWithErr(

// RunAsyncTask is like RunTask, except the callback is run in a goroutine. The
// method doesn't block for the callback to finish execution.
//
// Note that the task inherits ctx's cancelation. If the caller doesn't intend
// to wait for this task, it needs to make sure it doesn't blindly pass a
// cancelable context. See RunAsyncTaskNoCancelation().
func (s *Stopper) RunAsyncTask(
ctx context.Context, taskName string, f func(context.Context),
) error {
Expand All @@ -325,6 +330,20 @@ func (s *Stopper) RunAsyncTask(
return nil
}

// RunAsyncTaskNoCancelation behaves like RunAsyncTask, but the task is handed
// a context that is detached from the caller's: it is built on top of
// context.Background(), so neither the caller's cancelation nor its timeout is
// inherited. Only the log tags found in ctx are copied onto the new context.
//
// TODO(andrei): Another thing that's reasonable for the caller to want is to
// not pass a cancelation, but for the task to get a cancelation on quiescing
// (WithCancelOnQuiesce). We should refactor this method into a set of options
// to RunAsyncTask.
func (s *Stopper) RunAsyncTaskNoCancelation(
	ctx context.Context, taskName string, f func(context.Context),
) error {
	// Carry the caller's log tags over to a fresh root context.
	tags := logtags.FromContext(ctx)
	detachedCtx := logtags.WithTags(context.Background(), tags)
	return s.RunAsyncTask(detachedCtx, taskName, f)
}

// RunLimitedAsyncTask runs function f in a goroutine, using the given
// channel as a semaphore to limit the number of tasks that are run
// concurrently to the channel's capacity. If wait is true, blocks
Expand Down
15 changes: 10 additions & 5 deletions pkg/util/tracing/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,20 +637,25 @@ func FinishSpan(span opentracing.Span) {
//
// See also ChildSpan() for a "parent-child relationship".
func ForkCtxSpan(ctx context.Context, opName string) (context.Context, opentracing.Span) {
	sp := ForkSpan(ctx, opName)
	if sp == nil {
		// ForkSpan produced no span, so there is nothing to attach. Return
		// ctx unchanged instead of allocating a new context that wraps a nil
		// span.
		return ctx, nil
	}
	if _, noop := sp.(*noopSpan); noop && opentracing.SpanFromContext(ctx) == sp {
		// Optimization: avoid the ContextWithSpan call if tracing is disabled.
		// ForkSpan handed back the very no-op span that ctx already carries,
		// so ctx can be reused as-is. The comparison is safe: sp holds a
		// pointer, so the interface equality check cannot panic.
		return ctx, sp
	}
	// A different span was produced (a freshly started one, or the tracer's
	// no-op span replacing a black-hole span); attach it to a derived context.
	return opentracing.ContextWithSpan(ctx, sp), sp
}

// ForkSpan: see ForkCtxSpan.
func ForkSpan(ctx context.Context, opName string) opentracing.Span {
if span := opentracing.SpanFromContext(ctx); span != nil {
if _, noop := span.(*noopSpan); noop {
// Optimization: avoid ContextWithSpan call if tracing is disabled.
return ctx, span
return span
}
tr := span.Tracer()
if IsBlackHoleSpan(span) {
ns := &tr.(*Tracer).noopSpan
return opentracing.ContextWithSpan(ctx, ns), ns
return &tr.(*Tracer).noopSpan
}
newSpan := tr.StartSpan(opName, opentracing.FollowsFrom(span.Context()), LogTagsFromCtx(ctx))
return opentracing.ContextWithSpan(ctx, newSpan), newSpan
return newSpan
}
return ctx, nil
return nil
}

// ChildSpan opens a span as a child of the current span in the context (if
Expand Down