From 2a19115a964843d231e6aa931f974d5ff16db288 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Fri, 19 Apr 2019 11:27:13 -0700 Subject: [PATCH 1/6] Align the keepalive implementation with proposal A8. This commit makes the following changes: * Keep track of the time of the last read in the transport. * Use this in the keepalive implementation to decide when to send out keepalives. * Address the issue of keepalives being sent every [Time+Timeout] period instead of every [Time] period, as mandated by proposal A8. * Makes many of the transport tests to run in parallel (as most of them spend a lot of time just sleeping, waiting for things to happen). Proposal A8 is here: https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md This commit addresses: https://github.com/grpc/grpc-go/issues/2638 --- internal/transport/http2_client.go | 149 +++++++++++++++++++-------- internal/transport/transport_test.go | 58 ++++++++--- 2 files changed, 147 insertions(+), 60 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 9dee6db61d9d..b094e114df93 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -44,6 +44,11 @@ import ( "google.golang.org/grpc/status" ) +// Keepalive goroutine runs a ticker at this frequency to perform keepalive +// related tasks. Based on proposal A8, at least second precision is expected. +// Tests could set this to a lower value for convenience. +var defaultKeepaliveTickerDuration = 500 * time.Millisecond + // http2Client implements the ClientTransport interface with HTTP2. type http2Client struct { ctx context.Context @@ -77,11 +82,11 @@ type http2Client struct { perRPCCreds []credentials.PerRPCCredentials - // Boolean to keep track of reading activity on transport. - // 1 is true and 0 is false. - activity uint32 // Accessed atomically. kp keepalive.ClientParameters keepaliveEnabled bool + // Channel to keep track of read activity on the transport. + activityCh chan struct{} + lastReadSec int64 // Accessed atomically. statsHandler stats.Handler @@ -249,6 +254,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne onGoAway: onGoAway, onClose: onClose, keepaliveEnabled: keepaliveEnabled, + activityCh: make(chan struct{}, 1), } t.controlBuf = newControlBuffer(t.ctxDone) if opts.InitialWindowSize >= defaultWindowSize { @@ -1221,7 +1227,7 @@ func (t *http2Client) reader() { } t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!) if t.keepaliveEnabled { - atomic.CompareAndSwapUint32(&t.activity, 0, 1) + t.activityCh <- struct{}{} } sf, ok := frame.(*http2.SettingsFrame) if !ok { @@ -1235,7 +1241,7 @@ func (t *http2Client) reader() { for { frame, err := t.framer.fr.ReadFrame() if t.keepaliveEnabled { - atomic.CompareAndSwapUint32(&t.activity, 0, 1) + t.activityCh <- struct{}{} } if err != nil { // Abort an active stream if the http2.Framer returns a @@ -1279,59 +1285,112 @@ func (t *http2Client) reader() { } } -// keepalive running in a separate goroutune makes sure the connection is alive by sending pings. +// activityMonitory reads from the activity channel (which is written to, when +// there is a read), and updates the lastReadSec atomic. +func (t *http2Client) activityMonitor() { + for { + select { + case <-t.activityCh: + atomic.StoreInt64(&t.lastReadSec, time.Now().Unix()) + case <-t.ctx.Done(): + return + } + } +} + +// pinger sends out a PING frame, and waits for one of the following events to +// happen (kp.Timeout expires, top-level context is done, or the done channel +// is written to (this usually happens when there is read activity). +func (t *http2Client) pinger(send bool, done chan struct{}) { + if send { + if channelz.IsOn() { + atomic.AddInt64(&t.czData.kpCount, 1) + } + t.controlBuf.put(&ping{data: [8]byte{}}) + } + + timer := time.NewTimer(t.kp.Timeout) + select { + case <-timer.C: + t.Close() + return + case <-t.ctx.Done(): + if !timer.Stop() { + <-timer.C + } + return + case <-done: + if !timer.Stop() { + <-timer.C + } + return + } +} + +// keepalive running in a separate goroutune makes sure the connection is alive +// by sending pings. func (t *http2Client) keepalive() { - p := &ping{data: [8]byte{}} - timer := time.NewTimer(t.kp.Time) + go t.activityMonitor() + + ticker := time.NewTicker(defaultKeepaliveTickerDuration) + defer ticker.Stop() + + var done chan struct{} for { select { - case <-timer.C: - if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { - timer.Reset(t.kp.Time) + case <-ticker.C: + // If the amount of time elapsed since the last read is less than kp.Time + // and we have an active pinger routine, we need to cancel it. + lr := time.Since(time.Unix(atomic.LoadInt64(&t.lastReadSec), 0)) + if lr < t.kp.Time { + if done != nil { + close(done) + done = nil + } continue } - // Check if keepalive should go dormant. - t.mu.Lock() - if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream { - // Make awakenKeepalive writable. - <-t.awakenKeepalive - t.mu.Unlock() + // If the control gets here, it means more than kp.Time has elapsed since + // the last read, and we need to do one of the following: + // - If keepalive dormancy conditions are met, then go dormant. + // - If there is an active pinger routine, then do nothing. + // - If dormany conditions are not met and there is no active pinger + // routine, start one. + sendPing := true + goDormant := func() bool { + t.mu.Lock() + defer t.mu.Unlock() + if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream { + // If pinger routine is active, it probably means that one was started + // when there was an active stream, but now there is none, and + // therefore we should stop the pinger. + if done != nil { + close(done) + done = nil + } + // Make awakenKeepalive writable. + <-t.awakenKeepalive + return true + } + return false + }() + if goDormant { select { case <-t.awakenKeepalive: - // If the control gets here a ping has been sent - // need to reset the timer with keepalive.Timeout. + // If the control gets here, it means that a ping was sent right + // after stream creation. We need to make sure we get an ack for it + // before kp.Timeout expires. + sendPing = false case <-t.ctx.Done(): return } - } else { - t.mu.Unlock() - if channelz.IsOn() { - atomic.AddInt64(&t.czData.kpCount, 1) - } - // Send ping. - t.controlBuf.put(p) } - - // By the time control gets here a ping has been sent one way or the other. - timer.Reset(t.kp.Timeout) - select { - case <-timer.C: - if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { - timer.Reset(t.kp.Time) - continue - } - t.Close() - return - case <-t.ctx.Done(): - if !timer.Stop() { - <-timer.C - } - return + // Active pinger exists, so there is nothing much to do here. + if done != nil { + continue } + done = make(chan struct{}) + go t.pinger(sendPing, done) case <-t.ctx.Done(): - if !timer.Stop() { - <-timer.C - } return } } diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index baea6befdcde..24a624d36d90 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -462,6 +462,8 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con // TestInflightStreamClosing ensures that closing in-flight stream // sends status error to concurrent stream reader. func TestInflightStreamClosing(t *testing.T) { + t.Parallel() + serverConfig := &ServerConfig{} server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) defer cancel() @@ -501,6 +503,8 @@ func TestInflightStreamClosing(t *testing.T) { // An idle client is one who doesn't make any RPC calls for a duration of // MaxConnectionIdle time. func TestMaxConnectionIdle(t *testing.T) { + t.Parallel() + serverConfig := &ServerConfig{ KeepaliveParams: keepalive.ServerParameters{ MaxConnectionIdle: 2 * time.Second, @@ -529,6 +533,8 @@ func TestMaxConnectionIdle(t *testing.T) { // TestMaxConenctionIdleNegative tests that a server will not send GoAway to a non-idle(busy) client. func TestMaxConnectionIdleNegative(t *testing.T) { + t.Parallel() + serverConfig := &ServerConfig{ KeepaliveParams: keepalive.ServerParameters{ MaxConnectionIdle: 2 * time.Second, @@ -556,6 +562,8 @@ func TestMaxConnectionIdleNegative(t *testing.T) { // TestMaxConnectionAge tests that a server will send GoAway after a duration of MaxConnectionAge. func TestMaxConnectionAge(t *testing.T) { + t.Parallel() + serverConfig := &ServerConfig{ KeepaliveParams: keepalive.ServerParameters{ MaxConnectionAge: 2 * time.Second, @@ -588,6 +596,8 @@ const ( // TestKeepaliveServer tests that a server closes connection with a client that doesn't respond to keepalive pings. func TestKeepaliveServer(t *testing.T) { + t.Parallel() + serverConfig := &ServerConfig{ KeepaliveParams: keepalive.ServerParameters{ Time: 2 * time.Second, @@ -632,6 +642,8 @@ func TestKeepaliveServer(t *testing.T) { // TestKeepaliveServerNegative tests that a server doesn't close connection with a client that responds to keepalive pings. func TestKeepaliveServerNegative(t *testing.T) { + t.Parallel() + serverConfig := &ServerConfig{ KeepaliveParams: keepalive.ServerParameters{ Time: 2 * time.Second, @@ -653,6 +665,8 @@ func TestKeepaliveServerNegative(t *testing.T) { } func TestKeepaliveClientClosesIdleTransport(t *testing.T) { + t.Parallel() + done := make(chan net.Conn, 1) tr, cancel := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{ Time: 2 * time.Second, // Keepalive time = 2 sec. @@ -677,6 +691,8 @@ func TestKeepaliveClientClosesIdleTransport(t *testing.T) { } func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) { + t.Parallel() + done := make(chan net.Conn, 1) tr, cancel := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{ Time: 2 * time.Second, // Keepalive time = 2 sec. @@ -700,6 +716,8 @@ func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) { } func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { + t.Parallel() + done := make(chan net.Conn, 1) tr, cancel := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{ Time: 2 * time.Second, // Keepalive time = 2 sec. @@ -728,6 +746,8 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { } func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) { + t.Parallel() + s, tr, cancel := setUpWithOptions(t, 0, &ServerConfig{MaxStreams: math.MaxUint32}, normal, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{ Time: 2 * time.Second, // Keepalive time = 2 sec. Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. @@ -747,16 +767,18 @@ func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) { } func TestKeepaliveServerEnforcementWithAbusiveClientNoRPC(t *testing.T) { + t.Parallel() + serverConfig := &ServerConfig{ KeepalivePolicy: keepalive.EnforcementPolicy{ - MinTime: 2 * time.Second, + MinTime: 5 * time.Second, }, } clientOptions := ConnectOptions{ KeepaliveParams: keepalive.ClientParameters{ - Time: 50 * time.Millisecond, - Timeout: 1 * time.Second, - PermitWithoutStream: true, + Time: 2 * time.Second, // Keepalive time = 2 sec. + Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. + PermitWithoutStream: true, // Run keepalive even with no RPCs. }, } server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions) @@ -782,15 +804,17 @@ func TestKeepaliveServerEnforcementWithAbusiveClientNoRPC(t *testing.T) { } func TestKeepaliveServerEnforcementWithAbusiveClientWithRPC(t *testing.T) { + t.Parallel() + serverConfig := &ServerConfig{ KeepalivePolicy: keepalive.EnforcementPolicy{ - MinTime: 2 * time.Second, + MinTime: 5 * time.Second, }, } clientOptions := ConnectOptions{ KeepaliveParams: keepalive.ClientParameters{ - Time: 50 * time.Millisecond, - Timeout: 1 * time.Second, + Time: 2 * time.Second, // Keepalive time = 2 sec. + Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. }, } server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions) @@ -819,16 +843,18 @@ func TestKeepaliveServerEnforcementWithAbusiveClientWithRPC(t *testing.T) { } func TestKeepaliveServerEnforcementWithObeyingClientNoRPC(t *testing.T) { + t.Parallel() + serverConfig := &ServerConfig{ KeepalivePolicy: keepalive.EnforcementPolicy{ - MinTime: 100 * time.Millisecond, + MinTime: 1 * time.Second, PermitWithoutStream: true, }, } clientOptions := ConnectOptions{ KeepaliveParams: keepalive.ClientParameters{ - Time: 101 * time.Millisecond, - Timeout: 1 * time.Second, + Time: 2 * time.Second, + Timeout: 5 * time.Second, PermitWithoutStream: true, }, } @@ -838,7 +864,7 @@ func TestKeepaliveServerEnforcementWithObeyingClientNoRPC(t *testing.T) { defer client.Close() // Give keepalive enough time. - time.Sleep(3 * time.Second) + time.Sleep(10 * time.Second) // Assert that connection is healthy. client.mu.Lock() defer client.mu.Unlock() @@ -848,15 +874,17 @@ func TestKeepaliveServerEnforcementWithObeyingClientNoRPC(t *testing.T) { } func TestKeepaliveServerEnforcementWithObeyingClientWithRPC(t *testing.T) { + t.Parallel() + serverConfig := &ServerConfig{ KeepalivePolicy: keepalive.EnforcementPolicy{ - MinTime: 100 * time.Millisecond, + MinTime: 1 * time.Second, }, } clientOptions := ConnectOptions{ KeepaliveParams: keepalive.ClientParameters{ - Time: 101 * time.Millisecond, - Timeout: 1 * time.Second, + Time: 2 * time.Second, + Timeout: 5 * time.Second, }, } server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions) @@ -869,7 +897,7 @@ func TestKeepaliveServerEnforcementWithObeyingClientWithRPC(t *testing.T) { } // Give keepalive enough time. - time.Sleep(3 * time.Second) + time.Sleep(10 * time.Second) // Assert that connection is healthy. client.mu.Lock() defer client.mu.Unlock() From 5f4d6b81272ce626461c1994855f0821f2464650 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Mon, 22 Apr 2019 12:47:36 -0700 Subject: [PATCH 2/6] Couple of fixes to make tavis happy. --- internal/transport/http2_client.go | 5 ++++- internal/transport/transport_test.go | 23 ++++++++++++++++++++--- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index b094e114df93..adda22ef7cdb 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1241,7 +1241,10 @@ func (t *http2Client) reader() { for { frame, err := t.framer.fr.ReadFrame() if t.keepaliveEnabled { - t.activityCh <- struct{}{} + select { + case t.activityCh <- struct{}{}: + default: + } } if err != nil { // Abort an active stream if the http2.Framer returns a diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 24a624d36d90..92ecbdc59d46 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -979,18 +979,35 @@ func performOneRPC(ct ClientTransport) { func TestClientMix(t *testing.T) { s, ct, cancel := setUp(t, 0, math.MaxUint32, normal) defer cancel() + done := make(chan struct{}) + go func(s *server) { - time.Sleep(5 * time.Second) + select { + case <-done: + case <-time.After(5 * time.Second): + } s.stop() }(s) + go func(ct ClientTransport) { - <-ct.Error() + select { + case <-done: + case <-ct.Error(): + } ct.Close() }(ct) + + var wg sync.WaitGroup for i := 0; i < 1000; i++ { time.Sleep(10 * time.Millisecond) - go performOneRPC(ct) + go func() { + wg.Add(1) + performOneRPC(ct) + wg.Done() + }() } + wg.Wait() + close(done) } func TestLargeMessage(t *testing.T) { From 88a66934b686f8ed349c10f0e720d3cc6ce84193 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Mon, 22 Apr 2019 12:47:36 -0700 Subject: [PATCH 3/6] Couple of fixes to make tavis happy. --- internal/transport/http2_client.go | 5 ++++- internal/transport/transport_test.go | 23 ++++++++++++++++++++--- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index b094e114df93..adda22ef7cdb 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1241,7 +1241,10 @@ func (t *http2Client) reader() { for { frame, err := t.framer.fr.ReadFrame() if t.keepaliveEnabled { - t.activityCh <- struct{}{} + select { + case t.activityCh <- struct{}{}: + default: + } } if err != nil { // Abort an active stream if the http2.Framer returns a diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 24a624d36d90..d1f80e6213a9 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -979,18 +979,35 @@ func performOneRPC(ct ClientTransport) { func TestClientMix(t *testing.T) { s, ct, cancel := setUp(t, 0, math.MaxUint32, normal) defer cancel() + done := make(chan struct{}) + go func(s *server) { - time.Sleep(5 * time.Second) + select { + case <-done: + case <-time.After(5 * time.Second): + } s.stop() }(s) + go func(ct ClientTransport) { - <-ct.Error() + select { + case <-done: + case <-ct.Error(): + } ct.Close() }(ct) + + var wg sync.WaitGroup for i := 0; i < 1000; i++ { time.Sleep(10 * time.Millisecond) - go performOneRPC(ct) + wg.Add(1) + go func() { + performOneRPC(ct) + wg.Done() + }() } + wg.Wait() + close(done) } func TestLargeMessage(t *testing.T) { From 9bec4a11835e0a039ae606b0d358cea825c4cdee Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Tue, 23 Apr 2019 09:29:25 -0700 Subject: [PATCH 4/6] Align the last read time on a 64-bit boundary.. --- internal/transport/http2_client.go | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index adda22ef7cdb..525a48ac826c 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -84,9 +84,7 @@ type http2Client struct { kp keepalive.ClientParameters keepaliveEnabled bool - // Channel to keep track of read activity on the transport. - activityCh chan struct{} - lastReadSec int64 // Accessed atomically. + lr lastRead statsHandler stats.Handler @@ -124,6 +122,16 @@ type http2Client struct { onClose func() } +type lastRead struct { + // The time field here cannot be embedded in the http2Client struct because + // this field is accessed using functions from the atomic package. And on + // 32-bit machines, it is the caller's responsibility to arrange for 64-bit + // alignment of this field. + time int64 + // Channel to keep track of read activity on the transport. + ch chan struct{} +} + func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) { if fn != nil { return fn(ctx, addr) @@ -254,7 +262,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne onGoAway: onGoAway, onClose: onClose, keepaliveEnabled: keepaliveEnabled, - activityCh: make(chan struct{}, 1), + lr: lastRead{ch: make(chan struct{}, 1)}, } t.controlBuf = newControlBuffer(t.ctxDone) if opts.InitialWindowSize >= defaultWindowSize { @@ -1227,7 +1235,7 @@ func (t *http2Client) reader() { } t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!) if t.keepaliveEnabled { - t.activityCh <- struct{}{} + t.lr.ch <- struct{}{} } sf, ok := frame.(*http2.SettingsFrame) if !ok { @@ -1242,7 +1250,7 @@ func (t *http2Client) reader() { frame, err := t.framer.fr.ReadFrame() if t.keepaliveEnabled { select { - case t.activityCh <- struct{}{}: + case t.lr.ch <- struct{}{}: default: } } @@ -1289,12 +1297,12 @@ func (t *http2Client) reader() { } // activityMonitory reads from the activity channel (which is written to, when -// there is a read), and updates the lastReadSec atomic. +// there is a read), and updates the lastRead.time atomic. func (t *http2Client) activityMonitor() { for { select { - case <-t.activityCh: - atomic.StoreInt64(&t.lastReadSec, time.Now().Unix()) + case <-t.lr.ch: + atomic.StoreInt64(&t.lr.time, time.Now().Unix()) case <-t.ctx.Done(): return } @@ -1344,7 +1352,7 @@ func (t *http2Client) keepalive() { case <-ticker.C: // If the amount of time elapsed since the last read is less than kp.Time // and we have an active pinger routine, we need to cancel it. - lr := time.Since(time.Unix(atomic.LoadInt64(&t.lastReadSec), 0)) + lr := time.Since(time.Unix(atomic.LoadInt64(&t.lr.time), 0)) if lr < t.kp.Time { if done != nil { close(done) From ba3e68d064b8e48fa9af09246ef1c26eb3c95763 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Mon, 13 May 2019 13:48:39 -0700 Subject: [PATCH 5/6] Don't use a separate goroutine for the pinger. --- internal/transport/http2_client.go | 107 +++++++++++++++++++++++++++-- 1 file changed, 100 insertions(+), 7 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 525a48ac826c..d3b0b4651e0d 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -123,11 +123,11 @@ type http2Client struct { } type lastRead struct { - // The time field here cannot be embedded in the http2Client struct because - // this field is accessed using functions from the atomic package. And on - // 32-bit machines, it is the caller's responsibility to arrange for 64-bit - // alignment of this field. - time int64 + // Stores the Unix time in nanoseconds. This time cannot be directly embedded + // in the http2Client struct because this field is accessed using functions + // from the atomic package. And on 32-bit machines, it is the caller's + // responsibility to arrange for 64-bit alignment of this field. + timeNano int64 // Channel to keep track of read activity on the transport. ch chan struct{} } @@ -292,6 +292,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr)) } if t.keepaliveEnabled { + go t.activityMonitor() go t.keepalive() } // Start the reader goroutine for incoming message. Each transport has @@ -1297,18 +1298,109 @@ func (t *http2Client) reader() { } // activityMonitory reads from the activity channel (which is written to, when -// there is a read), and updates the lastRead.time atomic. +// there is a read), and updates the lastRead.timeNano atomic. func (t *http2Client) activityMonitor() { for { select { case <-t.lr.ch: - atomic.StoreInt64(&t.lr.time, time.Now().Unix()) + atomic.StoreInt64(&t.lr.timeNano, time.Now().UnixNano()) case <-t.ctx.Done(): return } } } +func minTime(a, b time.Duration) time.Duration { + if a < b { + return a + } + return b +} + +// keepalive running in a separate goroutune makes sure the connection is alive +// by sending pings. +func (t *http2Client) keepalive() { + p := &ping{data: [8]byte{}} + // True iff a PING has been sent, and no data has been received since then + // and the PING hasn't timed out. + var outstandingPing bool + // Amount of time remaining before which we should receive an ACK for the + // last sent PING. + var timeoutLeft time.Duration + // UnixNanos recorded before we go block on the timer. This is required to + // check for read activity since then. + var prevNano int64 + for { + dataRead := false + if prevNano < atomic.LoadInt64(&t.lr.timeNano) { + // Read activity since the last time we were here. + outstandingPing = false + dataRead = true + } + + // Outstanding PING timed out, we are done. + if outstandingPing && timeoutLeft <= 0 { + t.Close() + return + } + + if !dataRead { + // Check if keepalive should go dormant. + t.mu.Lock() + if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream { + // Make awakenKeepalive writable. + <-t.awakenKeepalive + t.mu.Unlock() + select { + case <-t.awakenKeepalive: + // If the control gets here a ping has been sent + // need to reset the timer with keepalive.Timeout. + timeoutLeft = t.kp.Timeout + outstandingPing = true + case <-t.ctx.Done(): + return + } + } else { + t.mu.Unlock() + if !outstandingPing { + if channelz.IsOn() { + atomic.AddInt64(&t.czData.kpCount, 1) + } + t.controlBuf.put(p) + timeoutLeft = t.kp.Timeout + outstandingPing = true + } + } + } + + // Amount of kp.Time remaining should be calculated from the time of the + // last read activity. + timeLeft := t.kp.Time + if dataRead { + timeLeft = time.Duration(atomic.LoadInt64(&t.lr.timeNano)) + t.kp.Time - time.Duration(time.Now().UTC().UnixNano()) + } + // If a PING is outstanding, the amount of time to sleep here should be the + // minimum of timeoutLeft and timeLeft. + sleepDuration := timeLeft + if outstandingPing { + sleepDuration = minTime(timeLeft, timeoutLeft) + timeoutLeft -= sleepDuration + } + + prevNano = time.Now().UTC().UnixNano() + timer := time.NewTimer(sleepDuration) + select { + case <-timer.C: + case <-t.ctx.Done(): + if !timer.Stop() { + <-timer.C + } + return + } + } +} + +/* // pinger sends out a PING frame, and waits for one of the following events to // happen (kp.Timeout expires, top-level context is done, or the done channel // is written to (this usually happens when there is read activity). @@ -1406,6 +1498,7 @@ func (t *http2Client) keepalive() { } } } +*/ func (t *http2Client) Error() <-chan struct{} { return t.ctx.Done() From 10237cf90c3814cd159a8046bb4af81eb2538113 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Fri, 17 May 2019 11:15:47 -0700 Subject: [PATCH 6/6] Remove an unused identifier in the code. --- internal/transport/http2_client.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index d3b0b4651e0d..c18331c7201b 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -44,11 +44,6 @@ import ( "google.golang.org/grpc/status" ) -// Keepalive goroutine runs a ticker at this frequency to perform keepalive -// related tasks. Based on proposal A8, at least second precision is expected. -// Tests could set this to a lower value for convenience. -var defaultKeepaliveTickerDuration = 500 * time.Millisecond - // http2Client implements the ClientTransport interface with HTTP2. type http2Client struct { ctx context.Context