Skip to content

Commit 2e15d38

Browse files
committed
rename some inner types
1 parent 87694b9 commit 2e15d38

File tree

5 files changed

+33
-33
lines changed

5 files changed

+33
-33
lines changed

test/end2end_test.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -2361,7 +2361,7 @@ func testMetadataStreamingRPC(t *testing.T, e env) {
23612361
func TestServerStreaming(t *testing.T) {
23622362
serverRespSizes := [][]int{
23632363
[]int{27182, 8, 1828, 45904},
2364-
[]int{2000000, 2000000, 2000000, 2000000, 2000000, 2000000, 2000000, 2000000},
2364+
[]int{(1 << 21), (1 << 21), (1 << 21), (1 << 21), (1 << 21), (1 << 21), (1 << 21), (1 << 21)},
23652365
}
23662366
defer leakCheck(t)()
23672367
for _, s := range serverRespSizes {
@@ -2537,7 +2537,7 @@ func TestClientStreaming(t *testing.T) {
25372537
defer leakCheck(t)()
25382538
clientReqSizes := [][]int{
25392539
[]int{27182, 8, 1828, 45904},
2540-
[]int{20000000, 2000000, 2000000, 2000000, 2000000, 2000000, 2000000, 2000000},
2540+
[]int{(1 << 21), (1 << 21), (1 << 21), (1 << 21), (1 << 21), (1 << 21), (1 << 21), (1 << 21)},
25412541
}
25422542
for _, s := range clientReqSizes {
25432543
for _, e := range listTestEnv() {
@@ -2559,7 +2559,6 @@ func testClientStreaming(t *testing.T, e env, clientReqSizes []int) {
25592559

25602560
var sum int
25612561
for _, s := range clientReqSizes {
2562-
grpclog.Println("sending another message of size " + string(s))
25632562
payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(s))
25642563
if err != nil {
25652564
t.Fatal(err)

transport/http2_client.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -300,13 +300,13 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
300300
// That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
301301
// So we use the original context here instead of creating a copy.
302302
s.ctx = ctx
303-
windowReader := func(n uint32) {
303+
consumeConnectionWindow := func(n uint32) {
304304
t.updateWindow(s, n)
305305
}
306-
streamWindowPreconsumer := func(n uint32) {
306+
preConsumeStreamWindow := func(n uint32) {
307307
t.preConsumeWindowSpace(s, n)
308308
}
309-
s.setClientStreamReader(windowReader, streamWindowPreconsumer)
309+
s.setClientStreamReader(consumeConnectionWindow, preConsumeStreamWindow)
310310
return s
311311
}
312312

transport/http2_server.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -296,13 +296,13 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
296296
}
297297
t.stats.HandleRPC(s.ctx, inHeader)
298298
}
299-
windowReader := func(n uint32) {
299+
consumeConnectionWindow := func(n uint32) {
300300
t.updateWindow(s, n)
301301
}
302-
streamWindowPreconsumer := func(n uint32) {
302+
preConsumeStreamWindow := func(n uint32) {
303303
t.preConsumeWindowSpace(s, n)
304304
}
305-
s.setServerStreamReader(windowReader, streamWindowPreconsumer)
305+
s.setServerStreamReader(consumeConnectionWindow, preConsumeStreamWindow)
306306
handle(s)
307307
return
308308
}

transport/transport.go

+21-21
Original file line numberDiff line numberDiff line change
@@ -221,33 +221,33 @@ type Stream struct {
221221
rstError http2.ErrCode
222222
}
223223

224-
func (s *Stream) setClientStreamReader(windowReader func(uint32), streamWindowPreConsumer func(uint32)) {
224+
func (s *Stream) setClientStreamReader(consumeConnectionWindow func(uint32), preConsumeStreamWindow func(uint32)) {
225225
if s.ctx == nil || s.goAway == nil || s.buf == nil {
226226
grpclog.Fatalf("Uninitialized stream. s.ctx == nil: %v; s.goAway == nil: %v; s.buf == nil: %v", s.ctx == nil, s.goAway == nil, s.buf == nil)
227227
}
228-
s.setStreamReader(windowReader, streamWindowPreConsumer)
228+
s.setStreamReader(consumeConnectionWindow, preConsumeStreamWindow)
229229
}
230230

231-
func (s *Stream) setServerStreamReader(windowReader func(uint32), streamWindowPreConsumer func(uint32)) {
231+
func (s *Stream) setServerStreamReader(consumeConnectionWindow func(uint32), preConsumeStreamWindow func(uint32)) {
232232
if s.ctx == nil || s.buf == nil {
233233
grpclog.Fatalf("Uninitialized stream. s.ctx == nil: %v; s.buf == nil: %v", s.ctx == nil, s.buf == nil)
234234
}
235235
if s.goAway != nil {
236236
grpclog.Fatalf("Unexpected initialization of s.goAway; got %v; want nil", s.goAway)
237237
}
238-
s.setStreamReader(windowReader, streamWindowPreConsumer)
238+
s.setStreamReader(consumeConnectionWindow, preConsumeStreamWindow)
239239
}
240240

241-
func (s *Stream) setStreamReader(windowReader func(uint32), streamWindowPreConsumer func(uint32)) {
242-
s.streamReader = &streamWindowUpdaterReader{
243-
streamWindowPreConsumer: streamWindowPreConsumer,
244-
twr: &transportWindowUpdaterReader{
241+
func (s *Stream) setStreamReader(consumeConnectionWindow func(uint32), preConsumeStreamWindow func(uint32)) {
242+
s.streamReader = &streamWindowPreconsumingReader{
243+
preConsumeStreamWindow: preConsumeStreamWindow,
244+
cwr: &connectionWindowConsumingReader{
245245
dec: &recvBufferReader{
246246
ctx: s.ctx,
247247
goAway: s.goAway,
248248
recv: s.buf,
249249
},
250-
windowReader: windowReader,
250+
consumeConnectionWindow: consumeConnectionWindow,
251251
},
252252
}
253253
}
@@ -273,39 +273,39 @@ func (s *Stream) ReadFull(p []byte) (n int, err error) {
273273
return io.ReadFull(s.streamReader, p)
274274
}
275275

276-
type streamWindowUpdaterReader struct {
276+
type streamWindowPreconsumingReader struct {
277277
// The handler to control the window update procedure for both this
278278
// particular stream and the associated transport.
279-
streamWindowPreConsumer func(uint32)
280-
twr *transportWindowUpdaterReader
281-
readFullErr error
279+
preConsumeStreamWindow func(uint32)
280+
cwr *connectionWindowConsumingReader
281+
readFullErr error
282282
}
283283

284-
func (r *streamWindowUpdaterReader) Read(p []byte) (n int, err error) {
284+
func (r *streamWindowPreconsumingReader) Read(p []byte) (n int, err error) {
285285
if r.readFullErr != nil {
286286
return 0, r.readFullErr
287287
}
288288
defer func() { r.readFullErr = err }()
289289
readAmount := min(maxSingleStreamWindowUpdate, uint32(len(p)))
290-
r.streamWindowPreConsumer(readAmount)
291-
return io.ReadFull(r.twr, p[:readAmount])
290+
r.preConsumeStreamWindow(readAmount)
291+
return io.ReadFull(r.cwr, p[:readAmount])
292292
}
293293

294-
type transportWindowUpdaterReader struct {
295-
dec io.Reader
296-
windowReader func(uint32)
294+
type connectionWindowConsumingReader struct {
295+
dec io.Reader
296+
consumeConnectionWindow func(uint32)
297297
}
298298

299299
// Read reads all the data available for this Stream from the transport and
300300
// passes them into the decoder, which converts them into a gRPC message stream.
301301
// The error is io.EOF when the stream is done or another non-nil error if
302302
// the stream broke.
303-
func (s *transportWindowUpdaterReader) Read(p []byte) (n int, err error) {
303+
func (s *connectionWindowConsumingReader) Read(p []byte) (n int, err error) {
304304
n, err = s.dec.Read(p)
305305
if err != nil {
306306
return
307307
}
308-
s.windowReader(uint32(n))
308+
s.consumeConnectionWindow(uint32(n))
309309
return
310310
}
311311

transport/transport_test.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -1112,7 +1112,7 @@ func TestClientWithMisbehavedServer(t *testing.T) {
11121112
t.Fatalf("Failed to write: %v", err)
11131113
}
11141114
// reflect to get the inner recvBufferReader, which reads without doing window updates
1115-
recvBufferReader := s.streamReader.(*io.LimitedReader).R.(*windowUpdatingReader).pr.dec
1115+
recvBufferReader := s.streamReader.(*streamWindowPreconsumingReader).cwr.dec
11161116
// Read without window update.
11171117
for {
11181118
p := make([]byte, http2MaxFrameLen)
@@ -1174,7 +1174,8 @@ func TestEncodingRequiredStatus(t *testing.T) {
11741174
}
11751175
p := make([]byte, http2MaxFrameLen)
11761176
// reflect to get the plain recvBufferReader from the stream's stream reader, which doesn't do window updates
1177-
if _, err := s.streamReader.(*io.LimitedReader).R.(*windowUpdatingReader).pr.dec.Read(p); err != io.EOF {
1177+
recvBufferReader := s.streamReader.(*streamWindowPreconsumingReader).cwr.dec
1178+
if _, err := recvBufferReader.Read(p); err != io.EOF {
11781179
t.Fatalf("Read got error %v, want %v", err, io.EOF)
11791180
}
11801181
if s.StatusCode() != encodingTestStatusCode || s.StatusDesc() != encodingTestStatusDesc {
@@ -1203,7 +1204,7 @@ func TestInvalidHeaderField(t *testing.T) {
12031204
}
12041205
p := make([]byte, http2MaxFrameLen)
12051206
// reflect to get the inner recvBufferReader, which reads without doing window updates
1206-
_, err = s.streamReader.(*io.LimitedReader).R.(*windowUpdatingReader).pr.dec.Read(p)
1207+
_, err = s.streamReader.(*streamWindowPreconsumingReader).cwr.dec.Read(p)
12071208
if se, ok := err.(StreamError); !ok || se.Code != codes.FailedPrecondition || !strings.Contains(err.Error(), expectedInvalidHeaderField) {
12081209
t.Fatalf("Read got error %v, want error with code %s and contains %q", err, codes.FailedPrecondition, expectedInvalidHeaderField)
12091210
}

0 commit comments

Comments
 (0)