kvserver: add closedts side-transport consumer #61305
Conversation
5b4dc2c to 5d1f1f3
Looking good!
Reviewed 19 of 19 files at r1, 6 of 6 files at r2.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @andreimatei)
pkg/kv/kvserver/replica.go, line 1225 at r1 (raw file):
} var update replicaUpdate
nit: pull this above now :=, because this mirrors the pattern for the shouldExtend stuff.
pkg/kv/kvserver/replica_follower_read.go, line 162 at r1 (raw file):
optionally takes a optional hint
Needs rewording.
pkg/kv/kvserver/replica_follower_read.go, line 171 at r1 (raw file):
) (_ hlc.Timestamp, ok bool, _ replicaUpdate) { appliedLAI := ctpb.LAI(r.mu.state.LeaseAppliedIndex) lease := *r.mu.state.Lease
nit: this is all locked, so no need to copy the lease. Let's just remove the *.
pkg/kv/kvserver/replica_follower_read.go, line 191 at r1 (raw file):
} if lease.Expiration != nil {
Leave a note here that we now support closed timestamps on ranges with expiration-based leases, but we'll just wait until v21.2 to avoid migration concerns.
pkg/kv/kvserver/store.go, line 668 at r1 (raw file):
ClosedTimestamp *container.Container ClosedTimestampSender *sidetransport.Sender ClosedTimestampReceiver ClosedTimestampSideTransportProvider
The interface doesn't seem useful here. Why not just *sidetransport.Receiver?
pkg/kv/kvserver/store.go, line 1775 at r2 (raw file):
} // startClosedTimestampRangefeedSubscriber establishes a new ClosedTimestamp
Reminder that we still need to do something about this.
pkg/kv/kvserver/stores.go, line 128 at r1 (raw file):
ctx context.Context, rangeID roachpb.RangeID, closedTS hlc.Timestamp, lai ctpb.LAI, ) { if err := ls.VisitStores(func(s *Store) error {
👍 very clean.
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 1 at r1 (raw file):
// Copyright 2021 The Cockroach Authors.
Rename to receiver.go.
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 40 at r1 (raw file):
mu struct { syncutil.Mutex connections map[roachpb.NodeID]*incomingStream
nit: conns, to mirror the Sender.
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 63 at r1 (raw file):
func (s *Receiver) PushUpdates(stream ctpb.SideTransport_PushUpdatesServer) error { // Create a steam to service this connection. The stream will call back into the server // through onFirstMsg to register itself once it finds out its node id.
s/its/the sender's
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 64 at r1 (raw file):
// Create a steam to service this connection. The stream will call back into the server // through onFirstMsg to register itself once it finds out its node id. ctx := s.AnnotateCtx(context.Background())
Doesn't stream have a stream.Context()?
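For illustration, here is a minimal sketch of the suggestion, with pushUpdatesStream standing in for the generated ctpb.SideTransport_PushUpdatesServer (any gRPC server stream exposes Context() through the embedded grpc.ServerStream); the handler derives its context from the stream rather than from context.Background():

```go
package main

import (
	"context"
	"fmt"
)

// pushUpdatesStream is a hypothetical stand-in for the generated
// ctpb.SideTransport_PushUpdatesServer; real server streams embed
// grpc.ServerStream, which provides Context().
type pushUpdatesStream interface {
	Context() context.Context
}

// pushUpdates derives its context from the stream instead of
// context.Background(), so cancellation of the RPC propagates to the handler.
func pushUpdates(stream pushUpdatesStream) error {
	ctx := stream.Context() // canceled when the RPC ends or the client goes away
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		fmt.Println("serving incoming side-transport stream")
		return nil
	}
}

type fakeStream struct{}

func (fakeStream) Context() context.Context { return context.Background() }

func main() {
	_ = pushUpdates(fakeStream{})
}
```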
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 72 at r1 (raw file):
// order to use this closed timestamp. // // leaseholderNode is the last known leaseholder for the range.
Justify why we only need to look at the stream for this leaseholder.
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 104 at r1 (raw file):
} // onRecvErr is called when one of the inbound stream errors out. The stream is
s/stream/streams/
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 132 at r1 (raw file):
// lastTimestamps maintains the last (highest) closed timestamps // communicated by the client node. lastTimestamps [roachpb.MAX_CLOSED_TIMESTAMP_POLICY]hlc.Timestamp
lastClosed, to mirror the Sender?
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 136 at r1 (raw file):
// for. These are the ranges that the closed timestamps in lastTimestamps // apply to. ranges map[roachpb.RangeID]rangeInfo
tracked, to mirror the Sender?
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 138 at r1 (raw file):
ranges map[roachpb.RangeID]rangeInfo // lastSeq is the sequence number of the last message received. lastSeq ctpb.SeqNum
Actually, isn't this all the same as sender.trackedMu? Should we pull out a shared struct? Something like a trackingState struct that looks exactly like sender.trackedMu does right now.
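For illustration, a rough sketch of what such a shared struct could look like. The names (trackingState, trackedRange, lastClosed, tracked, lastSeq) follow the discussion in this thread; the element types are simplified stand-ins for the hlc/ctpb/roachpb types:

```go
package main

import "fmt"

// Simplified stand-ins; the real fields would use hlc.Timestamp, ctpb.LAI,
// ctpb.SeqNum, and roachpb.RangeID.
type (
	rangeID   int64
	seqNum    int64
	timestamp struct{ wallNanos int64 }
)

const maxClosedTimestampPolicy = 2 // stand-in for roachpb.MAX_CLOSED_TIMESTAMP_POLICY

// trackedRange holds per-range info: the lease applied index the closed
// timestamp applies to, and the range's closed-timestamp policy.
type trackedRange struct {
	lai    int64
	policy int
}

// trackingState is the candidate shared struct: last closed timestamps per
// policy, the set of tracked ranges, and the sequence number of the last
// message. Both sender.trackedMu and the receiver's incomingStream could embed it.
type trackingState struct {
	lastClosed [maxClosedTimestampPolicy]timestamp
	tracked    map[rangeID]trackedRange
	lastSeq    seqNum
}

func main() {
	ts := trackingState{tracked: map[rangeID]trackedRange{}}
	ts.tracked[1] = trackedRange{lai: 10, policy: 0}
	fmt.Printf("%+v\n", ts)
}
```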
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 142 at r1 (raw file):
} type rangeInfo struct {
Can we remove this and use trackedRange?
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 147 at r1 (raw file):
} // StoresInterface is the Stores interface needed by the
nit: Just call this Stores, and leave a comment like the one on Replica in sender.go.
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 151 at r1 (raw file):
type StoresInterface interface { // ForwardSideTransportClosedTimestampForRange forwards the side-transport // closed timestamp for the local replicas of the given range.
minor nit: replica(s), just to hint that there is usually only one replica per range.
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 164 at r1 (raw file):
} func (r *incomingStream) String() string {
Do you still want this or was it for debugging purposes? Either way, move it below all the other incomingStream methods - it's very distracting.
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 222 at r1 (raw file):
} if msg.NodeID != r.nodeID {
else if?
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 240 at r1 (raw file):
// Handle the removed ranges. In order to not lose closed ts info, before we // can remove a range from our tracking, we copy the info about its closed // timestamp to the local replica(s).
Add to the comment that we must do this before updating r.mu.lastTimestamps.
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 248 at r1 (raw file):
r.stores.ForwardSideTransportClosedTimestampForRange( ctx, rangeID, r.mu.lastTimestamps[info.policy], info.lai) delete(r.mu.ranges, rangeID)
Pull this up before ForwardSideTransportClosedTimestampForRange.
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 269 at r1 (raw file):
stream ctpb.SideTransport_PushUpdatesServer, ) error { // We have to do the stream processing on a separate goroutine because Recv()
Is this common? I'm a little surprised we have to do this. I wonder if it's because we were ignoring the stream's context (see comment above).
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 212 at r2 (raw file):
// processUpdate processes one update received on the stream, updating the local // state. func (r *incomingStream) processUpdate(ctx context.Context, msg *ctpb.Update) {
This seems straightforward to unit test. We'll just need to mock out the Stores interface.
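For illustration, a rough sketch of such a unit test with a mocked-out Stores. The interface shape is simplified from the snippet quoted above, and the test name and package are hypothetical:

```go
package receivertest

import (
	"context"
	"testing"
)

// Stores mirrors the narrow interface the receiver needs (see
// ForwardSideTransportClosedTimestampForRange above), with simplified argument
// types so this sketch stands alone; the real method takes roachpb.RangeID,
// hlc.Timestamp, and ctpb.LAI.
type Stores interface {
	ForwardSideTransportClosedTimestampForRange(ctx context.Context, rangeID, closedTS, lai int64)
}

// mockStores records the closed timestamps forwarded for each range.
type mockStores struct {
	forwarded map[int64]int64
}

var _ Stores = (*mockStores)(nil)

func (m *mockStores) ForwardSideTransportClosedTimestampForRange(
	_ context.Context, rangeID, closedTS, _ int64,
) {
	m.forwarded[rangeID] = closedTS
}

// TestRemovedRangeForwardsClosedTimestamp is a hypothetical test: a real version
// would drive incomingStream.processUpdate with a ctpb.Update that removes a
// range, then assert that the mock saw the range's last closed timestamp.
func TestRemovedRangeForwardsClosedTimestamp(t *testing.T) {
	m := &mockStores{forwarded: map[int64]int64{}}
	// Stand-in for processUpdate handling the removal of range 1, whose last
	// closed timestamp was 100 at lease applied index 7.
	m.ForwardSideTransportClosedTimestampForRange(context.Background(), 1, 100, 7)
	if got := m.forwarded[1]; got != 100 {
		t.Fatalf("expected closed timestamp 100 for r1, got %d", got)
	}
}
```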
pkg/kv/kvserver/closedts/sidetransport/sender.go, line 190 at r1 (raw file):
} else { // Disable the side-transport. timer.Stop()
Reminder: I think you're fixing something here right now.
5d1f1f3 to 0a1ef9d
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @andreimatei and @nvanbenschoten)
pkg/kv/kvserver/replica.go, line 1225 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
nit: pull this above now :=, because this mirrors the pattern for the shouldExtend stuff.
done
pkg/kv/kvserver/replica_follower_read.go, line 162 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
optionally takes a optional hint
Needs rewording.
done
pkg/kv/kvserver/replica_follower_read.go, line 171 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
nit: this is all locked, so no need to copy the lease. Let's just remove the *.
done
pkg/kv/kvserver/replica_follower_read.go, line 191 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
Leave a note here that we now support closed timestamps on ranges with expiration-based leases, but we'll just wait until v21.2 to avoid migration concerns.
done. I had hesitated before because of concerns that even longer term these expiration-based ranges don't necessarily have a lease - but then again neither do the regular ranges necessarily.
pkg/kv/kvserver/store.go, line 668 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
The interface doesn't seem useful here. Why not just *sidetransport.Receiver?
done
pkg/kv/kvserver/store.go, line 1775 at r2 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
Reminder that we still need to do something about this.
see now, in the 2nd commit
pkg/kv/kvserver/closedts/sidetransport/sender.go, line 190 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
Reminder: I think you're fixing something here right now.
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 1 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
Rename to receiver.go.
done
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 40 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
nit: conns, to mirror the Sender.
done
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 63 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
s/its/the sender's
done
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 64 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
Doesn't stream have a stream.Context()?
done
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 72 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
Justify why we only need to look at the stream for this leaseholder.
see now
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 104 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
s/stream/streams/
done
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 132 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
lastClosed, to mirror the Sender?
done
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 136 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
tracked, to mirror the Sender?
n/a
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 138 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
Actually, isn't this all the same as sender.trackedMu? Should we pull out a shared struct? Something like a trackingState struct that looks exactly like sender.trackedMu does right now.
Done. I think I like.
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 142 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
Can we remove this and use trackedRange?
done
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 147 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
nit: Just call this Stores, and leave a comment like the one on Replica in sender.go.
done
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 151 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
minor nit: replica(s), just to hint that there is usually only one replica per range.
done
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 164 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
Do you still want this or was it for debugging purposes? Either way, move it below all the other incomingStream methods - it's very distracting.
still want it, although it's not yet used.
Moved.
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 222 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
else if?
mmm I don't know. If it was a panic instead of log.Fatal() above, the linter wouldn't even let me do else.
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 240 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
Add to the comment that we must do this before updating r.mu.lastTimestamps.
done
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 248 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
Pull this up before ForwardSideTransportClosedTimestampForRange.
done
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 269 at r1 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
Is this common? I'm a little surprised we have to do this. I wonder if it's because we were ignoring the stream's context (see comment above).
I was surprised too and spent an hour in old, frustrating gRPC threads. The only way to interrupt a Recv() is to return from the RPC handler. I'm not sure under what circumstances the stream's context gets canceled, but it certainly does not happen in direct relation with our stopper quiescence. It's possible that it happens in connection to calling some type of drain/stop on the gRPC server, but I dunno.
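For illustration, a sketch of the pattern described here: Recv() runs on its own goroutine, and the handler selects on the stream's context and a quiescence channel, since returning from the handler is what ultimately tears down the RPC and unblocks Recv(). The names (run, quiesceCh, recvStream) are illustrative, not the receiver's actual code:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// update and recvStream are stand-ins for ctpb.Update and the generated
// SideTransport_PushUpdatesServer; the real stream's Recv() blocks until a
// message arrives or the RPC is torn down.
type update struct{ seq int64 }

type recvStream interface {
	Context() context.Context
	Recv() (*update, error)
}

// run drains the stream on a separate goroutine. The handler returns as soon as
// the RPC context is canceled or the server starts quiescing; returning from the
// handler tears down the RPC, which finally unblocks the pending Recv().
func run(stream recvStream, quiesceCh <-chan struct{}, handle func(*update)) error {
	errCh := make(chan error, 1)
	go func() {
		for {
			msg, err := stream.Recv()
			if err != nil {
				errCh <- err
				return
			}
			handle(msg)
		}
	}()
	select {
	case err := <-errCh:
		return err
	case <-stream.Context().Done():
		return stream.Context().Err()
	case <-quiesceCh:
		return nil
	}
}

// errStream fails immediately, just to exercise the pattern in main.
type errStream struct{}

func (errStream) Context() context.Context { return context.Background() }
func (errStream) Recv() (*update, error)   { return nil, errors.New("stream closed") }

func main() {
	err := run(errStream{}, make(chan struct{}), func(*update) {})
	fmt.Println(err)
}
```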
d87ab98 to 6e5a2f2
Looking good. I think we'll still want a few basic unit tests right around incomingStream.processUpdate, but after that and once CI is happy, I think we can merge.
Reviewed 13 of 16 files at r3, 11 of 11 files at r4.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @andreimatei)
pkg/kv/kvserver/closedts/sidetransport/sender.go, line 37 at r4 (raw file):
// Sender represents the sending-side of the closed timestamps "side-transport". // Its role is to periodically advance the closed timestamps of all the tracked
Looks like a rename from tracked to ranges went bad all over the file.
pkg/kv/kvserver/closedts/sidetransport/sender_test.go, line 194 at r4 (raw file):
func TestSenderConnectionChanges(t *testing.T) { // TODO: Two tracked.
Rename went bad here too.
4b6f507 to def7a80
Fixed the deadlock between removals and replicas querying the closed timestamp.
Added some tests.
Moved the Receiver to an RWMutex.
Appended a commit mucking with a test.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @andreimatei and @nvanbenschoten)
pkg/kv/kvserver/closedts/sidetransport/sender.go, line 37 at r4 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
Looks like a rename from tracked to ranges went bad all over the file.
fixed
pkg/kv/kvserver/closedts/sidetransport/sender_test.go, line 194 at r4 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
Rename went bad here too.
fixed
Reviewed 8 of 19 files at r5, 11 of 11 files at r6, 2 of 2 files at r7.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @andreimatei)
pkg/kv/kvserver/closedts/sidetransport/sender_test.go, line 52 at r6 (raw file):
func (m *mockReplica) StoreID() roachpb.StoreID { return m.storeID } func (m *mockReplica) GetRangeID() roachpb.RangeID { return m.rangeID } func (m *mockReplica) Desc() *roachpb.RangeDescriptor {
This has me wondering whether we should return the descriptor from BumpSideTransportClosed to avoid locking each replica twice. What do you think? Doesn't need to be in this change.
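For illustration, a sketch of that follow-up idea under simplified stand-in types: BumpSideTransportClosed would return the descriptor it already reads under the replica mutex, saving the caller a second Desc() call and lock acquisition. The signature here is illustrative, not the kvserver one:

```go
package main

import (
	"fmt"
	"sync"
)

// rangeDescriptor and replica are simplified stand-ins for
// roachpb.RangeDescriptor and kvserver.Replica.
type rangeDescriptor struct{ rangeID int64 }

type replica struct {
	mu struct {
		sync.Mutex
		desc   rangeDescriptor
		closed int64
	}
}

// bumpSideTransportClosed bumps the side-transport closed timestamp and returns
// the descriptor read in the same critical section, so the caller doesn't need a
// second lock acquisition just to fetch it.
func (r *replica) bumpSideTransportClosed(target int64) (bool, rangeDescriptor) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if target <= r.mu.closed {
		return false, r.mu.desc
	}
	r.mu.closed = target
	return true, r.mu.desc
}

func main() {
	r := &replica{}
	r.mu.desc = rangeDescriptor{rangeID: 1}
	ok, desc := r.bumpSideTransportClosed(10)
	fmt.Println(ok, desc.rangeID)
}
```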
pkg/kv/kvserver/closedts/sidetransport/sender_test.go, line 505 at r6 (raw file):
// 1) and yet that Receiver doesn't leak goroutine, which proves that nobody // connected to it (a node doesn't connect to itself). go func() {
Is this a goroutine because we don't expect the receive to complete until after the stopper has been triggered?
pkg/kv/kvserver/closedts/sidetransport/receiver.go, line 148 at r6 (raw file):
} } type incomingStreamTestingKnobs struct {
nit: spacing above
pkg/kv/kvserver/closedts/sidetransport/receiver.go, line 262 at r6 (raw file):
for i, rangeID := range removed { info := infos[i] r.stores.ForwardSideTransportClosedTimestampForRange(
Why can't we do this under the read lock? I thought that was part of the point? It would be a shame if we needed to allocate the infos slice for ranges that aren't even local. Maybe this structure was built before the RWMutex split.
def7a80 to cfde2bb
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @andreimatei and @nvanbenschoten)
pkg/kv/kvserver/closedts/sidetransport/sender_test.go, line 52 at r6 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
This has me wondering whether we should return the descriptor from BumpSideTransportClosed to avoid locking each replica twice. What do you think? Doesn't need to be in this change.
Good idea. Will do separately.
pkg/kv/kvserver/closedts/sidetransport/sender_test.go, line 505 at r6 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
Is this a goroutine because we don't expect the receive to complete until after the stopper has been triggered?
yes. But this was pretty ugly. See now.
pkg/kv/kvserver/closedts/sidetransport/closed_timestamp_receiver.go, line 212 at r2 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
This seems straightforward to unit test. We'll just need to mock out the Stores interface.
done
pkg/kv/kvserver/closedts/sidetransport/receiver.go, line 148 at r6 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
nit: spacing above
done
pkg/kv/kvserver/closedts/sidetransport/receiver.go, line 262 at r6 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
Why can't we do this under the read lock? I thought that was part of the point? It would be a shame if we needed to allocate the infos slice for ranges that aren't even local. Maybe this structure was built before the RWMutex split.
done
Reviewed 2 of 15 files at r8, 11 of 11 files at r9, 2 of 2 files at r10.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @andreimatei)
pkg/kv/kvserver/closedts/sidetransport/sender_test.go, line 52 at r6 (raw file):
Previously, andreimatei (Andrei Matei) wrote…
Good idea. Will do separately.
👍
pkg/kv/kvserver/closedts/sidetransport/receiver.go, line 262 at r6 (raw file):
Previously, andreimatei (Andrei Matei) wrote…
done
I think we can still do a bit better. Now that this is two phases, let's merge the delete(r.mu.tracked, rangeID) critical section with the main critical section in processUpdate. So then it's one RLock and one Lock. Up to you whether you want to still keep this method for the RLock part (but either way, the if len(removed) > 0 { check is 👍).
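For illustration, a sketch of the locking shape being proposed: a read-locked phase that forwards the removed ranges' closed timestamps, then a single write-locked phase that applies the update and performs the deletes, so each incoming message takes exactly one RLock and one Lock. All names and types are illustrative stand-ins, not the receiver's actual fields:

```go
package main

import (
	"fmt"
	"sync"
)

// stream is a simplified stand-in for incomingStream: closed timestamps per
// policy plus the set of tracked ranges, guarded by an RWMutex.
type stream struct {
	mu struct {
		sync.RWMutex
		lastClosed map[int]int64 // closed timestamp per policy
		tracked    map[int64]int // rangeID -> policy
	}
}

func (s *stream) processUpdate(removed []int64, newClosed map[int]int64) {
	// Phase 1 (read lock): forward the closed timestamps that the removed
	// ranges were relying on, before the update overwrites lastClosed.
	if len(removed) > 0 {
		s.mu.RLock()
		for _, rangeID := range removed {
			policy := s.mu.tracked[rangeID]
			forwardToLocalStores(rangeID, s.mu.lastClosed[policy])
		}
		s.mu.RUnlock()
	}
	// Phase 2 (write lock): apply the update and drop the removed ranges.
	s.mu.Lock()
	for _, rangeID := range removed {
		delete(s.mu.tracked, rangeID)
	}
	for policy, ts := range newClosed {
		s.mu.lastClosed[policy] = ts
	}
	s.mu.Unlock()
}

// forwardToLocalStores stands in for Stores.ForwardSideTransportClosedTimestampForRange.
func forwardToLocalStores(rangeID, closedTS int64) {
	fmt.Printf("r%d: forwarding closed timestamp %d\n", rangeID, closedTS)
}

func main() {
	s := &stream{}
	s.mu.lastClosed = map[int]int64{0: 5}
	s.mu.tracked = map[int64]int{1: 0}
	s.processUpdate([]int64{1}, map[int]int64{0: 10})
}
```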
cfde2bb to f06f982
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @andreimatei and @nvanbenschoten)
pkg/kv/kvserver/closedts/sidetransport/receiver.go, line 262 at r6 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
I think we can still do a bit better. Now that this is two phases, let's merge the delete(r.mu.tracked, rangeID) critical section with the main critical section in processUpdate. So then it's one RLock and one Lock. Up to you whether you want to still keep this method for the RLock part (but either way, the if len(removed) > 0 { check is 👍).
see now
Reviewed 3 of 16 files at r11, 11 of 11 files at r12, 2 of 2 files at r13.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @andreimatei)
pkg/kv/kvserver/closedts/sidetransport/receiver.go, line 262 at r6 (raw file):
Previously, andreimatei (Andrei Matei) wrote…
see now
🔥
Add the consumer of closed timestamps communicated by the side transport (i.e. the gRPC server for our new push-based streaming protocol). This side-transport consumer accumulates closed timestamps communicated to it by other nodes (the leaseholders of the respective ranges). Its state is queried whenever a range needs a higher closed timestamp than what it has locally in the Replica state, at which point the Replica's state is lazily updated.
Release note: None
Release justification: Needed for GLOBAL tables.

When the new one is enabled, the old one will now be disabled. No point in running with both of them. In order to disable the old mechanism, a new background task is added to periodically call r.handleClosedTimestampUpdate() on all replicas with rangefeeds - to advance the feed.
Release note: None
Release justification: Needed for GLOBAL tables.

Make a test send larger proto messages, to fill network buffers faster. Makes a difference under race.
Release note: None
Release justification: Tests only
f06f982 to 3ff3d17
bors r+
Build succeeded:
Congrats on the merge!