Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

distsqlrun: Fix RowChannel race in outbox upon context cancellation. #17870

Merged
merged 1 commit into from
Aug 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/RFCS/query_cancellation.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ to any other nodes are also cancelled (by grpc).
- Producer processor to consumer: This is a special case only for those processors that
do a lot of processing before emitting any rows (such as the sorter). These processors will check the
local flow context for cancellation, and return an error to their consumer if it gets cancelled.
- syncFlowConsumer special case: Since `syncFlowConsumer` does not have any streams that
- syncFlowConsumer special case: Since `syncFlowConsumer` on the gateway node does not have any streams that
cross node boundaries or call `FlowStream`, an error will be manually pushed to it
upon a cancellation request on the gateway node, marking it as closed to all of its producers.

Expand Down
18 changes: 17 additions & 1 deletion pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
package sql

import (
"golang.org/x/net/context"
"sync/atomic"

opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/kv"
Expand Down Expand Up @@ -224,6 +225,11 @@ type distSQLReceiver struct {
// Once set, no more rows are accepted.
err error

// cancelled is atomically set to 1 when this distSQL receiver has been marked
// as cancelled. Upon the next Push(), err is set to a non-nil
// value, and ConsumerClosed is the ConsumerStatus.
cancelled int32

row parser.Datums
status distsqlrun.ConsumerStatus
alloc sqlbase.DatumAlloc
Expand Down Expand Up @@ -257,6 +263,7 @@ type rowResultWriter interface {
}

var _ distsqlrun.RowReceiver = &distSQLReceiver{}
var _ distsqlrun.CancellableRowReceiver = &distSQLReceiver{}

// makeDistSQLReceiver creates a distSQLReceiver.
//
Expand Down Expand Up @@ -325,6 +332,10 @@ func (r *distSQLReceiver) Push(
}
return r.status
}
if r.err == nil && atomic.LoadInt32(&r.cancelled) == 1 {
// Set the error to reflect query cancellation.
r.err = sqlbase.NewQueryCanceledError()
}
if r.err != nil {
// TODO(andrei): We should drain here.
return distsqlrun.ConsumerClosed
Expand Down Expand Up @@ -370,6 +381,11 @@ func (r *distSQLReceiver) ProducerDone() {
r.closed = true
}

// SetCancelled is part of the CancellableRowReceiver interface.
func (r *distSQLReceiver) SetCancelled() {
atomic.StoreInt32(&r.cancelled, 1)
}

// updateCaches takes information about some ranges that were mis-planned and
// updates the range descriptor and lease-holder caches accordingly.
//
Expand Down
11 changes: 11 additions & 0 deletions pkg/sql/distsqlrun/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ type RowReceiver interface {
ProducerDone()
}

// CancellableRowReceiver is a special type of a RowReceiver that can be set to
// cancelled asynchronously (i.e. concurrently or after Push()es and ProducerDone()s).
// Once cancelled, subsequent Push()es return ConsumerClosed. Implemented by distSQLReceiver
// which is the final RowReceiver, and the origin point for propagation of ConsumerClosed
// consumer statuses.
type CancellableRowReceiver interface {
// SetCancelled sets this RowReceiver as cancelled. Subsequent Push()es (if any)
// return a ConsumerStatus of ConsumerClosed.
SetCancelled()
}

// RowSource is any component of a flow that produces rows that cam be consumed
// by another component.
type RowSource interface {
Expand Down
14 changes: 6 additions & 8 deletions pkg/sql/distsqlrun/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,9 +424,9 @@ func (f *Flow) RunSync(ctx context.Context) {
}

// cancel iterates through all unconnected streams of this flow and marks them cancelled.
// It also closes the syncFlowConsumer, if it exists. This function is called in Wait()
// after the associated context has been cancelled. In order to cancel a flow, call
// f.ctxCancel() instead of this function.
// If the syncFlowConsumer is of type CancellableRowReceiver, mark it as cancelled.
// This function is called in Wait() after the associated context has been cancelled.
// In order to cancel a flow, call f.ctxCancel() instead of this function.
//
// For a detailed description of the distsql query cancellation mechanism,
// read docs/RFCS/query_cancellation.md.
Expand All @@ -452,11 +452,9 @@ func (f *Flow) cancel() {
}

if f.syncFlowConsumer != nil {
// Push an error to the sync flow consumer. If this is a distSQLReceiver,
// it will call ConsumerClosed when it sees a non-nil error.
f.syncFlowConsumer.Push(
nil, /* row */
ProducerMetadata{Err: sqlbase.NewQueryCanceledError()})
if recv, ok := f.syncFlowConsumer.(CancellableRowReceiver); ok {
recv.SetCancelled()
}
}
}

Expand Down