-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
flowinfra: refactor locking around FlowRegistry #72998
Conversation
2bd29e3
to
17c2740
Compare
d67f721
to
33fab06
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for looking into this!
Reviewed 2 of 2 files at r1, all commit messages.
Reviewable status:complete! 0 of 0 LGTMs obtained (waiting on @jordanlewis, @RaduBerinde, and @yuzefovich)
pkg/sql/flowinfra/flow_registry.go, line 77 at r1 (raw file):
receiver InboundStreamHandler // waitGroup to signal on when finished. waitGroup *sync.WaitGroup
Drive-by comment on old code: it's a little strange to see an object hold on to a reference to a WaitGroup
. At a minimum, that's coupling the InboundStreamInfo
implementation with the orchestration layer above it. It also allows the InboundStreamInfo
to call Add
or Wait
on the waitGroup
, which would be an issue.
Consider replacing the concrete type with an interface { Done() }
.
pkg/sql/flowinfra/flow_registry.go, line 108 at r1 (raw file):
// inboundStreams are streams that receive data from other hosts, through // the FlowStream API. Each InboundStreamInfo has its own mutex, separate // from the FlowRegistry mutex. This map is set in Flow.Setup(), so it is
Could we talk about lock ordering here? Do the mutexes ever need to be held at the same time? If not, let's say that. If yes, then let's talk about which order they should the acquired in.
pkg/sql/flowinfra/flow_registry.go, line 515 at r1 (raw file):
return nil, nil, nil, err } fr.Lock()
Should we push this locking into waitForFlow
? And I guess the same question applies to up above — should we push FlowRegistry
locking into a getEntry
method?
pkg/sql/flowinfra/flow_registry.go, line 534 at r1 (raw file):
return nil, nil, nil, errors.Errorf("flow %s: no inbound stream %d", flowID, streamID) } s.mu.Lock()
Consider pulling some of these checked critical sections into methods on the InboundStreamInfo
. For instance, this looks like a func (*InboundStreamInfo) connect() error
method.
pkg/sql/flowinfra/flow_registry.go, line 548 at r1 (raw file):
s.mu.connected = true s.mu.Unlock() defer func() {
nit: this may be cleaner to just put in the single err
path down below.
pkg/sql/flowinfra/flow_registry.go, line 575 at r1 (raw file):
// The mutex of the streamEntry must be held when calling this method. func (fr *FlowRegistry) finishInboundStreamLocked(streamEntry *InboundStreamInfo) {
We're now mixing some "locked" methods on FlowRegistry
that refer to the FlowRegistry
's mutex and some that refer to the InboundStreamInfo
's mutex. Consider changing the name here to make that more clear.
But also, does this even need to be a method on FlowRegistry
? Can't it just be a finishLocked
method on InboundStreamInfo
? And then maybe add a finish
method alongside it that acquires the lock. Then the cleanup
closure becomes cleanup := s.finish
.
dfffddb
to
e892a32
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review!
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @jordanlewis, @nvanbenschoten, and @RaduBerinde)
pkg/sql/flowinfra/flow_registry.go, line 77 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
Drive-by comment on old code: it's a little strange to see an object hold on to a reference to a
WaitGroup
. At a minimum, that's coupling theInboundStreamInfo
implementation with the orchestration layer above it. It also allows theInboundStreamInfo
to callAdd
orWait
on thewaitGroup
, which would be an issue.Consider replacing the concrete type with an
interface { Done() }
.
Hm, I think the comments are pretty tightly coupled with the idea of using the wait group here. I do see your point though, so I implemented your suggestion and updated the comments. Let me know what you think.
pkg/sql/flowinfra/flow_registry.go, line 108 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
Could we talk about lock ordering here? Do the mutexes ever need to be held at the same time? If not, let's say that. If yes, then let's talk about which order they should the acquired in.
Good point, updated the comment.
pkg/sql/flowinfra/flow_registry.go, line 515 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
Should we push this locking into
waitForFlow
? And I guess the same question applies to up above — should we pushFlowRegistry
locking into agetEntry
method?
I have gone back and forth on pushing locking into waitForFlow
, and I think with my original version it was getting more complicated. Now it seems to be much cleaner, so done.
However, I think we should keep getEntryLocked
unchanged because the FlowRegistry
's mutex protects flowEntry
objects (e.g. flow
field), and in several callsites of getEntryLocked
we are holding the lock for other reasons anyway.
pkg/sql/flowinfra/flow_registry.go, line 575 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
We're now mixing some "locked" methods on
FlowRegistry
that refer to theFlowRegistry
's mutex and some that refer to theInboundStreamInfo
's mutex. Consider changing the name here to make that more clear.But also, does this even need to be a method on
FlowRegistry
? Can't it just be afinishLocked
method onInboundStreamInfo
? And then maybe add afinish
method alongside it that acquires the lock. Then thecleanup
closure becomescleanup := s.finish
.
Your second paragraph is spot on - it definitely makes sense to make this method on InboundStreamInfo
, and things become much cleaner. I didn't change the naming since now with different receivers in place it should be clear which mutex should be help.
e892a32
to
00507f9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
, though this should probably also get a look from someone more familiar with this code.
Reviewed 2 of 2 files at r2, 2 of 2 files at r3, all commit messages.
Reviewable status:complete! 1 of 0 LGTMs obtained (waiting on @jordanlewis and @RaduBerinde)
pkg/sql/flowinfra/flow_registry.go, line 108 at r1 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
Good point, updated the comment.
Aren't the two locks held at the same time in cancelPendingStreamsLocked
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 2 of 2 files at r2, 2 of 2 files at r3, all commit messages.
Reviewable status:complete! 1 of 0 LGTMs obtained (waiting on @jordanlewis, @RaduBerinde, and @yuzefovich)
pkg/sql/flowinfra/flow_registry.go, line 355 at r2 (raw file):
is.cancelLocked() } is.mu.Unlock()
should we use defer
for this and move it up, in case cancelLocked
panics?
00507f9
to
66954ce
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status:
complete! 1 of 0 LGTMs obtained (waiting on @jordanlewis, @nvanbenschoten, @RaduBerinde, and @rytaft)
pkg/sql/flowinfra/flow_registry.go, line 108 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
Aren't the two locks held at the same time in
cancelPendingStreamsLocked
?
Indeed, nice catch! I refactored the code there to not hold the FlowRegistry's lock. This actually simplified the code a bit, PTAL.
pkg/sql/flowinfra/flow_registry.go, line 355 at r2 (raw file):
Previously, rytaft (Rebecca Taft) wrote…
should we use
defer
for this and move it up, in casecancelLocked
panics?
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(although I'm also not familiar with this code)
Reviewed 3 of 3 files at r4, 2 of 2 files at r5, all commit messages.
Reviewable status:complete! 2 of 0 LGTMs obtained (waiting on @jordanlewis and @RaduBerinde)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status:
complete! 3 of 0 LGTMs obtained (waiting on @jordanlewis and @yuzefovich)
pkg/sql/flowinfra/flow_registry.go, line 161 at r4 (raw file):
// inboundStreams are streams that receive data from other hosts, through // the FlowStream API. This map is set in Flow.Setup(), so it is safe to // lookup into concurrently later.
I think we need a similar comment in FlowBase.inboundStreams
and maybeAddRemoteStream
.
pkg/sql/flowinfra/flow_registry.go, line 346 at r4 (raw file):
fr.Unlock() if inboundStreams == nil { return numTimedOutReceivers
return 0
pkg/sql/flowinfra/flow_registry.go, line 354 at r4 (raw file):
var pendingReceiver InboundStreamHandler is.mu.Lock() defer is.mu.Unlock()
Strange to lock all of them, then unlock all of them. I'd just do us.mu.Unlock()
at the end of the block.
66954ce
to
96e2cf9
Compare
This commit refactors the locking done in `FlowRegistry`. Previously, the `FlowRegistry`'s mutex (global process-wide) was protecting three pieces of information: - `flows` map that stores which flows have already been scheduled on the node - `flowEntry` objects (information about each scheduled flow) - `InboundStreamInfo` objects (information about endpoints to serve `FlowStream` RPCs). This commit refactors the `InboundStreamInfo` objects so that each has its own mutex. This cleans up the things a bit and allows us more cleanly refactor the code in `ConnectInboundStream` so that we don't perform any gRPC calls (which might be blocking, rarely) while holding the `FlowRegistry`'s mutex. Release note: None
This limits the coupling of `InboundStreamInfo` and `FlowBase`. Release note: None
96e2cf9
to
26ef4ec
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TFTRs!
bors r+
Reviewable status:
complete! 1 of 0 LGTMs obtained (and 2 stale) (waiting on @jordanlewis, @RaduBerinde, and @rytaft)
pkg/sql/flowinfra/flow_registry.go, line 354 at r4 (raw file):
Previously, RaduBerinde wrote…
Strange to lock all of them, then unlock all of them. I'd just do
us.mu.Unlock()
at the end of the block.
finishLocked
(called by cancelLocked
below) might panic, so Becca suggested to defer the unlocking. I've refactored the code to add a function scope for this defer.
Build succeeded: |
flowinfra: refactor locking around FlowRegistry
This commit refactors the locking done in
FlowRegistry
. Previously,the
FlowRegistry
's mutex (global process-wide) was protecting threepieces of information:
flows
map that stores which flows have already been scheduled on thenode
flowEntry
objects (information about each scheduled flow)InboundStreamInfo
objects (information about endpoints to serveFlowStream
RPCs).This commit refactors the
InboundStreamInfo
objects so that each hasits own mutex. This cleans up the things a bit and allows us more
cleanly refactor the code in
ConnectInboundStream
so that we don'tperform any gRPC calls (which might be blocking, rarely) while holding
the
FlowRegistry
's mutex.Fixes: #72964.
Release note: None
flowinfra: remove the reference to waitGroup from InboundStreamInfo
This limits the coupling of
InboundStreamInfo
andFlowBase
.Release note: None