diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 91e446fbe6c3..e26e28141e7c 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -549,7 +549,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea s.write(recvMsg{err: err}) close(s.done) // If headerChan isn't closed, then close it. - if atomic.SwapUint32(&s.headerDone, 1) == 0 { + if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) { close(s.headerChan) } @@ -713,7 +713,7 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2. s.write(recvMsg{err: err}) } // If headerChan isn't closed, then close it. - if atomic.SwapUint32(&s.headerDone, 1) == 0 { + if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) { s.noHeaders = true close(s.headerChan) } @@ -1142,26 +1142,24 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { } endStream := frame.StreamEnded() atomic.StoreUint32(&s.bytesReceived, 1) - initialHeader := atomic.SwapUint32(&s.headerDone, 1) == 0 + initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0 if !initialHeader && !endStream { - // As specified by RFC 7540, a HEADERS frame (and associated CONTINUATION frames) can only appear - // at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set. + // As specified by gRPC over HTTP2, a HEADERS frame (and associated CONTINUATION frames) can only appear at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set. st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream") t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false) return } state := &decodeState{} - // Initialize isGRPC value to be !initialHeader, since if a gRPC ResponseHeader has been received - // which indicates peer speaking gRPC, we are in gRPC mode. + // Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode. state.data.isGRPC = !initialHeader if err := state.decodeHeader(frame); err != nil { t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream) return } - var isHeader bool + isHeader := false defer func() { if t.statsHandler != nil { if isHeader { @@ -1180,10 +1178,10 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { } }() - // If headers haven't been received yet. - if initialHeader { + // If headerChan hasn't been closed yet + if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) { if !endStream { - // Headers frame is ResponseHeader. + // HEADERS frame block carries a Response-Headers. isHeader = true // These values can be set without any synchronization because // stream goroutine will read it only after seeing a closed @@ -1192,14 +1190,17 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { if len(state.data.mdata) > 0 { s.header = state.data.mdata } - close(s.headerChan) - return + } else { + // HEADERS frame block carries a Trailers-Only. + s.noHeaders = true } - // Headers frame is Trailers-only. - s.noHeaders = true close(s.headerChan) } + if !endStream { + return + } + // if client received END_STREAM from server while stream was still active, send RST_STREAM rst := s.getState() == streamActive t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true) diff --git a/internal/transport/transport.go b/internal/transport/transport.go index 846147a64e50..4bf583efc290 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -204,8 +204,8 @@ type Stream struct { // is used to adjust flow control, if needed. requestRead func(int) - headerChan chan struct{} // closed to indicate the end of header metadata. - headerDone uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times. + headerChan chan struct{} // closed to indicate the end of header metadata. + headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times. // hdrMu protects header and trailer metadata on the server-side. hdrMu sync.Mutex diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index f2e0d48c0ffe..ba4174d3c50b 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -1717,6 +1717,24 @@ func TestInvalidHeaderField(t *testing.T) { server.stop() } +func TestHeaderChanClosedAfterReceivingAnInvalidHeader(t *testing.T) { + server, ct, cancel := setUp(t, 0, math.MaxUint32, invalidHeaderField) + defer cancel() + defer server.stop() + defer ct.Close() + s, err := ct.NewStream(context.Background(), &CallHdr{Host: "localhost", Method: "foo"}) + if err != nil { + t.Fatalf("failed to create the stream") + } + timer := time.NewTimer(time.Second) + defer timer.Stop() + select { + case <-s.headerChan: + case <-timer.C: + t.Errorf("s.headerChan: got open, want closed") + } +} + func TestIsReservedHeader(t *testing.T) { tests := []struct { h string