From c92af32586840fb4de3c2167d6685a8af5a26245 Mon Sep 17 00:00:00 2001 From: Bilal Akhtar Date: Wed, 23 Aug 2017 16:48:31 -0400 Subject: [PATCH] distsqlrun: Fix RowChannel race in outbox upon context cancellation. The race was being caused in the `RunSyncFlow()` case only: when the flow's `syncFlowConsumer` is an outbox, not a distSQLReceiver. If `flow.cancel` runs after `outbox.ProducerDone()` has been called by a router/processor, and tries to push an error into it, the outbox panics since its `RowChannel` is already closed. This PR fixes this race by adding the ability to mark the `distSQLReceiver` on the gateway node as cancelled asynchronously with respect to the `Push`/`ProducerDone` calls and, more importantly, by doing nothing when the `syncFlowConsumer` is an outbox (or anything other than a distSQLReceiver). Fixes #17851, fixes #17864. --- docs/RFCS/query_cancellation.md | 2 +- pkg/sql/distsql_running.go | 18 +++++++++++++++++- pkg/sql/distsqlrun/base.go | 11 +++++++++++ pkg/sql/distsqlrun/flow.go | 14 ++++++-------- 4 files changed, 35 insertions(+), 10 deletions(-) diff --git a/docs/RFCS/query_cancellation.md b/docs/RFCS/query_cancellation.md index ef299a8489b2..d1ebecdf220e 100644 --- a/docs/RFCS/query_cancellation.md +++ b/docs/RFCS/query_cancellation.md @@ -208,7 +208,7 @@ to any other nodes are also cancelled (by grpc). - Producer processor to consumer: This is a special case only for those processors that do a lot of processing before emitting any rows (such as the sorter). These processors will check the local flow context for cancellation, and return an error to their consumer if it gets cancelled. 
-- syncFlowConsumer special case: Since `syncFlowConsumer` does not have any streams that +- syncFlowConsumer special case: Since `syncFlowConsumer` on the gateway node does not have any streams that cross node boundaries or call `FlowStream`, an error will be manually pushed to it upon a cancellation request on the gateway node, marking it as closed to all of its producers. diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index 398ac89c8982..3e9df03dd334 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -15,10 +15,11 @@ package sql import ( - "golang.org/x/net/context" + "sync/atomic" opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" + "golang.org/x/net/context" "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/kv" @@ -224,6 +225,11 @@ type distSQLReceiver struct { // Once set, no more rows are accepted. err error + // cancelled is atomically set to 1 when this distSQL receiver has been marked + // as cancelled. Upon the next Push(), err is set to a non-nil + // value, and ConsumerClosed is the ConsumerStatus. + cancelled int32 + row parser.Datums status distsqlrun.ConsumerStatus alloc sqlbase.DatumAlloc @@ -257,6 +263,7 @@ type rowResultWriter interface { } var _ distsqlrun.RowReceiver = &distSQLReceiver{} +var _ distsqlrun.CancellableRowReceiver = &distSQLReceiver{} // makeDistSQLReceiver creates a distSQLReceiver. // @@ -325,6 +332,10 @@ func (r *distSQLReceiver) Push( } return r.status } + if r.err == nil && atomic.LoadInt32(&r.cancelled) == 1 { + // Set the error to reflect query cancellation. + r.err = sqlbase.NewQueryCanceledError() + } if r.err != nil { // TODO(andrei): We should drain here. return distsqlrun.ConsumerClosed @@ -370,6 +381,11 @@ func (r *distSQLReceiver) ProducerDone() { r.closed = true } +// SetCancelled is part of the CancellableRowReceiver interface. 
+func (r *distSQLReceiver) SetCancelled() { + atomic.StoreInt32(&r.cancelled, 1) +} + // updateCaches takes information about some ranges that were mis-planned and // updates the range descriptor and lease-holder caches accordingly. // diff --git a/pkg/sql/distsqlrun/base.go b/pkg/sql/distsqlrun/base.go index 456588bda32f..2806ae6c93e8 100644 --- a/pkg/sql/distsqlrun/base.go +++ b/pkg/sql/distsqlrun/base.go @@ -91,6 +91,17 @@ type RowReceiver interface { ProducerDone() } +// CancellableRowReceiver is a special type of a RowReceiver that can be set to +// cancelled asynchronously (i.e. concurrently or after Push()es and ProducerDone()s). +// Once cancelled, subsequent Push()es return ConsumerClosed. Implemented by distSQLReceiver +// which is the final RowReceiver, and the origin point for propagation of ConsumerClosed +// consumer statuses. +type CancellableRowReceiver interface { + // SetCancelled sets this RowReceiver as cancelled. Subsequent Push()es (if any) + // return a ConsumerStatus of ConsumerClosed. + SetCancelled() +} + // RowSource is any component of a flow that produces rows that cam be consumed // by another component. type RowSource interface { diff --git a/pkg/sql/distsqlrun/flow.go b/pkg/sql/distsqlrun/flow.go index da3c9254ed36..95dc448b8838 100644 --- a/pkg/sql/distsqlrun/flow.go +++ b/pkg/sql/distsqlrun/flow.go @@ -424,9 +424,9 @@ func (f *Flow) RunSync(ctx context.Context) { } // cancel iterates through all unconnected streams of this flow and marks them cancelled. -// It also closes the syncFlowConsumer, if it exists. This function is called in Wait() -// after the associated context has been cancelled. In order to cancel a flow, call -// f.ctxCancel() instead of this function. +// If the syncFlowConsumer is of type CancellableRowReceiver, mark it as cancelled. +// This function is called in Wait() after the associated context has been cancelled. +// In order to cancel a flow, call f.ctxCancel() instead of this function. 
// // For a detailed description of the distsql query cancellation mechanism, // read docs/RFCS/query_cancellation.md. @@ -452,11 +452,9 @@ func (f *Flow) cancel() { } if f.syncFlowConsumer != nil { - // Push an error to the sync flow consumer. If this is a distSQLReceiver, - // it will call ConsumerClosed when it sees a non-nil error. - f.syncFlowConsumer.Push( - nil, /* row */ - ProducerMetadata{Err: sqlbase.NewQueryCanceledError()}) + if recv, ok := f.syncFlowConsumer.(CancellableRowReceiver); ok { + recv.SetCancelled() + } } }