-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
Copy pathreceiver.go
364 lines (327 loc) · 11.2 KB
/
receiver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package sidetransport
import (
"context"
"fmt"
"io"
"strings"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)
// Receiver is the gRPC server for the closed timestamp side-transport,
// receiving updates from remote nodes. It maintains the set of current
// streaming connections, keyed by the ID of the node on the sending end.
type Receiver struct {
// AmbientContext annotates the context of every incoming stream (see
// PushUpdates); NewReceiver tags it with this node's ID.
log.AmbientContext
// stop is handed to every incoming stream, which runs its receive loop as a
// stopper task and returns from the RPC once the stopper quiesces.
stop *stop.Stopper
// stores is handed to every incoming stream; see the Stores interface.
stores Stores
// testingKnobs holds optional hooks keyed by sender node. A stream is given
// its node's entry when it registers itself through onFirstMsg.
testingKnobs receiverTestingKnobs
mu struct {
syncutil.RWMutex
// conns maps a sender node to the stream currently registered for it.
// onFirstMsg refuses a second stream from a node that already has an
// entry, and onRecvErr removes the entry when the stream fails.
conns map[roachpb.NodeID]*incomingStream
}
}
// receiverTestingKnobs maps a sender node ID to the testing hooks that get
// installed on the incoming stream from that node.
type receiverTestingKnobs map[roachpb.NodeID]incomingStreamTestingKnobs
// Compile-time check that *Receiver satisfies the side-transport gRPC server
// interface.
var _ ctpb.SideTransportServer = (*Receiver)(nil)
// NewReceiver creates a Receiver, meant to be registered as the gRPC server of
// the closed timestamp side-transport service (it implements
// ctpb.SideTransportServer).
//
// nodeID is only used to tag the Receiver's log messages. stop and stores are
// handed to every incoming stream. testingKnobs may be nil.
func NewReceiver(
	nodeID *base.NodeIDContainer,
	stop *stop.Stopper,
	stores Stores,
	testingKnobs receiverTestingKnobs,
) *Receiver {
	recv := &Receiver{stop: stop, stores: stores, testingKnobs: testingKnobs}
	// The connection map starts out empty; streams add themselves once they
	// learn which node they are talking to.
	recv.mu.conns = make(map[roachpb.NodeID]*incomingStream)
	recv.AmbientContext.AddLogTag("n", nodeID)
	return recv
}
// PushUpdates is the streaming RPC handler. It blocks for as long as the
// incoming stream is being serviced and returns whatever the stream's Run
// method returns.
func (s *Receiver) PushUpdates(stream ctpb.SideTransport_PushUpdatesServer) error {
	// Each connection is serviced by its own incomingStream. Once the stream's
	// first message reveals the sender's node ID, the stream calls back into
	// the Receiver through onFirstMsg to register itself.
	in := newIncomingStream(s, s.stores)
	ctx := s.AnnotateCtx(stream.Context())
	return in.Run(ctx, s.stop, stream)
}
// GetClosedTimestamp returns the latest closed timestamp that the receiver
// knows for a particular range, together with the LAI that needs to have been
// applied in order to use this closed timestamp.
//
// leaseholderNode is the last known leaseholder for the range. For efficiency
// reasons, only the stream coming from that node is consulted. If there is no
// stream registered for that node, an empty timestamp and a zero LAI are
// returned.
func (s *Receiver) GetClosedTimestamp(
	ctx context.Context, rangeID roachpb.RangeID, leaseholderNode roachpb.NodeID,
) (hlc.Timestamp, ctpb.LAI) {
	// The Receiver's lock only covers the map lookup; the stream is queried
	// after the lock has been released.
	s.mu.RLock()
	stream, found := s.mu.conns[leaseholderNode]
	s.mu.RUnlock()

	if found {
		return stream.GetClosedTimestamp(ctx, rangeID)
	}
	return hlc.Timestamp{}, 0
}
// onFirstMsg is called when the first message on a stream is received, which
// is the point where the stream finds out what node it's receiving data from.
// It registers r as the stream for nodeID and installs that node's testing
// knobs on it.
//
// An error is returned, and r is not registered, if a stream from nodeID is
// already registered.
func (s *Receiver) onFirstMsg(ctx context.Context, r *incomingStream, nodeID roachpb.NodeID) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	log.VEventf(ctx, 2, "n%d opened a closed timestamps side-transport connection", nodeID)

	// A second connection from the same node is refused. The existing one is
	// assumed to be a zombie that will go away soon, and the client is expected
	// to retry establishing the new connection.
	//
	// The existing connection could instead be signaled to terminate, but that
	// doesn't seem worth the complexity.
	_, exists := s.mu.conns[nodeID]
	if exists {
		return errors.Errorf("connection from n%d already exists", nodeID)
	}

	r.testingKnobs = s.testingKnobs[nodeID]
	s.mu.conns[nodeID] = r
	return nil
}
// onRecvErr is called when one of the inbound streams errors out. The stream is
// removed from the Receiver's collection.
//
// nodeID is the sender of the failed stream; it is zero if the stream failed
// before its first message was received, in which case there is nothing to
// unregister. err is the error returned by the stream. io.EOF is only traced at
// verbosity 2; any other error is logged as a warning, including the error
// itself so that the cause of the drop can be diagnosed.
func (s *Receiver) onRecvErr(ctx context.Context, nodeID roachpb.NodeID, err error) {
	// Logging doesn't touch the connection map, so it is done without the lock.
	if err != io.EOF {
		log.Warningf(ctx, "closed timestamps side-transport connection dropped from node: %d (%s)", nodeID, err)
	} else {
		log.VEventf(ctx, 2, "closed timestamps side-transport connection dropped from node: %d (%s)", nodeID, err)
	}

	if nodeID == 0 {
		// The stream never identified itself, so it was never registered.
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.mu.conns, nodeID)
}
// incomingStream represents an inbound connection to a node publishing closed
// timestamp information. It maintains the latest closed timestamps communicated
// by the sender node.
type incomingStream struct {
// The server that created this stream. The stream calls back into it when
// the first message arrives (onFirstMsg) and when receiving fails
// (onRecvErr).
server *Receiver
// stores is told about the closed timestamp of every range that the sender
// removes from the stream; see handleRemovals.
stores Stores
// testingKnobs is installed by Receiver.onFirstMsg once the sender's node ID
// is known; until then it is the zero value (all hooks nil).
testingKnobs incomingStreamTestingKnobs
// The node that's sending info on this stream. Zero until the first message
// has been received.
nodeID roachpb.NodeID
mu struct {
syncutil.RWMutex
// streamState is declared elsewhere in the package. This file uses its
// lastSeqNum, lastClosed and tracked members.
streamState
}
}
// incomingStreamTestingKnobs holds optional channels through which an
// incomingStream reports events to tests. A nil channel disables the
// corresponding notification.
type incomingStreamTestingKnobs struct {
// onFirstMsg is sent an empty struct after the stream has registered itself
// with the Receiver. The send is blocking.
onFirstMsg chan struct{}
// onRecvErr is sent the error returned by the stream's Recv call, before the
// Receiver is told about the failure. The send is blocking.
onRecvErr chan error
// onMsg is sent every message after it has been processed. The send is
// non-blocking: the message is dropped if nobody is ready to receive it.
onMsg chan *ctpb.Update
}
// Stores is the interface of *Stores needed by incomingStream.
type Stores interface {
// ForwardSideTransportClosedTimestampForRange forwards the side-transport
// closed timestamp for the local replica(s) of the given range.
//
// incomingStream calls it for every range that a sender removes from its
// stream, passing the last closed timestamp and LAI known for that range,
// right before dropping the range from its own tracking.
ForwardSideTransportClosedTimestampForRange(
ctx context.Context, rangeID roachpb.RangeID, closedTS hlc.Timestamp, lai ctpb.LAI)
}
// newIncomingStream creates the state for servicing one inbound connection.
// s is the Receiver to call back into; stores receives the closed timestamps
// of ranges that the sender stops tracking. The sender's node ID stays unset
// until Run has received the first message.
func newIncomingStream(s *Receiver, stores Stores) *incomingStream {
	return &incomingStream{server: s, stores: stores}
}
// GetClosedTimestamp returns the latest closed timestamp that this stream
// knows for a particular range, together with the LAI that needs to have been
// applied in order to use this closed timestamp. The timestamp is the one last
// received for the closed timestamp policy that the range is tracked under.
//
// An empty timestamp and a zero LAI are returned if the stream has no state
// for the range.
func (r *incomingStream) GetClosedTimestamp(
	ctx context.Context, rangeID roachpb.RangeID,
) (hlc.Timestamp, ctpb.LAI) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	if tr, tracked := r.mu.tracked[rangeID]; tracked {
		return r.mu.lastClosed[tr.policy], tr.lai
	}
	return hlc.Timestamp{}, 0
}
// processUpdate processes one update received on the stream, updating the local
// state.
//
// The process is terminated (log.Fatalf) if the message carries no node ID, a
// node ID different from the stream's, or, for a non-snapshot message, a
// sequence number that doesn't directly follow the previous one.
func (r *incomingStream) processUpdate(ctx context.Context, msg *ctpb.Update) {
	log.VEventf(ctx, 4, "received side-transport update: %v", msg)

	if msg.NodeID == 0 {
		log.Fatalf(ctx, "missing NodeID in message: %s", msg)
	}
	if msg.NodeID != r.nodeID {
		log.Fatalf(ctx, "wrong NodeID; expected %d, got %d", r.nodeID, msg.NodeID)
	}

	// Removals go first, and before the lock is taken: handleRemovals manages
	// the lock itself, and it needs the closed timestamps from *before* this
	// message, since the ones in this message don't apply to removed ranges.
	r.handleRemovals(ctx, msg.Removed)

	r.mu.Lock()
	defer r.mu.Unlock()

	switch {
	case msg.Snapshot:
		// A snapshot wipes all the state accumulated so far.
		for pol := range r.mu.lastClosed {
			r.mu.lastClosed[pol] = hlc.Timestamp{}
		}
		r.mu.tracked = make(map[roachpb.RangeID]trackedRange, len(r.mu.tracked))
	case msg.SeqNum != r.mu.lastSeqNum+1:
		log.Fatalf(ctx, "expected closed timestamp side-transport message with sequence number "+
			"%d, got %d", r.mu.lastSeqNum+1, msg.SeqNum)
	}
	r.mu.lastSeqNum = msg.SeqNum

	for _, added := range msg.AddedOrUpdated {
		r.mu.tracked[added.RangeID] = trackedRange{
			lai:    added.LAI,
			policy: added.Policy,
		}
	}
	for _, closed := range msg.ClosedTimestamps {
		r.mu.lastClosed[closed.Policy] = closed.ClosedTimestamp
	}
}
// handleRemovals handles the ranges that a message removes from the stream. In
// order to not lose closed ts info, before a range is removed from the stream's
// tracking, the info about its closed timestamp is copied to the local
// replica(s). It's important that this runs before the caller applies the
// message's closed timestamps to lastClosed since, by definition, the closed
// timestamps in the message don't apply to the removed ranges.
//
// The process is terminated (log.Fatalf) if a removed range is not currently
// tracked. Must be called without r.mu held.
func (r *incomingStream) handleRemovals(ctx context.Context, removed []roachpb.RangeID) {
	if len(removed) == 0 {
		return
	}

	// The removal is done in three distinct phases, because of locking
	// concerns.
	// Phase 1: Take the lock and read the data for the ranges that are about to
	//          be removed.
	// Phase 2: *Drop* the mutex and call into each Replica, telling it to hold
	//          on locally to the info that's about to be removed from the
	//          stream. This can't be done with the mutex locked because the
	//          replicas call GetClosedTimestamp() independently, with their own
	//          mutex held (=> deadlock).
	// Phase 3: Take the lock again and remove the state.

	// removal is everything phase 2 needs to know about one range. It is
	// captured in full under the lock so that phase 2 doesn't have to read any
	// mutex-protected state.
	type removal struct {
		closedTS hlc.Timestamp
		lai      ctpb.LAI
	}

	// Phase 1.
	infos := make([]removal, len(removed))
	r.mu.RLock()
	for i, rangeID := range removed {
		info, ok := r.mu.tracked[rangeID]
		if !ok {
			log.Fatalf(ctx, "attempting to unregister a missing range: r%d", rangeID)
		}
		infos[i] = removal{
			closedTS: r.mu.lastClosed[info.policy],
			lai:      info.lai,
		}
	}
	r.mu.RUnlock()

	// Phase 2. The mutex is not held.
	for i, rangeID := range removed {
		r.stores.ForwardSideTransportClosedTimestampForRange(
			ctx, rangeID, infos[i].closedTS, infos[i].lai)
	}

	// Phase 3.
	r.mu.Lock()
	for _, rangeID := range removed {
		delete(r.mu.tracked, rangeID)
	}
	r.mu.Unlock()
}
// Run handles an incoming stream of closed timestamps. It blocks until the
// stream terminates (the client goes away, receiving fails, or the Receiver
// refuses to register the stream) or until the stopper starts quiescing.
//
// An error is returned only if the receive task could not be started.
func (r *incomingStream) Run(
	ctx context.Context,
	stopper *stop.Stopper,
	// The gRPC stream with incoming messages.
	stream ctpb.SideTransport_PushUpdatesServer,
) error {
	// The stream is drained on a separate goroutine because Recv() is blocking,
	// with no way to interrupt it other than returning from the RPC handler
	// (i.e. this Run function). The calling goroutine stays in charge of
	// listening for stopper quiescence.
	streamDone := make(chan struct{})

	recvLoop := func(ctx context.Context) {
		// On exit, signal the other goroutine to terminate.
		defer close(streamDone)

		for {
			msg, recvErr := stream.Recv()
			if recvErr != nil {
				if errCh := r.testingKnobs.onRecvErr; errCh != nil {
					errCh <- recvErr
				}
				r.server.onRecvErr(ctx, r.nodeID, recvErr)
				return
			}

			// The first message tells us who the sender is; this is when the
			// stream registers itself with the Receiver.
			if r.nodeID == 0 {
				r.nodeID = msg.NodeID
				if err := r.server.onFirstMsg(ctx, r, r.nodeID); err != nil {
					log.Warningf(ctx, "%s", err.Error())
					return
				}
				if firstCh := r.testingKnobs.onFirstMsg; firstCh != nil {
					firstCh <- struct{}{}
				}
				if !msg.Snapshot {
					log.Fatal(ctx, "expected the first message to be a snapshot")
				}
			}

			r.processUpdate(ctx, msg)

			// Tests are notified on a best-effort basis; the message is dropped
			// if nobody is listening right now.
			if msgCh := r.testingKnobs.onMsg; msgCh != nil {
				select {
				case msgCh <- msg:
				default:
				}
			}
		}
	}

	if err := stopper.RunAsyncTask(ctx, "closedts side-transport server conn", recvLoop); err != nil {
		return err
	}

	// Block until the client terminates (or there's another stream error) or
	// the stopper signals us to bail.
	select {
	case <-streamDone:
	case <-stopper.ShouldQuiesce():
	}
	// Returning causes a blocked stream.Recv() (if there still is one) to return.
	return nil
}
// String renders a human-readable summary of the stream's state. The first
// line lists, for every closed timestamp policy, the last closed timestamp
// received and its distance from the current time. It is followed by one line
// per policy listing the IDs of the ranges tracked under that policy.
//
// Policies are always listed in increasing order of their index in
// lastClosed. The order of the range IDs within a policy's line is
// unspecified, as it follows map iteration order.
func (r *incomingStream) String() string {
	// Nothing is modified here, so a read lock is sufficient.
	r.mu.RLock()
	defer r.mu.RUnlock()

	var s strings.Builder
	fmt.Fprintf(&s, "n%d closed timestamps: ", r.nodeID)
	now := timeutil.Now()
	rangesByPolicy := make(map[roachpb.RangeClosedTimestampPolicy]*strings.Builder)
	for pol, ts := range r.mu.lastClosed {
		if pol != 0 {
			s.WriteString(", ")
		}
		policy := roachpb.RangeClosedTimestampPolicy(pol)
		fmt.Fprintf(&s, "%s: %s (lead/lag: %s)", policy, ts, now.Sub(ts.GoTime()))
		rangesByPolicy[policy] = &strings.Builder{}
	}
	s.WriteRune('\n')

	// Bucket the tracked ranges by policy.
	for rid, info := range r.mu.tracked {
		fmt.Fprintf(rangesByPolicy[info.policy], "%d, ", rid)
	}

	// Emit the buckets by walking lastClosed again rather than by ranging over
	// the map, so that the policies come out in a stable order.
	for pol := range r.mu.lastClosed {
		if pol != 0 {
			s.WriteRune('\n')
		}
		policy := roachpb.RangeClosedTimestampPolicy(pol)
		fmt.Fprintf(&s, "%s tracked: %s", policy, rangesByPolicy[policy].String())
	}
	return s.String()
}