Skip to content

Commit

Permalink
Align the keepalive implementation with proposal A8.
Browse files Browse the repository at this point in the history
This commit makes the following changes:
* Keep track of the time of the last read in the transport.
* Use this in the keepalive implementation to decide when to send out
  keepalives.
* Address the issue of keepalives being sent every [Time+Timeout] period
  instead of every [Time] period, as mandated by proposal A8.
* Make many of the transport tests run in parallel (as most of them
  spend a lot of time just sleeping, waiting for things to happen).

Proposal A8 is here:
https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md

This commit addresses:
grpc#2638
  • Loading branch information
easwars committed Apr 19, 2019
1 parent d389f9f commit 2a19115
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 60 deletions.
149 changes: 104 additions & 45 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ import (
"google.golang.org/grpc/status"
)

// defaultKeepaliveTickerDuration is the period of the ticker that the
// keepalive goroutine runs to perform keepalive related tasks. Based on
// proposal A8, at least second precision is expected. It is declared as a
// variable (not a constant) so that tests could set it to a lower value for
// convenience.
var defaultKeepaliveTickerDuration = 500 * time.Millisecond

// http2Client implements the ClientTransport interface with HTTP2.
type http2Client struct {
ctx context.Context
Expand Down Expand Up @@ -77,11 +82,11 @@ type http2Client struct {

perRPCCreds []credentials.PerRPCCredentials

// Boolean to keep track of reading activity on transport.
// 1 is true and 0 is false.
activity uint32 // Accessed atomically.
kp keepalive.ClientParameters
keepaliveEnabled bool
// Channel to keep track of read activity on the transport.
activityCh chan struct{}
lastReadSec int64 // Accessed atomically.

statsHandler stats.Handler

Expand Down Expand Up @@ -249,6 +254,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
onGoAway: onGoAway,
onClose: onClose,
keepaliveEnabled: keepaliveEnabled,
activityCh: make(chan struct{}, 1),
}
t.controlBuf = newControlBuffer(t.ctxDone)
if opts.InitialWindowSize >= defaultWindowSize {
Expand Down Expand Up @@ -1221,7 +1227,7 @@ func (t *http2Client) reader() {
}
t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
if t.keepaliveEnabled {
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
t.activityCh <- struct{}{}
}
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
Expand All @@ -1235,7 +1241,7 @@ func (t *http2Client) reader() {
for {
frame, err := t.framer.fr.ReadFrame()
if t.keepaliveEnabled {
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
t.activityCh <- struct{}{}
}
if err != nil {
// Abort an active stream if the http2.Framer returns a
Expand Down Expand Up @@ -1279,59 +1285,112 @@ func (t *http2Client) reader() {
}
}

// keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
// activityMonitor drains the transport's activity channel and records, in
// whole seconds since the Unix epoch, the wall-clock time at which each
// notification was received. The timestamp is stored atomically in
// lastReadSec. The loop exits once the transport's context is done.
func (t *http2Client) activityMonitor() {
	// The Done channel is the same for the lifetime of the context, so it is
	// fetched once up front.
	stop := t.ctx.Done()
	for {
		select {
		case <-stop:
			return
		case <-t.activityCh:
			now := time.Now().Unix()
			atomic.StoreInt64(&t.lastReadSec, now)
		}
	}
}

// pinger optionally sends out a PING frame and then waits for the first of
// the following events:
//   - kp.Timeout expires, in which case the transport is closed;
//   - the transport's top-level context is done;
//   - the done channel becomes readable (it is closed or written to).
//
// When send is false no PING is queued and pinger only performs the wait. When
// send is true and channelz is on, the keepalive counter is incremented before
// the PING is put on the control buffer.
func (t *http2Client) pinger(send bool, done chan struct{}) {
	if send {
		if channelz.IsOn() {
			atomic.AddInt64(&t.czData.kpCount, 1)
		}
		t.controlBuf.put(&ping{data: [8]byte{}})
	}

	timeout := time.NewTimer(t.kp.Timeout)
	select {
	case <-timeout.C:
		// No cancellation arrived within kp.Timeout: tear down the transport.
		t.Close()
		return
	case <-t.ctx.Done():
	case <-done:
	}

	// Either cancellation path ends the same way: stop the timer and, if it
	// had already fired, drain its channel.
	if !timeout.Stop() {
		<-timeout.C
	}
}

// keepalive, running in a separate goroutine, makes sure the connection is
// alive by sending pings.
func (t *http2Client) keepalive() {
p := &ping{data: [8]byte{}}
timer := time.NewTimer(t.kp.Time)
go t.activityMonitor()

ticker := time.NewTicker(defaultKeepaliveTickerDuration)
defer ticker.Stop()

var done chan struct{}
for {
select {
case <-timer.C:
if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
timer.Reset(t.kp.Time)
case <-ticker.C:
// If the amount of time elapsed since the last read is less than kp.Time
// and we have an active pinger routine, we need to cancel it.
lr := time.Since(time.Unix(atomic.LoadInt64(&t.lastReadSec), 0))
if lr < t.kp.Time {
if done != nil {
close(done)
done = nil
}
continue
}
// Check if keepalive should go dormant.
t.mu.Lock()
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
// Make awakenKeepalive writable.
<-t.awakenKeepalive
t.mu.Unlock()
// If the control gets here, it means more than kp.Time has elapsed since
// the last read, and we need to do one of the following:
// - If keepalive dormancy conditions are met, then go dormant.
// - If there is an active pinger routine, then do nothing.
// - If dormancy conditions are not met and there is no active pinger
// routine, start one.
sendPing := true
goDormant := func() bool {
t.mu.Lock()
defer t.mu.Unlock()
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
// If pinger routine is active, it probably means that one was started
// when there was an active stream, but now there is none, and
// therefore we should stop the pinger.
if done != nil {
close(done)
done = nil
}
// Make awakenKeepalive writable.
<-t.awakenKeepalive
return true
}
return false
}()
if goDormant {
select {
case <-t.awakenKeepalive:
// If the control gets here a ping has been sent
// need to reset the timer with keepalive.Timeout.
// If the control gets here, it means that a ping was sent right
// after stream creation. We need to make sure we get an ack for it
// before kp.Timeout expires.
sendPing = false
case <-t.ctx.Done():
return
}
} else {
t.mu.Unlock()
if channelz.IsOn() {
atomic.AddInt64(&t.czData.kpCount, 1)
}
// Send ping.
t.controlBuf.put(p)
}

// By the time control gets here a ping has been sent one way or the other.
timer.Reset(t.kp.Timeout)
select {
case <-timer.C:
if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
timer.Reset(t.kp.Time)
continue
}
t.Close()
return
case <-t.ctx.Done():
if !timer.Stop() {
<-timer.C
}
return
// Active pinger exists, so there is nothing much to do here.
if done != nil {
continue
}
done = make(chan struct{})
go t.pinger(sendPing, done)
case <-t.ctx.Done():
if !timer.Stop() {
<-timer.C
}
return
}
}
Expand Down
Loading

0 comments on commit 2a19115

Please sign in to comment.