Skip to content

Commit

Permalink
Merge pull request #114379 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-23.2-114240

release-23.2: rangefeed: fix scheduler catchup iterator race
  • Loading branch information
erikgrinaker authored Nov 15, 2023
2 parents 35e118b + e2b7c6d commit de8d3d0
Showing 1 changed file with 23 additions and 2 deletions.
25 changes: 23 additions & 2 deletions pkg/kv/kvserver/rangefeed/scheduled_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/future"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -574,19 +575,39 @@ func (p *ScheduledProcessor) Filter() *Filter {
// is guaranteed that only single request is modifying processor at any given
// time. It is advisable to use provided processor reference for operations
// rather than using one within closure itself.
// If request can't be queued or processor stoppedC is closed then default
// value is returned.
//
// If the processor is stopped concurrently with the request queueing, it may or
// may not be processed. If the request is ever processed, its return value is
// guaranteed to be returned here. Otherwise, the zero value is returned and the
// request is never processed.
func runRequest[T interface{}](
p *ScheduledProcessor, f func(ctx context.Context, p *ScheduledProcessor) T,
) (r T) {
result := make(chan T, 1)
p.enqueueRequest(func(ctx context.Context) {
result <- f(ctx, p)
// Assert that we never process requests after stoppedC is closed. This is
// necessary to coordinate catchup iter ownership and avoid double-closing.
// Note that request/stop processing is always sequential, see process().
if buildutil.CrdbTestBuild {
select {
case <-p.stoppedC:
log.Fatalf(ctx, "processing request on stopped processor")
default:
}
}
})
select {
case r = <-result:
return r
case <-p.stoppedC:
// If a request and stop were processed in rapid succession, and the node is
// overloaded, this select may observe them happening at the same time and
// take this branch instead of the result with 50% probability. Check again.
select {
case r = <-result:
default:
}
return r
}
}
Expand Down

0 comments on commit de8d3d0

Please sign in to comment.