-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy pathcontext_testutils.go
225 lines (204 loc) · 7.68 KB
/
context_testutils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package rpc
import (
"context"
"sync"
"time"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"google.golang.org/grpc"
)
// ContextTestingKnobs provides hooks to aid in testing the system. The testing
// knob functions are called at various points in the Context life cycle if they
// are non-nil.
type ContextTestingKnobs struct {
// StreamClientInterceptor, if non-nil, will be called at dial time to
// provide the base stream interceptor for client connections.
// This function may return a nil interceptor to avoid injecting behavior
// for a given target and class.
//
// Note that this is not called for streaming RPCs using the
// internalClientAdapter - i.e. KV RPCs done against the local server.
StreamClientInterceptor func(target string, class ConnectionClass) grpc.StreamClientInterceptor
// UnaryClientInterceptor, if non-nil, will be called when invoking any
// unary RPC. Like StreamClientInterceptor, it receives the dial target and
// connection class and returns the interceptor to use.
UnaryClientInterceptor func(target string, class ConnectionClass) grpc.UnaryClientInterceptor
// InjectedLatencyOracle, if non-nil, supplies the artificial latency (as a
// time.Duration) to inject for a target address (server.RPCServingAddr() of
// a remote node). Setting this will cause the server to pause for the given
// duration on every network write.
InjectedLatencyOracle InjectedLatencyOracle
// InjectedLatencyEnabled is used to turn on or off the InjectedLatencyOracle.
InjectedLatencyEnabled func() bool
// StorageClusterID initializes the Context's StorageClusterID container to
// this value if non-nil at construction time.
StorageClusterID *uuid.UUID
// NoLoopbackDialer, when set, indicates that a test does not care
// about the special loopback dial semantics.
// If this is left unset, the test is responsible for ensuring
// SetLoopbackDialer() has been called on the rpc.Context.
// (This is done automatically by server.Server/server.SQLServerWrapper.)
NoLoopbackDialer bool
}
// InjectedLatencyOracle is a testing mechanism used to inject artificial
// latency to an address.
type InjectedLatencyOracle interface {
// GetLatency returns the artificial latency associated with addr.
GetLatency(addr string) time.Duration
}
// NewInsecureTestingContext returns an insecure rpc Context for use in tests.
// It generates a new V4 UUID to serve as the storage cluster ID and delegates
// to NewInsecureTestingContextWithClusterID.
func NewInsecureTestingContext(
	ctx context.Context, clock *hlc.Clock, stopper *stop.Stopper,
) *Context {
	return NewInsecureTestingContextWithClusterID(ctx, clock, stopper, uuid.MakeV4())
}
// NewInsecureTestingContextWithClusterID returns an insecure rpc Context for
// use in tests. The supplied storage cluster ID is handed to the Context via
// ContextTestingKnobs.StorageClusterID; a logical cluster ID will be derived
// from it automatically.
func NewInsecureTestingContextWithClusterID(
	ctx context.Context, clock *hlc.Clock, stopper *stop.Stopper, storageClusterID uuid.UUID,
) *Context {
	var knobs ContextTestingKnobs
	knobs.StorageClusterID = &storageClusterID
	return NewInsecureTestingContextWithKnobs(ctx, clock, stopper, knobs)
}
// NewInsecureTestingContextWithKnobs returns an insecure rpc Context for use
// in tests. It starts from DefaultContextOptions and overrides the security
// mode, clock, tolerated offset, cluster settings, stopper, and testing knobs
// before constructing the Context.
func NewInsecureTestingContextWithKnobs(
	ctx context.Context, clock *hlc.Clock, stopper *stop.Stopper, knobs ContextTestingKnobs,
) *Context {
	ctxOpts := DefaultContextOptions()
	ctxOpts.Insecure = true

	// Time source and allowed clock offset both come from the provided clock.
	ctxOpts.Clock = clock.WallClock()
	ctxOpts.ToleratedOffset = clock.ToleratedOffset()

	// Fresh testing settings plus the caller-supplied knobs and stopper.
	ctxOpts.Settings = cluster.MakeTestingClusterSettings()
	ctxOpts.Knobs = knobs
	ctxOpts.Stopper = stopper

	return NewContext(ctx, ctxOpts)
}
// disablingClientStream wraps a grpc.ClientStream and consults partitionCheck
// whenever a message is sent or received. If the check returns an error, the
// SendMsg/RecvMsg call fails with that error instead of reaching the wrapped
// stream.
type disablingClientStream struct {
// ClientStream is the wrapped stream that messages are forwarded to.
grpc.ClientStream
// partitionCheck returns a non-nil error when the operation should be
// failed; a nil return lets the operation proceed.
partitionCheck func() error
}
// SendMsg forwards m to the wrapped stream unless the partition check fails,
// in which case the check's error is returned and nothing is sent.
func (d disablingClientStream) SendMsg(m interface{}) error {
	err := d.partitionCheck()
	if err == nil {
		err = d.ClientStream.SendMsg(m)
	}
	return err
}
// RecvMsg receives into m from the wrapped stream unless the partition check
// fails, in which case the check's error is returned and nothing is read.
func (d disablingClientStream) RecvMsg(m interface{}) error {
	err := d.partitionCheck()
	if err == nil {
		err = d.ClientStream.RecvMsg(m)
	}
	return err
}
// Partitioner is used to create partial partitions between nodes at the gRPC
// layer. It uses unary and stream client interceptors to fail requests to
// nodes that are not connected. Usage of it is something like the following:
//
//	var p rpc.Partitioner
//
//	for i := 0; i < numServers; i++ {
//		knobs := p.CreateTestingKnobs(nodeID, partitions)
//		// ... install knobs on the node being configured
//	}
//
//	TestCluster.Start()
//
//	for i := 0; i < numServers; i++ {
//		p.RegisterNodeAddr(addr, nodeID)
//	}
//
//	p.EnablePartition(true)
//	... run operations
//
// The addr passed to RegisterNodeAddr must be the same string the interceptors
// receive as their dial target, since that target is what gets looked up.
//
// TODO(baptist): This could be enhanced to allow dynamic partition injection.
type Partitioner struct {
// partitionEnabled gates the partition checks; while it is false the
// interceptors let every call through.
partitionEnabled syncutil.AtomicBool
// nodeAddrMap maps a node address (string) to its roachpb.NodeID. It is
// populated by RegisterNodeAddr.
nodeAddrMap sync.Map
}
// EnablePartition will enable or disable the partition. While it is enabled,
// the interceptors built by CreateTestingKnobs return errors for partitioned
// targets and RegisterNodeAddr panics.
func (p *Partitioner) EnablePartition(enable bool) {
p.partitionEnabled.Set(enable)
}
// RegisterNodeAddr records the mapping from a node's address to its NodeID.
// It must be called for every node after the cluster has started and before
// the partition is enabled; it panics if the partition is already enabled.
func (p *Partitioner) RegisterNodeAddr(addr string, id roachpb.NodeID) {
	if enabled := p.partitionEnabled.Get(); enabled {
		panic("Can not register node addresses with a partition enabled")
	}
	p.nodeAddrMap.Store(addr, id)
}
// CreateTestingKnobs creates the testing knobs for the node identified by id.
// Specifically it will override both the Unary and Stream client interceptors
// so that, once EnablePartition(true) has been called, RPCs to nodes this node
// is partitioned from return an error.
//
// partition lists the partitioned node pairs. This node is considered
// partitioned from another node if the two appear together in any pair,
// regardless of which side each is on.
//
// The returned interceptors panic if, while the partition is enabled, they see
// a target address that was never passed to RegisterNodeAddr.
func (p *Partitioner) CreateTestingKnobs(
	id roachpb.NodeID, partition [][2]roachpb.NodeID,
) ContextTestingKnobs {
	// Structure the partition list for indexed lookup. We are partitioned from
	// the other node if we are found on either side of the pair.
	partitionedServers := make(map[roachpb.NodeID]bool)
	for _, pair := range partition {
		if pair[0] == id {
			partitionedServers[pair[1]] = true
		}
		if pair[1] == id {
			partitionedServers[pair[0]] = true
		}
	}

	// isPartitioned returns an error if the partition is currently enabled and
	// addr belongs to a node this node is partitioned from.
	isPartitioned := func(addr string) error {
		if !p.partitionEnabled.Get() {
			return nil
		}
		v, ok := p.nodeAddrMap.Load(addr)
		if !ok {
			panic("address not mapped, call RegisterNodeAddr before enabling the partition: " + addr)
		}
		// Only RegisterNodeAddr stores into nodeAddrMap, and it always stores
		// a roachpb.NodeID.
		peerID := v.(roachpb.NodeID)
		if partitionedServers[peerID] {
			return errors.Newf("partitioned from %s, n%d", addr, peerID)
		}
		return nil
	}

	// Unary RPCs are checked once, before the call is invoked.
	unaryIntercept := func(target string, _ ConnectionClass) grpc.UnaryClientInterceptor {
		return func(
			ctx context.Context,
			method string,
			req, reply interface{},
			cc *grpc.ClientConn,
			invoker grpc.UnaryInvoker,
			opts ...grpc.CallOption,
		) error {
			if err := isPartitioned(target); err != nil {
				return err
			}
			return invoker(ctx, method, req, reply, cc, opts...)
		}
	}

	// Streams are created unconditionally; the partition is checked on every
	// subsequent SendMsg/RecvMsg so that a stream opened before the partition
	// was enabled is affected too.
	streamIntercept := func(target string, _ ConnectionClass) grpc.StreamClientInterceptor {
		return func(
			ctx context.Context,
			desc *grpc.StreamDesc,
			cc *grpc.ClientConn,
			method string,
			streamer grpc.Streamer,
			opts ...grpc.CallOption,
		) (grpc.ClientStream, error) {
			cs, err := streamer(ctx, desc, cc, method, opts...)
			if err != nil {
				return nil, err
			}
			return &disablingClientStream{
				ClientStream:   cs,
				partitionCheck: func() error { return isPartitioned(target) },
			}, nil
		}
	}

	return ContextTestingKnobs{
		UnaryClientInterceptor:  unaryIntercept,
		StreamClientInterceptor: streamIntercept,
	}
}