diff --git a/transport/control.go b/transport/control.go index 3db471a08085..beff034c83da 100644 --- a/transport/control.go +++ b/transport/control.go @@ -68,6 +68,7 @@ const ( type windowUpdate struct { streamID uint32 increment uint32 + flush bool } func (*windowUpdate) item() {} @@ -240,3 +241,11 @@ func (f *inFlow) onRead(n uint32) uint32 { } return 0 } + +func (f *inFlow) resetPendingUpdate() uint32 { + f.mu.Lock() + defer f.mu.Unlock() + n := f.pendingUpdate + f.pendingUpdate = 0 + return n +} diff --git a/transport/http2_client.go b/transport/http2_client.go index 27a72a0e5566..92b5180e7f9c 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -813,7 +813,11 @@ func (t *http2Client) adjustWindow(s *Stream, n uint32) { return } if w := s.fc.maybeAdjust(n); w > 0 { - t.controlBuf.put(&windowUpdate{s.id, w}) + // Piggyback conneciton's window update along. + if cw := t.fc.resetPendingUpdate(); cw > 0 { + t.controlBuf.put(&windowUpdate{0, cw, false}) + } + t.controlBuf.put(&windowUpdate{s.id, w, true}) } } @@ -827,7 +831,10 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) { return } if w := s.fc.onRead(n); w > 0 { - t.controlBuf.put(&windowUpdate{s.id, w}) + if cw := t.fc.resetPendingUpdate(); cw > 0 { + t.controlBuf.put(&windowUpdate{0, cw, false}) + } + t.controlBuf.put(&windowUpdate{s.id, w, true}) } } @@ -846,7 +853,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) { // active(fast) streams from starving in presence of slow or // inactive streams. if w := t.fc.onRead(uint32(size)); w > 0 { - t.controlBuf.put(&windowUpdate{0, w}) + t.controlBuf.put(&windowUpdate{0, w, true}) } // Select the right stream to dispatch. s, ok := t.getStream(f) @@ -869,7 +876,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) { } if f.Header().Flags.Has(http2.FlagDataPadded) { if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 { - t.controlBuf.put(&windowUpdate{s.id, w}) + t.controlBuf.put(&windowUpdate{s.id, w, true}) } } s.mu.Unlock() @@ -1185,7 +1192,7 @@ func (t *http2Client) controller() { case <-t.writableChan: switch i := i.(type) { case *windowUpdate: - t.framer.writeWindowUpdate(true, i.streamID, i.increment) + t.framer.writeWindowUpdate(i.flush, i.streamID, i.increment) case *settings: if i.ack { t.framer.writeSettingsAck(true) diff --git a/transport/http2_server.go b/transport/http2_server.go index 85590d2086c8..3a18f30798c9 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -449,7 +449,10 @@ func (t *http2Server) adjustWindow(s *Stream, n uint32) { return } if w := s.fc.maybeAdjust(n); w > 0 { - t.controlBuf.put(&windowUpdate{s.id, w}) + if cw := t.fc.resetPendingUpdate(); cw > 0 { + t.controlBuf.put(&windowUpdate{0, cw, false}) + } + t.controlBuf.put(&windowUpdate{s.id, w, true}) } } @@ -463,7 +466,10 @@ func (t *http2Server) updateWindow(s *Stream, n uint32) { return } if w := s.fc.onRead(n); w > 0 { - t.controlBuf.put(&windowUpdate{s.id, w}) + if cw := t.fc.resetPendingUpdate(); cw > 0 { + t.controlBuf.put(&windowUpdate{0, cw, false}) + } + t.controlBuf.put(&windowUpdate{s.id, w, true}) } } @@ -483,7 +489,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) { // active(fast) streams from starving in presence of slow or // inactive streams. if w := t.fc.onRead(uint32(size)); w > 0 { - t.controlBuf.put(&windowUpdate{0, w}) + t.controlBuf.put(&windowUpdate{0, w, true}) } // Select the right stream to dispatch. s, ok := t.getStream(f) @@ -504,7 +510,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) { } if f.Header().Flags.Has(http2.FlagDataPadded) { if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 { - t.controlBuf.put(&windowUpdate{s.id, w}) + t.controlBuf.put(&windowUpdate{s.id, w, true}) } } s.mu.Unlock() @@ -979,7 +985,7 @@ func (t *http2Server) controller() { case <-t.writableChan: switch i := i.(type) { case *windowUpdate: - t.framer.writeWindowUpdate(true, i.streamID, i.increment) + t.framer.writeWindowUpdate(i.flush, i.streamID, i.increment) case *settings: if i.ack { t.framer.writeSettingsAck(true)