Skip to content

Commit

Permalink
sql: clean up ieResultChannel concepts and fix race condition
Browse files Browse the repository at this point in the history
The async and sync implementations were too similar to justify two structs.
Also, the async behavior of not stopping the writer when the reader
called close wasn't desirable. This commit unifies the implementation.
It also ensures that we propagate context errors in all cases triggered
by the closure of the done channel. It also makes closing the channel
idempotent.

Fixes cockroachdb#62948

Release note: None
  • Loading branch information
ajwerner committed Apr 2, 2021
1 parent a5f2143 commit 2d3c4aa
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 100 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ var rowsAffectedResultColumns = colinfo.ResultColumns{
func (ie *InternalExecutor) execInternal(
ctx context.Context,
opName string,
rw ieResultChannel,
rw *ieResultChannel,
txn *kv.Txn,
sessionDataOverride sessiondata.InternalExecutorOverride,
stmt string,
Expand Down
174 changes: 75 additions & 99 deletions pkg/sql/internal_result_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,6 @@ import (
"github.com/cockroachdb/errors"
)

// ieResultChannel is used to coordinate passing results from an
// internalExecutor to its corresponding iterator.
type ieResultChannel interface {
ieResultReader
ieResultWriter
}

// ieResultReader is used to read internalExecutor results.
// It is managed by the rowsIterator.
type ieResultReader interface {
Expand Down Expand Up @@ -67,73 +60,17 @@ var asyncIEResultChannelBufferSize = util.ConstantWithMetamorphicTestRange(

// newAsyncIEResultChannel returns an ieResultChannel which does not attempt to
// synchronize the writer with the reader.
func newAsyncIEResultChannel() ieResultChannel {
return &asyncIEResultChannel{
// newAsyncIEResultChannel builds an ieResultChannel whose data channel is
// buffered with asyncIEResultChannelBufferSize slots and which has no waitCh,
// so the writer is not made to wait for the reader after each result.
func newAsyncIEResultChannel() *ieResultChannel {
	rc := new(ieResultChannel)
	rc.dataCh = make(chan ieIteratorResult, asyncIEResultChannelBufferSize)
	rc.doneCh = make(chan struct{})
	return rc
}

// asyncIEResultChannel is an ieResultChannel backed by a single channel of
// results.
type asyncIEResultChannel struct {
	// dataCh carries results from addResult to firstResult/nextResult and is
	// closed by finish.
	dataCh chan ieIteratorResult
}

// Compile-time check that *asyncIEResultChannel implements ieResultChannel.
var _ ieResultChannel = (*asyncIEResultChannel)(nil)

// firstResult waits for either a result on dataCh or cancellation of ctx.
//
// A result received from an open dataCh is returned with done=false. If
// dataCh has been closed, it reports done=true with a nil error. If ctx is
// done, it reports done=true together with ctx.Err().
func (c *asyncIEResultChannel) firstResult(
	ctx context.Context,
) (_ ieIteratorResult, done bool, err error) {
	select {
	case row, open := <-c.dataCh:
		if open {
			return row, false, nil
		}
		// The writer closed the channel: there is nothing more to read.
		return ieIteratorResult{}, true, nil
	case <-ctx.Done():
		return ieIteratorResult{}, true, ctx.Err()
	}
}

// nextResult returns the next available result. It delegates to firstResult,
// so the two calls behave identically for this implementation.
func (c *asyncIEResultChannel) nextResult(
	ctx context.Context,
) (_ ieIteratorResult, done bool, err error) {
	row, finished, readErr := c.firstResult(ctx)
	return row, finished, readErr
}

// close reads results via nextResult until it reports done, and returns the
// first error encountered along the way (nil if there was none). For each
// result, an error carried in the result itself (res.err) takes precedence
// over the error returned by nextResult.
func (c *asyncIEResultChannel) close() error {
	var firstErr error
	for exhausted := false; !exhausted; {
		var (
			row     ieIteratorResult
			readErr error
		)
		row, exhausted, readErr = c.nextResult(context.TODO())
		if firstErr != nil {
			// Only the first error is kept; keep draining regardless.
			continue
		}
		switch {
		case row.err != nil:
			firstErr = row.err
		case readErr != nil:
			firstErr = readErr
		}
	}
	return firstErr
}

// addResult sends result on dataCh, blocking until the send succeeds or ctx
// is done. It returns nil after a successful send and ctx.Err() otherwise.
func (c *asyncIEResultChannel) addResult(ctx context.Context, result ieIteratorResult) error {
	var sendErr error
	select {
	case c.dataCh <- result:
		// Delivered; sendErr stays nil.
	case <-ctx.Done():
		sendErr = ctx.Err()
	}
	return sendErr
}

// finish closes dataCh, which is how readers learn that no further results
// will arrive.
func (c *asyncIEResultChannel) finish() {
	results := c.dataCh
	close(results)
}

// syncIEResultChannel is used to ensure that in execution scenarios which
// do not permit concurrency that there is none. It works by blocking the
// writing goroutine immediately upon sending on the data channel and only
// unblocking it after the reader signals.
type syncIEResultChannel struct {
// ieResultChannel is used to coordinate passing results from an
// internalExecutor to its corresponding iterator. It can be constructed to
// ensure that there is no concurrency between the reader and writer.
type ieResultChannel struct {

// dataCh is the channel on which the connExecutor goroutine sends the rows
// (in addResult) and will block on waitCh after each send. The iterator
Expand All @@ -145,96 +82,135 @@ type syncIEResultChannel struct {
// iterator.
dataCh chan ieIteratorResult

// waitCh is never closed. In all places where the caller may interact with it
// the doneCh is also used. This policy is in place to make it safe to unblock
// both the reader and the writer without any hazards of a blocked reader
// attempting to send on a closed channel.
// waitCh is nil for async ieResultChannels. It is never closed. In all places
// where the caller may interact with it the doneCh is also used. This policy
// is in place to make it safe to unblock both the reader and the writer
// without any hazards of a blocked reader attempting to send on a closed
// channel.
waitCh chan struct{}

// doneCh is used to indicate that the ReadWriter has been closed.
// doneCh is closed under the doneOnce. The doneCh is only used for the
// syncIEResultChannel. This is crucial to ensure that a synchronous writer
// ieResultChannel. This is crucial to ensure that a synchronous writer
// does not attempt to continue to operate after the reader has called close.
doneCh chan struct{}
doneErr error
doneOnce sync.Once
}

// Compile-time check that *syncIEResultChannel implements ieResultChannel.
var _ ieResultChannel = (*syncIEResultChannel)(nil)

// newSyncIEResultChannel returns an ieResultChannel which synchronizes the
// writer with the reader.
func newSyncIEResultChannel() ieResultChannel {
return &syncIEResultChannel{
// newSyncIEResultChannel builds an ieResultChannel for execution scenarios
// that must not run the reader and the writer concurrently. All three
// channels are unbuffered; the presence of waitCh is what makes the writer
// block right after each send on the data channel until the reader signals
// it to continue.
func newSyncIEResultChannel() *ieResultChannel {
	var (
		rows   = make(chan ieIteratorResult)
		resume = make(chan struct{})
		closed = make(chan struct{})
	)
	return &ieResultChannel{dataCh: rows, waitCh: resume, doneCh: closed}
}

func (i *syncIEResultChannel) firstResult(
// firstResult waits for whichever happens first: ctx being done, doneCh being
// closed, or a receive on dataCh. A value received from an open dataCh is
// returned with done=false and a nil error; every other outcome reports
// done=true with an empty ieIteratorResult.
//
// NOTE(review): the doneCh branch and the closed-dataCh branch each list two
// consecutive returns (nil error first, then ctx.Err()). This looks like the
// removed and added lines of the diff shown together -- confirm which one
// applies before relying on the error value.
func (i *ieResultChannel) firstResult(
ctx context.Context,
) (_ ieIteratorResult, done bool, err error) {
select {
case <-ctx.Done():
return ieIteratorResult{}, true, ctx.Err()
case <-i.doneCh:
return ieIteratorResult{}, true, nil
return ieIteratorResult{}, true, ctx.Err()
case res, ok := <-i.dataCh:
// ok is false once dataCh has been closed (see finish).
if !ok {
return ieIteratorResult{}, true, nil
return ieIteratorResult{}, true, ctx.Err()
}
return res, false, nil
}
}

func (i *syncIEResultChannel) unblockWriter(ctx context.Context) (done bool, err error) {
// maybeUnblockWriter signals a writer waiting on waitCh that it may proceed.
// For async channels (i.async() is true) there is nothing to signal and it
// returns (false, nil) immediately. Otherwise it blocks until ctx is done,
// doneCh is closed, or the send on waitCh succeeds; only a successful send
// reports done=false.
//
// NOTE(review): the doneCh branch lists two consecutive returns (nil error
// first, then ctx.Err()). This looks like the removed and added lines of the
// diff shown together -- confirm which one applies.
func (i *ieResultChannel) maybeUnblockWriter(ctx context.Context) (done bool, err error) {
if i.async() {
return false, nil
}
select {
case <-ctx.Done():
return true, ctx.Err()
case <-i.doneCh:
return true, nil
return true, ctx.Err()
// The matching receive is in maybeBlock.
case i.waitCh <- struct{}{}:
return false, nil
}
}

func (i *syncIEResultChannel) finish() {
close(i.dataCh)
// async reports whether this channel was built without a waitCh, i.e.
// whether the writer is allowed to run ahead of the reader.
func (i *ieResultChannel) async() bool {
	hasWaitCh := i.waitCh != nil
	return !hasWaitCh
}

func (i *syncIEResultChannel) nextResult(
func (i *ieResultChannel) nextResult(
ctx context.Context,
) (_ ieIteratorResult, done bool, err error) {
if done, err = i.unblockWriter(ctx); done {
if done, err = i.maybeUnblockWriter(ctx); done {
return ieIteratorResult{}, done, err
}
return i.firstResult(ctx)
}

func (i *syncIEResultChannel) close() error {
i.doneOnce.Do(func() { close(i.doneCh) })
return nil
// close shuts the channel down from the reader's side and returns the first
// error observed while doing so.
//
// The shutdown work runs at most once (guarded by doneOnce), so close may be
// called repeatedly; every call returns the recorded doneErr. The work
// consists of closing doneCh and then calling nextResult until it reports
// done. For each result, an error carried in the result itself (res.err)
// takes precedence over the error returned by nextResult.
func (i *ieResultChannel) close() error {
	shutdown := func() {
		// doneCh must be closed before draining: the writer-side selects in
		// addResult and maybeBlock watch it.
		close(i.doneCh)
		for exhausted := false; !exhausted; {
			var (
				row     ieIteratorResult
				readErr error
			)
			row, exhausted, readErr = i.nextResult(context.TODO())
			if i.doneErr != nil {
				// Only the first error is kept.
				continue
			}
			switch {
			case row.err != nil:
				i.doneErr = row.err
			case readErr != nil:
				i.doneErr = readErr
			}
		}
	}
	i.doneOnce.Do(shutdown)
	return i.doneErr
}

// errSyncIEResultReaderCanceled is returned by the writer when the reader has
// closed syncIEResultChannel. The error indicates to the writer to shut down
// errIEResultChannelClosed is returned by the writer when the reader has
// closed ieResultChannel. The error indicates to the writer to shut down
// the query execution, but the reader won't propagate it further.
var errSyncIEResultReaderCanceled = errors.New("synchronous ieResultReader closed")
var errIEResultChannelClosed = errors.New("ieResultReader closed")

func (i *syncIEResultChannel) addResult(ctx context.Context, result ieIteratorResult) error {
// addResult offers result to the reader on dataCh. It returns ctx.Err() if
// ctx is done first and an error if doneCh is closed first. After a
// successful send it returns whatever maybeBlock returns.
//
// NOTE(review): the doneCh branch lists `return errSyncIEResultReaderCanceled`
// ahead of the context-error preference logic, which leaves that logic
// unreachable as written. This looks like the removed and added lines of the
// diff shown together -- confirm which one applies.
func (i *ieResultChannel) addResult(ctx context.Context, result ieIteratorResult) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-i.doneCh:
return errSyncIEResultReaderCanceled
// Prefer the context error if there is one.
if ctxErr := ctx.Err(); ctxErr != nil {
return ctxErr
}
return errIEResultChannelClosed
case i.dataCh <- result:
// Sent; fall through to the maybeBlock call below.
}
return i.maybeBlock(ctx)
}

// maybeBlock makes the writer wait for the reader's go-ahead. For async
// channels (i.async() is true) it returns nil immediately. Otherwise it
// blocks until ctx is done (returning ctx.Err()), doneCh is closed (returning
// an error), or a value is received from waitCh (returning nil).
//
// NOTE(review): the doneCh branch lists `return errSyncIEResultReaderCanceled`
// ahead of the context-error preference logic, which leaves that logic
// unreachable as written. This looks like the removed and added lines of the
// diff shown together -- confirm which one applies.
func (i *ieResultChannel) maybeBlock(ctx context.Context) error {
if i.async() {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-i.doneCh:
return errSyncIEResultReaderCanceled
// Prefer the context error if there is one.
if ctxErr := ctx.Err(); ctxErr != nil {
return ctxErr
}
return errIEResultChannelClosed
// The matching send is in maybeUnblockWriter.
case <-i.waitCh:
return nil
}
}

// finish closes dataCh; a reader receiving from it then sees the channel as
// closed (see firstResult).
func (i *ieResultChannel) finish() {
	results := i.dataCh
	close(results)
}

0 comments on commit 2d3c4aa

Please sign in to comment.