Skip to content

Commit

Permalink
internal: server deletes stream after receiving an RST_STREAM frame
Browse files Browse the repository at this point in the history
* Fixes established streams leak in the loopy writer.

RSTStreamFrames used to be ignored by the server transport, if a trailer had already been put into the transport's control buffer. If loopy writer couldn't write anything into a stream because of an error on the client side, then this trailer would never be sent. At that point, server would receive an RSTStreamFrame from client. But this RSTStreamFrame would be ignored because a trailer was already put into the control buffer. This would keep the stream open and in memory on the server side.

With this change, a cleanupStream item is put into the transport's control buffer, whenever an RSTStreamFrame is received by the server, even after a trailer has been put into the buffer.

* When client sends a header to initiate a stream just after sending an RST_STREAM, server gets these frames in the correct order.
When server receives the RST_STREAM, it marks the stream as done and defers the deletion of the stream to the loopy writer by putting a cleanupStream item into control buffer.
Then the server receives the header to initiate a stream. It acts on the header immediately and attempts to create the stream. But because the old stream is not deleted, it hits the number of streams limit and fails.
This commit solves this problem by letting server handle the deletion immediately after receiving the RST_STREAM.

* Refactors deleteStream method.

* Moves consts declarations into test function's body.
  • Loading branch information
Can Guler authored Feb 12, 2019
1 parent a402911 commit 32559e2
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 25 deletions.
65 changes: 40 additions & 25 deletions internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1004,45 +1004,60 @@ func (t *http2Server) Close() error {
return err
}

// deleteStream deletes the stream s from transport's active streams.
func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
t.mu.Lock()
if _, ok := t.activeStreams[s.id]; !ok {
t.mu.Unlock()
return
}

delete(t.activeStreams, s.id)
if len(t.activeStreams) == 0 {
t.idle = time.Now()
}
t.mu.Unlock()

if channelz.IsOn() {
if eosReceived {
atomic.AddInt64(&t.czData.streamsSucceeded, 1)
} else {
atomic.AddInt64(&t.czData.streamsFailed, 1)
}
}
}

// closeStream clears the footprint of a stream when the stream is not needed
// any more.
func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
if s.swapState(streamDone) == streamDone {
// If the stream was already done, return.
return
}
// In case stream sending and receiving are invoked in separate
// goroutines (e.g., bi-directional streaming), cancel needs to be
// called to interrupt the potential blocking on other goroutines.
s.cancel()

// Deletes the stream from active streams
t.deleteStream(s, eosReceived)

cleanup := &cleanupStream{
streamID: s.id,
rst: rst,
rstCode: rstCode,
onWrite: func() {
t.mu.Lock()
if t.activeStreams != nil {
delete(t.activeStreams, s.id)
if len(t.activeStreams) == 0 {
t.idle = time.Now()
}
}
t.mu.Unlock()
if channelz.IsOn() {
if eosReceived {
atomic.AddInt64(&t.czData.streamsSucceeded, 1)
} else {
atomic.AddInt64(&t.czData.streamsFailed, 1)
}
}
},
onWrite: func() {},
}
if hdr != nil {
hdr.cleanup = cleanup
t.controlBuf.put(hdr)
} else {

// No trailer. Puts cleanupFrame into transport's control buffer.
if hdr == nil {
t.controlBuf.put(cleanup)
return
}

// If the stream is already done, don't send the trailer.
if s.swapState(streamDone) == streamDone {
return
}

hdr.cleanup = cleanup
t.controlBuf.put(hdr)
}

func (t *http2Server) RemoteAddr() net.Addr {
Expand Down
5 changes: 5 additions & 0 deletions test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5154,6 +5154,7 @@ type stubServer struct {

// Customizable implementations of server handlers.
emptyCall func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error)
unaryCall func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error)
fullDuplexCall func(stream testpb.TestService_FullDuplexCallServer) error

// A client connected to this service the test may use. Created in Start().
Expand All @@ -5169,6 +5170,10 @@ func (ss *stubServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.
return ss.emptyCall(ctx, in)
}

func (ss *stubServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return ss.unaryCall(ctx, in)
}

func (ss *stubServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
return ss.fullDuplexCall(stream)
}
Expand Down
57 changes: 57 additions & 0 deletions test/stream_cleanup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package test

import (
"context"
"testing"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
testpb "google.golang.org/grpc/test/grpc_testing"
)

func (s) TestStreamCleanup(t *testing.T) {
const initialWindowSize uint = 70 * 1024 // Must be higher than default 64K, ignored otherwise
const bodySize uint = 2 * initialWindowSize // Something that is not going to fit in a single window
const callRecvMsgSize uint = 1 // The maximum message size the client can receive

ss := &stubServer{
unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{Payload: &testpb.Payload{
Body: make([]byte, bodySize),
}}, nil
},
emptyCall: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
}
if err := ss.Start([]grpc.ServerOption{grpc.MaxConcurrentStreams(1)}, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(callRecvMsgSize))), grpc.WithInitialWindowSize(int32(initialWindowSize))); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()

if _, err := ss.client.UnaryCall(context.Background(), &testpb.SimpleRequest{}); status.Code(err) != codes.ResourceExhausted {
t.Fatalf("should fail with ResourceExhausted, message's body size: %v, maximum message size the client can receive: %v", bodySize, callRecvMsgSize)
}
if _, err := ss.client.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
t.Fatalf("should succeed, err: %v", err)
}
}

0 comments on commit 32559e2

Please sign in to comment.