Skip to content

Commit ca0352d

Browse files
committed
rename inner types again
1 parent f659e1f commit ca0352d

File tree

5 files changed

+55
-57
lines changed

5 files changed

+55
-57
lines changed

transport/control.go

+18-20
Original file line numberDiff line numberDiff line change
@@ -175,18 +175,19 @@ type inFlow struct {
175175
// window update for them. Used to reduce window update frequency.
176176
pendingUpdate uint32
177177

178-
// Pre-consumed window space. This is temporary extra space in the incoming flow control
179-
// window that is ok to for incoming data to fill up.
180-
preConsumedWindowSpace uint32
178+
// This is temporary space in the incoming flow control that can be granted at convenient times
179+
// to prevent the sender from stalling for lack flow control space.
180+
// If present, it is paid back when data is consumed from the window.
181+
loanedWindowSpace uint32
181182
}
182183

183184
// onData is invoked when some data frame is received. It updates pendingData.
184185
func (f *inFlow) onData(n uint32) error {
185186
f.mu.Lock()
186187
defer f.mu.Unlock()
187188
f.pendingData += n
188-
if f.pendingData+f.pendingUpdate > f.limit+f.preConsumedWindowSpace {
189-
return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate, f.limit+f.preConsumedWindowSpace)
189+
if f.pendingData+f.pendingUpdate > f.limit+f.loanedWindowSpace {
190+
return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate, f.limit+f.loanedWindowSpace)
190191
}
191192
return nil
192193
}
@@ -207,12 +208,11 @@ func (f *inFlow) onRead(n uint32) uint32 {
207208
grpclog.Fatalf("potential window update too large. onRead(n) where n is %v; max n is %v", f.pendingUpdate, http2MaxWindowUpdate)
208209
}
209210
f.pendingData -= n
210-
// add to "pendingUpdate" only if all data in "preConsumedWindowSpace" has been "consumed"
211-
if f.preConsumedWindowSpace > 0 {
212-
p := min(n, f.preConsumedWindowSpace)
213-
f.preConsumedWindowSpace -= p
214-
n -= p
215-
}
211+
// first use up remaining "loanedWindowSpace", add remaining Read to "pendingUpdate"
212+
windowSpaceDebtPayment := min(n, f.loanedWindowSpace)
213+
f.loanedWindowSpace -= windowSpaceDebtPayment
214+
n -= windowSpaceDebtPayment
215+
216216
f.pendingUpdate += n
217217
if f.pendingUpdate >= f.limit/4 {
218218
wu := f.pendingUpdate
@@ -222,18 +222,16 @@ func (f *inFlow) onRead(n uint32) uint32 {
222222
return 0
223223
}
224224

225-
func (f *inFlow) preConsumeWindowSpace(n uint32) uint32 {
225+
func (f *inFlow) loanWindowSpace(n uint32) uint32 {
226226
f.mu.Lock()
227227
defer f.mu.Unlock()
228-
if f.preConsumedWindowSpace > 0 {
229-
grpclog.Fatalf("pre-consuming window space while there is pre-consumed window space stil outstanding")
228+
if f.loanedWindowSpace > 0 {
229+
grpclog.Fatalf("pre-consuming window space while there is pre-consumed window space still outstanding")
230230
}
231-
f.preConsumedWindowSpace = n
232-
if f.preConsumedWindowSpace+f.pendingData > http2MaxWindowUpdate {
233-
grpclog.Fatalf("potential window of %v update too large. http2 max window update is %v", f.preConsumedWindowSpace+f.pendingData, http2MaxWindowUpdate)
234-
}
235-
if f.preConsumedWindowSpace+f.pendingUpdate >= f.limit/4 {
236-
wu := f.pendingUpdate + f.preConsumedWindowSpace
231+
f.loanedWindowSpace = n
232+
233+
if f.loanedWindowSpace+f.pendingUpdate >= f.limit/4 {
234+
wu := f.pendingUpdate + f.loanedWindowSpace
237235
f.pendingUpdate = 0
238236
return wu
239237
}

transport/http2_client.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -300,13 +300,13 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
300300
// That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
301301
// So we use the original context here instead of creating a copy.
302302
s.ctx = ctx
303-
consumeConnectionWindow := func(n uint32) {
303+
consumeConnAndStreamWindows := func(n uint32) {
304304
t.updateWindow(s, n)
305305
}
306-
preConsumeStreamWindow := func(n uint32) {
307-
t.preConsumeWindowSpace(s, n)
306+
loanSpaceInStreamWindow := func(n uint32) {
307+
t.loanSpaceInStreamWindow(s, n)
308308
}
309-
s.setClientStreamReader(consumeConnectionWindow, preConsumeStreamWindow)
309+
s.setClientStreamReader(consumeConnAndStreamWindows, loanSpaceInStreamWindow)
310310
return s
311311
}
312312

@@ -771,13 +771,13 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
771771
}
772772
}
773773

774-
func (t *http2Client) preConsumeWindowSpace(s *Stream, n uint32) {
774+
func (t *http2Client) loanSpaceInStreamWindow(s *Stream, n uint32) {
775775
s.mu.Lock()
776776
defer s.mu.Unlock()
777777
if s.state == streamDone {
778778
return
779779
}
780-
if w := s.fc.preConsumeWindowSpace(n); w > 0 {
780+
if w := s.fc.loanWindowSpace(n); w > 0 {
781781
t.controlBuf.put(&windowUpdate{s.id, w})
782782
}
783783
}

transport/http2_server.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -296,13 +296,13 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
296296
}
297297
t.stats.HandleRPC(s.ctx, inHeader)
298298
}
299-
consumeConnectionWindow := func(n uint32) {
299+
consumeConnAndStreamWindows := func(n uint32) {
300300
t.updateWindow(s, n)
301301
}
302-
preConsumeStreamWindow := func(n uint32) {
303-
t.preConsumeWindowSpace(s, n)
302+
loanSpaceInStreamWindow := func(n uint32) {
303+
t.loanSpaceInStreamWindow(s, n)
304304
}
305-
s.setServerStreamReader(consumeConnectionWindow, preConsumeStreamWindow)
305+
s.setServerStreamReader(consumeConnAndStreamWindows, loanSpaceInStreamWindow)
306306
handle(s)
307307
return
308308
}
@@ -421,13 +421,13 @@ func (t *http2Server) updateWindow(s *Stream, n uint32) {
421421
}
422422
}
423423

424-
func (t *http2Server) preConsumeWindowSpace(s *Stream, n uint32) {
424+
func (t *http2Server) loanSpaceInStreamWindow(s *Stream, n uint32) {
425425
s.mu.Lock()
426426
defer s.mu.Unlock()
427427
if s.state == streamDone {
428428
return
429429
}
430-
if w := s.fc.preConsumeWindowSpace(n); w > 0 {
430+
if w := s.fc.loanWindowSpace(n); w > 0 {
431431
t.controlBuf.put(&windowUpdate{s.id, w})
432432
}
433433
}

transport/transport.go

+22-22
Original file line numberDiff line numberDiff line change
@@ -221,33 +221,33 @@ type Stream struct {
221221
rstError http2.ErrCode
222222
}
223223

224-
func (s *Stream) setClientStreamReader(consumeConnectionWindow func(uint32), preConsumeStreamWindow func(uint32)) {
224+
func (s *Stream) setClientStreamReader(consumeConnAndStreamWindows func(uint32), loanSpaceInStreamWindow func(uint32)) {
225225
if s.ctx == nil || s.goAway == nil || s.buf == nil {
226226
grpclog.Fatalf("Uninitialized stream. s.ctx == nil: %v; s.goAway == nil: %v; s.buf == nil: %v", s.ctx == nil, s.goAway == nil, s.buf == nil)
227227
}
228-
s.setStreamReader(consumeConnectionWindow, preConsumeStreamWindow)
228+
s.setStreamReader(consumeConnAndStreamWindows, loanSpaceInStreamWindow)
229229
}
230230

231-
func (s *Stream) setServerStreamReader(consumeConnectionWindow func(uint32), preConsumeStreamWindow func(uint32)) {
231+
func (s *Stream) setServerStreamReader(consumeConnAndStreamWindows func(uint32), loanSpaceInStreamWindow func(uint32)) {
232232
if s.ctx == nil || s.buf == nil {
233233
grpclog.Fatalf("Uninitialized stream. s.ctx == nil: %v; s.buf == nil: %v", s.ctx == nil, s.buf == nil)
234234
}
235235
if s.goAway != nil {
236236
grpclog.Fatalf("Unexpected initialization of s.goAway; got %v; want nil", s.goAway)
237237
}
238-
s.setStreamReader(consumeConnectionWindow, preConsumeStreamWindow)
238+
s.setStreamReader(consumeConnAndStreamWindows, loanSpaceInStreamWindow)
239239
}
240240

241-
func (s *Stream) setStreamReader(consumeConnectionWindow func(uint32), preConsumeStreamWindow func(uint32)) {
242-
s.streamReader = &streamWindowPreconsumingReader{
243-
preConsumeStreamWindow: preConsumeStreamWindow,
244-
cwr: &connectionWindowConsumingReader{
241+
func (s *Stream) setStreamReader(consumeConnAndStreamWindows func(uint32), loanSpaceInStreamWindow func(uint32)) {
242+
s.streamReader = &streamWindowSpaceLoaningReader{
243+
loanSpaceInStreamWindow: loanSpaceInStreamWindow,
244+
wr: &connAndStreamWindowConsumingReader{
245245
dec: &recvBufferReader{
246246
ctx: s.ctx,
247247
goAway: s.goAway,
248248
recv: s.buf,
249249
},
250-
consumeConnectionWindow: consumeConnectionWindow,
250+
consumeConnAndStreamWindows: consumeConnAndStreamWindows,
251251
},
252252
}
253253
}
@@ -273,39 +273,39 @@ func (s *Stream) ReadFull(p []byte) (n int, err error) {
273273
return io.ReadFull(s.streamReader, p)
274274
}
275275

276-
type streamWindowPreconsumingReader struct {
276+
type streamWindowSpaceLoaningReader struct {
277277
// The handler to control the window update procedure for both this
278278
// particular stream and the associated transport.
279-
preConsumeStreamWindow func(uint32)
280-
cwr *connectionWindowConsumingReader
281-
readFullErr error
279+
loanSpaceInStreamWindow func(uint32)
280+
wr *connAndStreamWindowConsumingReader
281+
readFullErr error
282282
}
283283

284-
func (r *streamWindowPreconsumingReader) Read(p []byte) (n int, err error) {
284+
func (r *streamWindowSpaceLoaningReader) Read(p []byte) (n int, err error) {
285285
if r.readFullErr != nil {
286286
return 0, r.readFullErr
287287
}
288288
defer func() { r.readFullErr = err }()
289-
readAmount := min(maxSingleStreamWindowUpdate, uint32(len(p)))
290-
r.preConsumeStreamWindow(readAmount)
291-
return io.ReadFull(r.cwr, p[:readAmount])
289+
committedReadAmount := min(maxSingleStreamWindowUpdate, uint32(len(p)))
290+
r.loanSpaceInStreamWindow(committedReadAmount)
291+
return io.ReadFull(r.wr, p[:committedReadAmount])
292292
}
293293

294-
type connectionWindowConsumingReader struct {
295-
dec io.Reader
296-
consumeConnectionWindow func(uint32)
294+
type connAndStreamWindowConsumingReader struct {
295+
dec io.Reader
296+
consumeConnAndStreamWindows func(uint32)
297297
}
298298

299299
// Read reads all the data available for this Stream from the transport and
300300
// passes them into the decoder, which converts them into a gRPC message stream.
301301
// The error is io.EOF when the stream is done or another non-nil error if
302302
// the stream broke.
303-
func (s *connectionWindowConsumingReader) Read(p []byte) (n int, err error) {
303+
func (s *connAndStreamWindowConsumingReader) Read(p []byte) (n int, err error) {
304304
n, err = s.dec.Read(p)
305305
if err != nil {
306306
return
307307
}
308-
s.consumeConnectionWindow(uint32(n))
308+
s.consumeConnAndStreamWindows(uint32(n))
309309
return
310310
}
311311

transport/transport_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -1112,7 +1112,7 @@ func TestClientWithMisbehavedServer(t *testing.T) {
11121112
t.Fatalf("Failed to write: %v", err)
11131113
}
11141114
// reflect to get the inner recvBufferReader, which reads without doing window updates
1115-
recvBufferReader := s.streamReader.(*streamWindowPreconsumingReader).cwr.dec
1115+
recvBufferReader := s.streamReader.(*streamWindowSpaceLoaningReader).wr.dec
11161116
// Read without window update.
11171117
for {
11181118
p := make([]byte, http2MaxFrameLen)
@@ -1174,7 +1174,7 @@ func TestEncodingRequiredStatus(t *testing.T) {
11741174
}
11751175
p := make([]byte, http2MaxFrameLen)
11761176
// reflect to get the plain recvBufferReader from the stream's stream reader, which doesn't do window updates
1177-
recvBufferReader := s.streamReader.(*streamWindowPreconsumingReader).cwr.dec
1177+
recvBufferReader := s.streamReader.(*streamWindowSpaceLoaningReader).wr.dec
11781178
if _, err := recvBufferReader.Read(p); err != io.EOF {
11791179
t.Fatalf("Read got error %v, want %v", err, io.EOF)
11801180
}
@@ -1204,7 +1204,7 @@ func TestInvalidHeaderField(t *testing.T) {
12041204
}
12051205
p := make([]byte, http2MaxFrameLen)
12061206
// reflect to get the inner recvBufferReader, which reads without doing window updates
1207-
_, err = s.streamReader.(*streamWindowPreconsumingReader).cwr.dec.Read(p)
1207+
_, err = s.streamReader.(*streamWindowSpaceLoaningReader).wr.dec.Read(p)
12081208
if se, ok := err.(StreamError); !ok || se.Code != codes.FailedPrecondition || !strings.Contains(err.Error(), expectedInvalidHeaderField) {
12091209
t.Fatalf("Read got error %v, want error with code %s and contains %q", err, codes.FailedPrecondition, expectedInvalidHeaderField)
12101210
}

0 commit comments

Comments
 (0)