Skip to content

Commit

Permalink
Align the keepalive implementation with proposal A8.
Browse files Browse the repository at this point in the history
This commit makes the following changes:
* Keep track of the time of the last read in the transport.
* Use this in the keepalive implementation to decide when to send out
  keepalives.
* Address the issue of keepalives being sent every [Time+Timeout] period
  instead of every [Time] period, as mandated by proposal A8.
* Makes many of the transport tests to run in parallel (as most of them
  spend a lot of time just sleeping, waiting for things to happen).

Proposal A8 is here:
https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md

This commit addresses:
grpc#2638
  • Loading branch information
easwars committed Aug 30, 2019
1 parent d5a36f0 commit c3a2e9b
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 45 deletions.
106 changes: 79 additions & 27 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,9 @@ type http2Client struct {

perRPCCreds []credentials.PerRPCCredentials

// Boolean to keep track of reading activity on transport.
// 1 is true and 0 is false.
activity uint32 // Accessed atomically.
kp keepalive.ClientParameters
keepaliveEnabled bool
lr lastRead

statsHandler stats.Handler

Expand Down Expand Up @@ -129,6 +127,16 @@ type http2Client struct {
bufferPool *bufferPool
}

type lastRead struct {
// Stores the Unix time in nanoseconds. This time cannot be directly embedded
// in the http2Client struct because this field is accessed using functions
// from the atomic package. And on 32-bit machines, it is the caller's
// responsibility to arrange for 64-bit alignment of this field.
timeNano int64
// Channel to keep track of read activity on the transport.
ch chan struct{}
}

func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
if fn != nil {
return fn(ctx, addr)
Expand Down Expand Up @@ -259,6 +267,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
onClose: onClose,
keepaliveEnabled: keepaliveEnabled,
bufferPool: newBufferPool(),
activityCh: make(chan struct{}, 1),
lr: lastRead{ch: make(chan struct{}, 1)},
}
t.controlBuf = newControlBuffer(t.ctxDone)
if opts.InitialWindowSize >= defaultWindowSize {
Expand Down Expand Up @@ -286,6 +296,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
}
if t.keepaliveEnabled {
t.kpDormancyCond = sync.NewCond(&t.mu)
go t.activityMonitor()
go t.keepalive()
}
// Start the reader goroutine for incoming message. Each transport has
Expand Down Expand Up @@ -1233,7 +1244,7 @@ func (t *http2Client) reader() {
}
t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
if t.keepaliveEnabled {
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
t.lr.ch <- struct{}{}
}
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
Expand All @@ -1248,7 +1259,10 @@ func (t *http2Client) reader() {
t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
if t.keepaliveEnabled {
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
select {
case t.lr.ch <- struct{}{}:
default:
}
}
if err != nil {
// Abort an active stream if the http2.Framer returns a
Expand Down Expand Up @@ -1292,17 +1306,54 @@ func (t *http2Client) reader() {
}
}

// keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
// activityMonitory reads from the activity channel (which is written to, when
// there is a read), and updates the lastRead.timeNano atomic.
func (t *http2Client) activityMonitor() {
for {
select {
case <-t.lr.ch:
atomic.StoreInt64(&t.lr.timeNano, time.Now().UnixNano())
case <-t.ctx.Done():
return
}
}
}

func minTime(a, b time.Duration) time.Duration {
if a < b {
return a
}
return b
}

// keepalive running in a separate goroutune makes sure the connection is alive
// by sending pings.
func (t *http2Client) keepalive() {
p := &ping{data: [8]byte{}}
// True iff a ping has been sent, and no data has been received since then.
outstandingPing := false
// Amount of time remaining before which we should receive an ACK for the
// last sent ping.
timeoutLeft := time.Duration(0)
// UnixNanos recorded before we go block on the timer. This is required to
// check for read activity since then.
prevNano := time.Now().UTC().UnixNano()
timer := time.NewTimer(t.kp.Time)
for {
select {
case <-timer.C:
if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
timer.Reset(t.kp.Time)
if lastRead := atomic.LoadInt64(&t.lr.timeNano); lastRead > prevNano {
// Read activity since the last time we were here.
outstandingPing = false
prevNano = time.Now().UTC().UnixNano()
// Timer should fire at kp.Time seconds from lastRead time.
timer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(prevNano))
continue
}
if outstandingPing && timeoutLeft <= 0 {
t.Close()
return
}
t.mu.Lock()
if t.state == closing {
// If the transport is closing, we should exit from the
Expand All @@ -1315,36 +1366,37 @@ func (t *http2Client) keepalive() {
return
}
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
// If a ping was sent out previously (because there were active
// streams at that point) which wasn't acked and it's timeout
// hadn't fired, but we got here and are about to go dormant,
// we should make sure that we unconditionally send a ping once
// we awaken.
outstandingPing = false
t.kpDormant = true
t.kpDormancyCond.Wait()
}
t.kpDormant = false
t.mu.Unlock()

if channelz.IsOn() {
atomic.AddInt64(&t.czData.kpCount, 1)
}
// We get here either because we were dormant and a new stream was
// created which unblocked the Wait() call, or because the
// keepalive timer expired. In both cases, we need to send a ping.
t.controlBuf.put(p)

timer.Reset(t.kp.Timeout)
select {
case <-timer.C:
if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
timer.Reset(t.kp.Time)
continue
if !outstandingPing {
if channelz.IsOn() {
atomic.AddInt64(&t.czData.kpCount, 1)
}
infof("transport: closing client transport due to idleness.")
t.Close()
return
case <-t.ctx.Done():
if !timer.Stop() {
<-timer.C
}
return
t.controlBuf.put(p)
timeoutLeft = t.kp.Timeout
outstandingPing = true
}
// The amount of time to sleep here is the minimum of kp.Time and
// timeoutLeft. This will ensure that we wait only for kp.Time
// before sending out the next ping (for cases where the ping is
// acked).
sleepDuration = minTime(kp.Time, timeoutLeft)
timeoutLeft -= sleepDuration
prevNano = time.Now().UTC().UnixNano()
timer.Reset(timeoutLeft)
case <-t.ctx.Done():
if !timer.Stop() {
<-timer.C
Expand Down
Loading

0 comments on commit c3a2e9b

Please sign in to comment.