From 9e922bde5896c990ff89f5deaeb7512c34bf7dd2 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Fri, 10 Feb 2017 11:41:20 -0800 Subject: [PATCH 1/7] Issue #1060 maximum number of streams on the client should be capped at 100 by default --- test/end2end_test.go | 39 +++++++++++++++++++++++++++++++++++++++ transport/control.go | 5 +++-- transport/http2_client.go | 2 +- 3 files changed, 43 insertions(+), 3 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 902bec420b67..0ba0256f83a3 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -2671,6 +2671,45 @@ func testExceedMaxStreamsLimit(t *testing.T, e env) { } } +const defaultMaxStreamsClient = 100 + +func TestClientExceedMaxStreamsLimit(t *testing.T) { + defer leakCheck(t)() + for _, e := range listTestEnv() { + testExceedMaxStreamsLimit(t, e) + } +} + +func testClientExceedMaxStreamsLimit(t *testing.T, e env) { + te := newTest(t, e) + te.declareLogNoise( + "http2Client.notifyError got notified that the client transport was broken", + "Conn.resetTransport failed to create client transport", + "grpc: the connection is closing", + ) + te.maxStream = 0 // Server allows infinite streams. The cap should be on client side. + te.startServer(&testServer{security: e.security}) + defer te.tearDown() + + cc := te.clientConn() + tc := testpb.NewTestServiceClient(cc) + + // Create as many streams as a client can. + for i := 0; i < defaultMaxStreamsClient; i++ { + if _, err := tc.StreamingInputCall(te.ctx); err != nil { + t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, ", tc, err) + } + } + + // Trying to create one more should timeout. + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + _, err := tc.StreamingInputCall(ctx) + if err == nil || grpc.Code(err) != codes.DeadlineExceeded { + t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded) + } +} + func TestStreamsQuotaRecovery(t *testing.T) { defer leakCheck(t)() for _, e := range listTestEnv() { diff --git a/transport/control.go b/transport/control.go index 2586cba469c6..33de7b60bbdc 100644 --- a/transport/control.go +++ b/transport/control.go @@ -44,8 +44,9 @@ const ( // The default value of flow control window size in HTTP2 spec. defaultWindowSize = 65535 // The initial window size for flow control. - initialWindowSize = defaultWindowSize // for an RPC - initialConnWindowSize = defaultWindowSize * 16 // for a connection + initialWindowSize = defaultWindowSize // for an RPC + initialConnWindowSize = defaultWindowSize * 16 // for a connection + defaultMaxStreamsClient = 100 ) // The following defines various control items which could flow through diff --git a/transport/http2_client.go b/transport/http2_client.go index 892f8ba675ab..a85552bec53e 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -208,7 +208,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( state: reachable, activeStreams: make(map[uint32]*Stream), creds: opts.PerRPCCredentials, - maxStreams: math.MaxInt32, + maxStreams: defaultMaxStreamsClient, streamSendQuota: defaultWindowSize, statsHandler: opts.StatsHandler, } From df4f24b125eb9876766d0c3acb30ede1c5700368 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Mon, 13 Feb 2017 14:24:31 -0800 Subject: [PATCH 2/7] 1. Initialize streamsQuota at object creation. 2. Defer adding back to streamsQuota pool in CloseStream --- test/end2end_test.go | 2 +- transport/http2_client.go | 49 ++++++++++--------------------------- transport/transport_test.go | 5 +++- 3 files changed, 18 insertions(+), 38 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 0ba0256f83a3..f79b91e07a5f 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -2676,7 +2676,7 @@ const defaultMaxStreamsClient = 100 func TestClientExceedMaxStreamsLimit(t *testing.T) { defer leakCheck(t)() for _, e := range listTestEnv() { - testExceedMaxStreamsLimit(t, e) + testClientExceedMaxStreamsLimit(t, e) } } diff --git a/transport/http2_client.go b/transport/http2_client.go index a85552bec53e..c9458fe1eec3 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -209,6 +209,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( activeStreams: make(map[uint32]*Stream), creds: opts.PerRPCCredentials, maxStreams: defaultMaxStreamsClient, + streamsQuota: newQuotaPool(defaultMaxStreamsClient), streamSendQuota: defaultWindowSize, statsHandler: opts.StatsHandler, } @@ -337,21 +338,18 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea t.mu.Unlock() return nil, ErrConnClosing } - checkStreamsQuota := t.streamsQuota != nil t.mu.Unlock() - if checkStreamsQuota { - sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire()) - if err != nil { - return nil, err - } - // Returns the quota balance back. - if sq > 1 { - t.streamsQuota.add(sq - 1) - } + sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire()) + if err != nil { + return nil, err + } + // Returns the quota balance back. + if sq > 1 { + t.streamsQuota.add(sq - 1) } if _, err := wait(ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil { // Return the quota back now because there is no stream returned to the caller. - if _, ok := err.(StreamError); ok && checkStreamsQuota { + if _, ok := err.(StreamError); ok { t.streamsQuota.add(1) } return nil, err @@ -359,9 +357,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea t.mu.Lock() if t.state == draining { t.mu.Unlock() - if checkStreamsQuota { - t.streamsQuota.add(1) - } + t.streamsQuota.add(1) // Need to make t writable again so that the rpc in flight can still proceed. t.writableChan <- 0 return nil, ErrStreamDrain @@ -374,16 +370,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea s.clientStatsCtx = userCtx t.activeStreams[s.id] = s - // This stream is not counted when applySetings(...) initialize t.streamsQuota. - // Reset t.streamsQuota to the right value. - var reset bool - if !checkStreamsQuota && t.streamsQuota != nil { - reset = true - } t.mu.Unlock() - if reset { - t.streamsQuota.add(-1) - } // HPACK encodes various headers. Note that once WriteField(...) is // called, the corresponding headers/continuation frame has to be sent @@ -491,15 +478,11 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea // CloseStream clears the footprint of a stream when the stream is not needed any more. // This must not be executed in reader's goroutine. func (t *http2Client) CloseStream(s *Stream, err error) { - var updateStreams bool t.mu.Lock() if t.activeStreams == nil { t.mu.Unlock() return } - if t.streamsQuota != nil { - updateStreams = true - } delete(t.activeStreams, s.id) if t.state == draining && len(t.activeStreams) == 0 { // The transport is draining and s is the last live stream on t. @@ -508,9 +491,9 @@ func (t *http2Client) CloseStream(s *Stream, err error) { return } t.mu.Unlock() - if updateStreams { + defer func() { t.streamsQuota.add(1) - } + }() s.mu.Lock() if q := s.fc.resetPendingData(); q > 0 { if n := t.fc.onRead(q); n > 0 { @@ -1043,16 +1026,10 @@ func (t *http2Client) applySettings(ss []http2.Setting) { s.Val = math.MaxInt32 } t.mu.Lock() - reset := t.streamsQuota != nil - if !reset { - t.streamsQuota = newQuotaPool(int(s.Val) - len(t.activeStreams)) - } ms := t.maxStreams t.maxStreams = int(s.Val) t.mu.Unlock() - if reset { - t.streamsQuota.add(int(s.Val) - ms) - } + t.streamsQuota.add(int(s.Val) - ms) case http2.SettingInitialWindowSize: t.mu.Lock() for _, stream := range t.activeStreams { diff --git a/transport/transport_test.go b/transport/transport_test.go index 1ca6eb1a6312..e91fc6ed9d1d 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -507,7 +507,10 @@ func TestMaxStreams(t *testing.T) { case <-cc.streamsQuota.acquire(): t.Fatalf("streamsQuota.acquire() becomes readable mistakenly.") default: - if cc.streamsQuota.quota != 0 { + cc.streamsQuota.mu.Lock() + quota := cc.streamsQuota.quota + cc.streamsQuota.mu.Unlock() + if quota != 0 { t.Fatalf("streamsQuota.quota got non-zero quota mistakenly.") } } From 888efe30a648ba513aaa6f7bbea2c3427aa7b685 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Mon, 13 Feb 2017 14:36:26 -0800 Subject: [PATCH 3/7] style update --- transport/http2_client.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/transport/http2_client.go b/transport/http2_client.go index c9458fe1eec3..59ac347b1130 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -491,9 +491,7 @@ func (t *http2Client) CloseStream(s *Stream, err error) { return } t.mu.Unlock() - defer func() { - t.streamsQuota.add(1) - }() + defer t.streamsQuota.add(1) s.mu.Lock() if q := s.fc.resetPendingData(); q > 0 { if n := t.fc.onRead(q); n > 0 { From 7f74821ff2c0b0601488d973dc9aaaa59d84c623 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Fri, 17 Feb 2017 15:00:07 -0800 Subject: [PATCH 4/7] experimental commit --- transport/http2_client.go | 11 ++++++++++- transport/transport.go | 5 ++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/transport/http2_client.go b/transport/http2_client.go index 59ac347b1130..781e8707a16c 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -491,8 +491,14 @@ func (t *http2Client) CloseStream(s *Stream, err error) { return } t.mu.Unlock() - defer t.streamsQuota.add(1) + var rstStream bool + defer func() { + if !rstStream { + t.streamsQuota.add(1) + } + }() s.mu.Lock() + rstStream = s.rstStream if q := s.fc.resetPendingData(); q > 0 { if n := t.fc.onRead(q); n > 0 { t.controlBuf.put(&windowUpdate{0, n}) @@ -509,6 +515,7 @@ func (t *http2Client) CloseStream(s *Stream, err error) { s.state = streamDone s.mu.Unlock() if se, ok := err.(StreamError); ok && se.Code != codes.DeadlineExceeded { + rstStream = true t.controlBuf.put(&resetStream{s.id, http2.ErrCodeCancel}) } } @@ -750,6 +757,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) { s.state = streamDone s.statusCode = codes.Internal s.statusDesc = err.Error() + s.rstStream = true close(s.done) s.mu.Unlock() s.write(recvMsg{err: io.EOF}) @@ -1060,6 +1068,7 @@ func (t *http2Client) controller() { t.framer.writeSettings(true, i.ss...) } case *resetStream: + t.streamsQuota.add(1) t.framer.writeRSTStream(true, i.streamID, i.code) case *flushIO: t.framer.flushWrite() diff --git a/transport/transport.go b/transport/transport.go index d465991823fb..06cc9c0f7cae 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -35,7 +35,7 @@ Package transport defines and implements message oriented communication channel to complete various transactions (e.g., an RPC). */ -package transport // import "google.golang.org/grpc/transport" +package transport // externally used as import "google.golang.org/grpc/transport" import ( "bytes" @@ -213,6 +213,9 @@ type Stream struct { // the status received from the server. statusCode codes.Code statusDesc string + // rstStream is a flag that is true when a RST stream frame + // is sent to the server signifying that this stream is closing. + rstStream bool } // RecvCompress returns the compression algorithm applied to the inbound From 0dd2b9613395b3a3b13bd2ec8d55b83beab1ec4a Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Wed, 22 Feb 2017 14:03:29 -0800 Subject: [PATCH 5/7] comment update --- transport/transport.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transport/transport.go b/transport/transport.go index 06cc9c0f7cae..a94337bbf0f9 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -35,7 +35,7 @@ Package transport defines and implements message oriented communication channel to complete various transactions (e.g., an RPC). */ -package transport // externally used as import "google.golang.org/grpc/transport" +package transport // import "google.golang.org/grpc/transport" import ( "bytes" From f28d4877539a166323fd5c23ffdf53939c337d17 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Tue, 28 Feb 2017 18:11:19 -0800 Subject: [PATCH 6/7] future-proofing --- transport/http2_client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/transport/http2_client.go b/transport/http2_client.go index 781e8707a16c..696bfa7b72a2 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -495,7 +495,9 @@ func (t *http2Client) CloseStream(s *Stream, err error) { defer func() { if !rstStream { t.streamsQuota.add(1) + return } + t.controlBuf.put(&resetStream{s.id, http2.ErrCodeCancel}) }() s.mu.Lock() rstStream = s.rstStream @@ -516,7 +518,6 @@ func (t *http2Client) CloseStream(s *Stream, err error) { s.mu.Unlock() if se, ok := err.(StreamError); ok && se.Code != codes.DeadlineExceeded { rstStream = true - t.controlBuf.put(&resetStream{s.id, http2.ErrCodeCancel}) } } @@ -761,7 +762,6 @@ func (t *http2Client) handleData(f *http2.DataFrame) { close(s.done) s.mu.Unlock() s.write(recvMsg{err: io.EOF}) - t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl}) return } s.mu.Unlock() From 0dc1a7dd670bfa6ce92934bd83aadaa1232f9b12 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Wed, 1 Mar 2017 11:03:46 -0800 Subject: [PATCH 7/7] post review updates --- test/end2end_test.go | 11 +++++++---- transport/http2_client.go | 14 ++++++++++++++ transport/transport.go | 4 ++-- 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index f79b91e07a5f..199b0cf87737 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -2673,21 +2673,24 @@ func testExceedMaxStreamsLimit(t *testing.T, e env) { const defaultMaxStreamsClient = 100 -func TestClientExceedMaxStreamsLimit(t *testing.T) { +func TestExceedDefaultMaxStreamsLimit(t *testing.T) { defer leakCheck(t)() for _, e := range listTestEnv() { - testClientExceedMaxStreamsLimit(t, e) + testExceedDefaultMaxStreamsLimit(t, e) } } -func testClientExceedMaxStreamsLimit(t *testing.T, e env) { +func testExceedDefaultMaxStreamsLimit(t *testing.T, e env) { te := newTest(t, e) te.declareLogNoise( "http2Client.notifyError got notified that the client transport was broken", "Conn.resetTransport failed to create client transport", "grpc: the connection is closing", ) - te.maxStream = 0 // Server allows infinite streams. The cap should be on client side. + // When masStream is set to 0 the server doesn't send a settings frame for + // MaxConcurrentStreams, essentially allowing infinite (math.MaxInt32) streams. + // In such a case, there should be a default cap on the client-side. + te.maxStream = 0 te.startServer(&testServer{security: e.security}) defer te.tearDown() diff --git a/transport/http2_client.go b/transport/http2_client.go index 696bfa7b72a2..001522bdb556 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -491,8 +491,17 @@ func (t *http2Client) CloseStream(s *Stream, err error) { return } t.mu.Unlock() + // rstStream is true in case the stream is being closed at the client-side + // and the server needs to be intimated about it by sending a RST_STREAM + // frame. + // To make sure this frame is written to the wire before the headers of the + // next stream waiting for streamsQuota, we add to streamsQuota pool only + // after having acquired the writableChan to send RST_STREAM out (look at + // the controller() routine). var rstStream bool defer func() { + // In case, the client doesn't have to send RST_STREAM to server + // we can safely add back to streamsQuota pool now. if !rstStream { t.streamsQuota.add(1) return @@ -1068,6 +1077,11 @@ func (t *http2Client) controller() { t.framer.writeSettings(true, i.ss...) } case *resetStream: + // If the server needs to be to intimated about stream closing, + // then we need to make sure the RST_STREAM frame is written to + // the wire before the headers of the next stream waiting on + // streamQuota. We ensure this by adding to the streamsQuota pool + // only after having acquired the writableChan to send RST_STREAM. t.streamsQuota.add(1) t.framer.writeRSTStream(true, i.streamID, i.code) case *flushIO: diff --git a/transport/transport.go b/transport/transport.go index a94337bbf0f9..34c4dbc90ff5 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -213,8 +213,8 @@ type Stream struct { // the status received from the server. statusCode codes.Code statusDesc string - // rstStream is a flag that is true when a RST stream frame - // is sent to the server signifying that this stream is closing. + // rstStream indicates whether a RST_STREAM frame needs to be sent + // to the server to signify that this stream is closing. rstStream bool }