Keepalive pings should be sent every [Time] period and not every [Time+Timeout] period #2790
Conversation
internal/transport/http2_client.go
Outdated
timer := time.NewTimer(t.kp.Time)
go t.activityMonitor()
ticker := time.NewTicker(defaultKeepaliveTickerDuration)
Why do we use a ticker rather than a timer?
With this implementation, if timeout is greater than time and the client reads data t seconds after sending a ping, where time < t < timeout, then the connection will be closed. But I think it shouldn't be.
I use a ticker instead of a timer for convenience reasons. Since we have a forever for loop here, if we used a timer we would have to take care of resetting it every time it fires. And since the timer value does not change, a ticker is easier to use.
For your second comment, I think this is what will happen:
- at time t0, let's say we have some read activity (and nothing after that)
- at time t1 = (t0 + keepalive.Time), whenever the next ticker fires, we will start a pinger
- at time t2 = (t1 + delta), where keepalive.Time < delta < keepalive.Timeout, we receive the ping_ack
- the next time the ticker fires, we will detect read activity, and will close the pinger
Now, if keepalive.Timeout expires after t2, but before the ticker fires next, you are right, the connection will be closed. The proposal only asks for second granularity in these events, and since our ticker fires twice every second we should be able to do the right thing, except when the keepalive.Timeout expiration and the read of the ping_ack happen in such close proximity that our ticker granularity can't catch them. Does this make sense?
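For readers following along, here is a condensed sketch of the ticker-driven scheme being described. This is not the code in this PR: the type, field, and helper names (keepaliveTicker, lastReadNano, kpTime, kpTimeout, sendPing, closeConn) are made up for illustration, the separate pinger goroutine is folded into the loop, and the standard time, sync/atomic, and context packages are assumed.

// keepaliveTicker is an illustrative stand-in, not the PR's http2Client.
type keepaliveTicker struct {
	lastReadNano int64 // updated atomically whenever bytes are read
	kpTime       time.Duration
	kpTimeout    time.Duration
	ctx          context.Context
	sendPing     func()
	closeConn    func()
}

func (t *keepaliveTicker) run() {
	// Fires twice per second, matching the granularity discussed above.
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()
	var pingSentAt time.Time // zero value means no outstanding ping
	for {
		select {
		case <-ticker.C:
			lastRead := time.Unix(0, atomic.LoadInt64(&t.lastReadNano))
			if !pingSentAt.IsZero() && lastRead.After(pingSentAt) {
				// Read activity after the ping went out: the connection is alive.
				pingSentAt = time.Time{}
			}
			if !pingSentAt.IsZero() {
				if time.Since(pingSentAt) >= t.kpTimeout {
					// Outstanding ping not answered within kp.Timeout.
					t.closeConn()
					return
				}
				continue
			}
			if time.Since(lastRead) >= t.kpTime {
				// No reads for kp.Time and no outstanding ping: send one.
				t.sendPing()
				pingSentAt = time.Now()
			}
		case <-t.ctx.Done():
			return
		}
	}
}

Because this sketch checks read activity before the timeout on every tick, an ack that arrives just before kp.Timeout clears the outstanding ping rather than closing the connection, up to the half-second tick granularity discussed above.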
internal/transport/http2_client.go
Outdated
// If the control gets here, it means that a ping was sent right
// after stream creation. We need to make sure we get an ack for it
// before kp.Timeout expires.
sendPing = false
I didn't understand how this can happen. Can you explain this?
In http2Client.NewStream(), in the case where the number of active streams has changed from 0 to 1, we write on the awakenKeepalive channel to awaken the keepalive goroutine. After sending on the awakenKeepalive channel, we also send out a PING. This implementation detail is based on proposal A8. Related excerpt here:
Since keepalive is not occurring on HTTP/2 connections without any streams, there will be a higher chance of failure for new RPCs following a long period of inactivity. To reduce the tail latency for these RPCs, it is important to not reset the keepalive time when a connection becomes active; if a new stream is created and there has been greater than 'keepalive time' since the last read byte, then a keepalive PING should be sent (ideally before the HEADERS frame). Doing so detects the broken connection with a latency of keepalive timeout instead of keepalive time + timeout.
So, in our keepalive routine, when we are awakened we know that a PING has already been sent. So we start the pinger and ask it to not send out a new PING.
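To make that flow concrete, here is a hedged sketch of the NewStream-side behaviour described above. The helper and its parameter names (lastReadNano, awaken, sendPing) are hypothetical, not the PR's actual fields; the kp.Time comparison comes from the A8 excerpt quoted above; the standard time and sync/atomic packages are assumed.

// firstStreamKeepalivePing sketches what happens when the first stream is
// created on an otherwise idle connection.
func firstStreamKeepalivePing(lastReadNano *int64, kpTime time.Duration, awaken chan<- struct{}, sendPing func()) {
	lastRead := time.Unix(0, atomic.LoadInt64(lastReadNano))
	if time.Since(lastRead) <= kpTime {
		return // recent read activity; the regular keepalive cycle is enough
	}
	select {
	case awaken <- struct{}{}:
		// The keepalive goroutine was dormant (no active streams). Wake it and
		// send the PING ourselves, ideally before the HEADERS frame; the
		// goroutine then only waits kp.Timeout for the ack, detecting a broken
		// connection in Timeout rather than Time + Timeout.
		sendPing()
	default:
		// The keepalive goroutine is already awake and will ping on its own.
	}
}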
@@ -462,6 +462,8 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con
// TestInflightStreamClosing ensures that closing in-flight stream
// sends status error to concurrent stream reader.
func TestInflightStreamClosing(t *testing.T) {
t.Parallel()
I believe @dfawley has some comments about running parallel tests.
I did see a considerable reduction in test execution time with parallel tests :)
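For reference, opting a test into the parallel pool is a one-line change; a minimal illustration with a made-up test name, assuming the standard testing and time packages (tests that share package-level state would still need to stay serial):

func TestKeepaliveSketch(t *testing.T) {
	t.Parallel() // runs concurrently with other tests that also call t.Parallel()
	// A test that mostly sleeps (e.g. waiting on keepalive timers) now overlaps
	// with other parallel tests instead of serializing with them.
	time.Sleep(50 * time.Millisecond)
}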
internal/transport/http2_client.go
Outdated
for {
	select {
	case <-t.lr.ch:
		atomic.StoreInt64(&t.lr.time, time.Now().Unix())
As we already have an int64, should we store last read time with a higher precision?
I decided to go with second precision since that's what the proposal asks for, but we could as well store time.Now().UnixNano() if that is preferable.
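A small sketch of what the nanosecond variant could look like; the lastRead type and field names here are illustrative rather than the PR's exact code, and the standard time and sync/atomic packages are assumed.

type lastRead struct {
	timeNano int64 // nanoseconds since the Unix epoch, read and written atomically
}

// record is called from the reader side whenever bytes arrive off the connection.
func (lr *lastRead) record() {
	atomic.StoreInt64(&lr.timeNano, time.Now().UnixNano())
}

// idleFor reports how long the connection has gone without a read; the
// keepalive goroutine compares this against kp.Time and kp.Timeout.
func (lr *lastRead) idleFor() time.Duration {
	return time.Since(time.Unix(0, atomic.LoadInt64(&lr.timeNano)))
}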
PTAL.
internal/transport/http2_client.go
Outdated
}

// keepalive running in a separate goroutine makes sure the connection is alive
// by sending pings.
func (t *http2Client) keepalive() {
Hey @easwars,
I think spawning a new goroutine to keep track of the timeout for each ping sent may hurt performance. Can we use a single goroutine to keep track of Time and Timeout at the same time? I have this suggestion:
func minTime(a, b time.Duration) time.Duration {
if a < b {
return a
}
return b
}
func (t *http2Client) keepalive() {
p := &ping{data: [8]byte{}}
// True iff a PING has been sent, no data has been received since then, and it hasn't timed out.
outstandingPing := false
var ackWaitDuration time.Duration
for {
dataRead := atomic.CompareAndSwapUint32(&t.activity, 1, 0)
if dataRead {
outstandingPing = false
}
// If there was an outstanding ping, the last timer was a timeout timer, and the ping timed out.
if outstandingPing && ackWaitDuration == 0 {
t.Close()
return
}
// Last timer was a time timer
// No data read
if !dataRead {
t.mu.Lock()
dormant := len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream
t.mu.Unlock()
// If not dormant and no outstanding ping then send a PING
if !dormant && !outstandingPing {
t.controlBuf.put(p)
ackWaitDuration = t.kp.Timeout
outstandingPing = true
}
}
sleepDuration := t.kp.Time
// If a PING was sent, set up a timer for the minimum of the remaining timeout duration and the time duration.
if outstandingPing {
sleepDuration = minTime(t.kp.Time, ackWaitDuration)
ackWaitDuration -= sleepDuration
}
timer := time.NewTimer(sleepDuration)
select {
case <-timer.C:
case <-t.ctx.Done():
if !timer.Stop() {
<-timer.C
}
return
}
}
}
A couple of notes on this code:
- It uses t.activity rather than your last read time. I think using the last read time is much better, but that requires some changes in the if conditions (comparing the time since the last read against kp.Time or kp.Timeout).
- It is an oversimplification of the dormancy case. I think it is functionally equivalent: we don't send any PINGs if the dormancy conditions are met, but the goroutine keeps running. I think having this goroutine sleep while it is dormant is a good idea.
I modified the code to look like this:
func (t *http2Client) keepalive() {
p := &ping{data: [8]byte{}}
// True iff a PING has been sent, and no data has been received since then
// and the PING hasn't timed out.
var outstandingPing bool
// Amount of time remaining before which we should receive an ACK for the
// last sent PING.
var timeoutLeft time.Duration
// UnixNanos recorded before we block on the timer. This is required to
// check for read activity since then.
var prevNano int64
for {
dataRead := false
if prevNano < atomic.LoadInt64(&t.lr.timeNano) {
// Read activity since the last time we were here.
outstandingPing = false
dataRead = true
}
// Outstanding PING timed out, we are done.
if outstandingPing && timeoutLeft <= 0 {
t.Close()
return
}
if !dataRead {
// Check if keepalive should go dormant.
t.mu.Lock()
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
// Make awakenKeepalive writable.
<-t.awakenKeepalive
t.mu.Unlock()
select {
case <-t.awakenKeepalive:
// If control gets here, a ping has already been sent; we only
// need to reset the timer with keepalive.Timeout.
timeoutLeft = t.kp.Timeout
outstandingPing = true
case <-t.ctx.Done():
return
}
} else {
t.mu.Unlock()
if !outstandingPing {
if channelz.IsOn() {
atomic.AddInt64(&t.czData.kpCount, 1)
}
t.controlBuf.put(p)
timeoutLeft = t.kp.Timeout
outstandingPing = true
}
}
}
// Amount of kp.Time remaining should be calculated from the time of the
// last read activity.
timeLeft := t.kp.Time
if dataRead {
timeLeft = time.Duration(atomic.LoadInt64(&t.lr.timeNano)) + t.kp.Time - time.Duration(time.Now().UTC().UnixNano())
}
// If a PING is outstanding, the amount of time to sleep here should be the
// minimum of timeoutLeft and timeLeft.
sleepDuration := timeLeft
if outstandingPing {
sleepDuration = minTime(timeLeft, timeoutLeft)
timeoutLeft -= sleepDuration
}
prevNano = time.Now().UTC().UnixNano()
timer := time.NewTimer(sleepDuration)
select {
case <-timer.C:
case <-t.ctx.Done():
if !timer.Stop() {
<-timer.C
}
return
}
}
}
Existing tests seem to pass. I still need to warm up to this code a bit more, though I do feel it is better. I will also try to see if I can add more tests to increase my confidence level.
Thanks for the suggestion.
Apologies for the hibernation on this thread. Finally got around to running some benchmarks with keepalive enabled. Features for the benchmark:
Here
Here
Here
There is negligible performance difference between the two approaches, so I can go with either one. @canguler @dfawley: I couldn't figure out how to update the timestamp directly in some syscall handler that you mentioned. Could you please point me to that, and I can try that out as well.
Hey @easwars, I've talked with @dfawley. The framer reads from the conn, but we are wrapping it with a bufio.Reader. An option would be to implement this wrapper ourselves, just as we did for the writer. Here is the relevant part: grpc-go/internal/transport/http_util.go, lines 661 to 669 in 684ef04.
Once we have an implementation for this wrapper, we can use it to get the timestamp whenever Read is called on it.
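The simplest version of that idea is a reader wrapper whose Read records the current time as bytes come off the connection; a hedged sketch with illustrative names, assuming the standard io, sync/atomic, and time packages.

// timestampingReader wraps the reader handed to the buffering layer so that
// every successful Read also records when it happened.
type timestampingReader struct {
	r            io.Reader // the underlying net.Conn (or buffered reader)
	lastReadNano *int64    // shared with the keepalive goroutine
}

func (tr *timestampingReader) Read(p []byte) (int, error) {
	n, err := tr.r.Read(p)
	if n > 0 {
		// Record the time of this read before returning the data to the caller.
		atomic.StoreInt64(tr.lastReadNano, time.Now().UnixNano())
	}
	return n, err
}

As the next comment notes, the framer actually needs more of bufio.Reader's surface than just Read, which is why a full replacement turned out to be more involved.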
I tried implementing just the parts of bufio.Reader that we need here, and it turns out that we need quite a bit. We not only need to implement the … So, with the new …
Comparison between current code (none of the changes proposed in this PR) and keepalive changes with the new …
Comparison between proposed changes in this PR (writing timestamp on a channel) and the new …
Comparison between writing timestamp directly in …
No significant performance difference among the three approaches, while there is a slight degradation in performance for all three alternatives compared to the existing code (which was expected, I guess).
This commit makes the following changes:
* Keep track of the time of the last read in the transport.
* Use this in the keepalive implementation to decide when to send out keepalives.
* Address the issue of keepalives being sent every [Time+Timeout] period instead of every [Time] period, as mandated by proposal A8.
* Makes many of the transport tests run in parallel (as most of them spend a lot of time just sleeping, waiting for things to happen).
Proposal A8 is here: https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md
This commit addresses: grpc#2638
This resolver doesn't do much at this point, except returning an empty address list and a hard-coded service config which picks the xds balancer with a round_robin child policy. Also moved the xdsConfig struct to the xds/internal package and exported it as LBConfig, so that both the resolver and the balancer packages can make use of this.
Total number of Allocs and AllocedBytes retrieved from runtime.Memstats() were not being divided by the number of completed operations during the benchmark run, to get the correct number of Allocs/op and Bytes/op.
The current code was using a buffered channel to signal whether the keepalive goroutine should enter dormancy or be awakened from sleep. Using sync.Cond makes the code much simpler to read, and also improves the performance numbers on all but one front.
This is now tracked in #3102.
This commit makes the following changes:
* Keep track of the time of the last read in the transport.
* Use this in the keepalive implementation to decide when to send out keepalives.
* Address the issue of keepalives being sent every [Time+Timeout] period instead of every [Time] period, as mandated by proposal A8.
* Makes many of the transport tests run in parallel (as most of them spend a lot of time just sleeping, waiting for things to happen).
Proposal A8 is here:
https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md
Fixes #2638