
transport: consume per-stream inflow windows ahead of time - large speedup for big messages #1073

Closed
wants to merge 8 commits

Conversation

@apolcyn (Contributor) commented Feb 14, 2017

This is one solution to the gRPC-vs-HTTP/1 performance problem noticed in #1043 (the fixed 64K window causes too many round trips for large messages).

It consumes per-stream inflow windows based on the requested read size (e.g., it sends a 2M stream window update for a 2M gRPC message before reading it in, but doesn't change the connection window).

Is this technique used in C-core? cc @ctiller.

This can be a big speedup for large messages (e.g., a 2M request/response with 150ms RTT goes from ~10s down to ~1s).

AFAICS Stream.Read isn't safe to do this with, since io.Read can be called repeatedly; it's possible to make it work, but a rename seems safer to me. So this changes the transport package to expose Stream.ReadFull(_), which tries to be equivalent to the current io.ReadFull("stream", _).
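
A rough sketch of the approach (hypothetical names and types, not this PR's actual code): before blocking on a large read, grant the peer enough stream-level window for the whole requested message, and leave the connection-level window alone.

package transport // sketch only

import "io"

// stream is a simplified stand-in for transport.Stream (hypothetical).
type stream struct {
	id         uint32
	recvReader io.Reader
	// sendWindowUpdate queues a stream-level WINDOW_UPDATE frame (hypothetical hook).
	sendWindowUpdate func(streamID, incr uint32)
}

// readFull grants the full read size to the stream window up front, so the
// sender isn't throttled by the 64KB default while we wait for the message,
// then blocks until the buffer is filled.
func (s *stream) readFull(p []byte) (int, error) {
	if grant := uint32(len(p)); grant > 0 {
		s.sendWindowUpdate(s.id, grant)
	}
	return io.ReadFull(s.recvReader, p)
}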

@apolcyn (Contributor, Author) commented Feb 21, 2017

cc @menghanl, this is the Read -> ReadFull change to transport.Stream that was discussed.

@apolcyn (Contributor, Author) commented Feb 22, 2017

cc @sailat

@@ -186,14 +186,15 @@ type Stream struct {
recvCompress string
sendCompress string
buf *recvBuffer
dec io.Reader
fc *inFlow
recvQuota uint32
// The accumulated inbound quota pending for window update.
updateQuota uint32
// The handler to control the window update procedure for both this
// particular stream and the associated transport.
Review comment (Contributor):
The comments need to be updated to explain the new behavior.

}
defer func() { s.readFullErr = err }()
s.streamWindowHandler(int64(len(p)))
return io.ReadFull(s.sr, p)
Review comment (Contributor):
n, err = io.ReadFull(s.sr, p) 
return

instead of

return io.ReadFull(s.sr, p)

So that the return values n and err get updated and the defer function has a valid value of err to populate s.readFullErr.

Reply (Member):
Actually, FYI, return values are assigned via a return statement, so return io.ReadFull(s.sr, p) is fine. Example: https://play.golang.org/p/V1qMks7o9s.
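
A minimal standalone illustration of that point (hypothetical names, along the lines of the linked playground):

package main

import (
	"errors"
	"fmt"
)

func fails() (int, error) { return 0, errors.New("boom") }

// With named return values, `return fails()` assigns n and err before any
// deferred function runs, so the deferred closure sees the final err.
func read() (n int, err error) {
	defer func() { fmt.Println("deferred sees err =", err) }()
	return fails()
}

func main() {
	read() // prints: deferred sees err = boom
}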

Reply (Contributor):
Ah neat!

@@ -149,45 +149,45 @@ type inFlow struct {
mu sync.Mutex
// pendingData is the overall data which have been received but not been
// consumed by applications.
pendingData uint32
Review comment (Contributor):
The http2 framer.WriteWindowUpdate(streamID, incr uint32) doesn't support int64. It's better to let the sizes be in uint32.

@apolcyn (Contributor, Author) commented Mar 7, 2017

@MakMukhi

The http2 framer.WriteWindowUpdate(streamID, incr uint32) doesn't support int64. It's better to let the sizes be in uint32.

I think this points to a bug:
The max legal window update (and window size) is 2^31 - 1, but with the max possible gRPC message being 2^32 - 1, this PR could send an illegal/overflowing window update.
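
A guard against that overflow could look like the following (hypothetical helper names; a later revision does something similar with min(maxSingleStreamWindowUpdate, uint32(len(p)))):

package transport // sketch only

// http2MaxWindowUpdate is the largest legal HTTP/2 WINDOW_UPDATE increment
// (and window size): 2^31 - 1.
const http2MaxWindowUpdate = 1<<31 - 1

// clampWindowGrant caps the pre-consumed stream window at the HTTP/2 limit,
// even when the pending gRPC message (up to 2^32 - 1 bytes) is larger.
func clampWindowGrant(readSize uint32) uint32 {
	if readSize > http2MaxWindowUpdate {
		return http2MaxWindowUpdate
	}
	return readSize
}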

@apolcyn force-pushed the revise_flow_control_master branch 4 times, most recently from ee57995 to d54684c on March 9, 2017
@apolcyn (Contributor, Author) commented Mar 9, 2017

This is UNSAFE as is; there is a major bug here. If we do a pre-emptive window grant that puts the window's pendingData below zero, then receive data on the stream that the app never reads, and the stream is then cancelled, the pending data on that stream won't be given back to the connection (as it is currently); the connection will eventually deadlock if this happens enough.
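
One general shape of a fix for that kind of flow-control leak, as an illustration only (hypothetical names; the later commits in this PR take their own approach): when a stream is torn down with unread data still buffered, credit that space back to the connection.

package transport // sketch only

import "sync"

// inFlowSketch is a simplified per-stream flow-control accountant (hypothetical).
type inFlowSketch struct {
	mu          sync.Mutex
	pendingData uint32 // bytes received but not yet consumed by the application
}

// onStreamClose drains a cancelled stream's unread bytes and returns that
// flow-control space to the connection so the transport window isn't leaked.
func onStreamClose(streamFC *inFlowSketch, sendConnWindowUpdate func(incr uint32)) {
	streamFC.mu.Lock()
	unread := streamFC.pendingData
	streamFC.pendingData = 0
	streamFC.mu.Unlock()

	if unread > 0 {
		sendConnWindowUpdate(unread)
	}
}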

@apolcyn force-pushed the revise_flow_control_master branch from 9d2a30d to 605ea24 on March 10, 2017
@apolcyn (Contributor, Author) commented Mar 10, 2017

The latest updates tentatively fix the race in which pending data in a closed stream's recvBuffer isn't given back to the transport.

It also moves the API break where transport.Stream exports ReadFull instead of Read.

@glycerine commented

I tried this PR as a solution to the slow performance, but it just truncated my 512-message stream after 3 messages. It seems half-baked/buggy.

@apolcyn (Contributor, Author) commented Mar 28, 2017

@glycerine there was a major bug here due to a simple misuse of io.LimitReader, which I think you were hitting. The latest updates fix that.

Though this could still use some more cleanup, I'd like to get it in as a fix for the high-latency/large-message issues, since it doesn't need user-facing knobs.

This should really help for scenarios involving:

  • high latency
  • large messages
  • low concurrency (the stream window is dynamic but the 1M conn window still doesn't move).

Of course, you're more than welcome to retest with the latest commits here; I'd expect about a 15x speedup for the situation above (with the window bottleneck moving from 64K to 1M).
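
For anyone hitting the earlier truncation, here's a generic illustration of how misusing io.LimitReader can silently cut a stream short (illustrative only; not necessarily the exact bug that was in this branch):

package main

import (
	"bytes"
	"fmt"
	"io"
)

func main() {
	// Three 4-byte "messages" on one stream.
	src := bytes.NewBufferString("msg1msg2msg3")

	// Buggy pattern: one LimitReader reused across messages. After the first
	// limit bytes it permanently returns EOF, truncating later messages.
	lr := io.LimitReader(src, 4)
	for i := 0; i < 3; i++ {
		p := make([]byte, 4)
		n, err := io.ReadFull(lr, p)
		fmt.Printf("reused limiter: read %d bytes, err = %v\n", n, err)
	}

	// Correct pattern: a fresh limit per message.
	src = bytes.NewBufferString("msg1msg2msg3")
	for i := 0; i < 3; i++ {
		p := make([]byte, 4)
		n, err := io.ReadFull(io.LimitReader(src, 4), p)
		fmt.Printf("fresh limiter:  read %d bytes, err = %v\n", n, err)
	}
}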

@apolcyn force-pushed the revise_flow_control_master branch from 311884b to eef27ff on March 28, 2017
@apolcyn (Contributor, Author) commented Mar 28, 2017

go1.6.3 only failed on Travis on FlowControlLogicRace; likely a real issue.

@apolcyn (Contributor, Author) commented Mar 28, 2017

Actually, re-running Travis has FlowControlLogicRace passing, and I haven't been able to reproduce that failure on either this branch or master, with go1.6.3 or go1.8.

@apolcyn force-pushed the revise_flow_control_master branch from 20d2c2c to 78b0b88 on April 20, 2017
@apolcyn (Contributor, Author) commented Apr 20, 2017

cc @dfawley, btw this is the PR I mentioned offline. Just squashed and rebased to resolve conflicts. I think this is OK for a look.

if f.pendingData == 0 {
return 0
if n > http2MaxWindowUpdate {
grpclog.Fatalf("potential window update too large. onRead(n) where n is %v; max n is %v", f.pendingUpdate, http2MaxWindowUpdate)
Review comment (Contributor):
Should this be a fatalf? When can such a condition occur?

Reply (Contributor, Author):
removed this

f.mu.Lock()
defer f.mu.Unlock()
if f.loanedWindowSpace > 0 {
grpclog.Fatalf("pre-consuming window space while there is pre-consumed window space still outstanding")
Review comment (Contributor):
Again, Fatalf might be too harsh.

}
defer func() { r.readFullErr = err }()
committedReadAmount := min(maxSingleStreamWindowUpdate, uint32(len(p)))
r.loanSpaceInStreamWindow(committedReadAmount)
Review comment (Contributor):
Seems like we will loan out window space every single time. There may be times when we don't even need the loan.
Does it make sense to have a few if conditions like these:

if len(p) - fc.pendingData < fc.limit {
    // no need to loan
} else {
    delta = (len(p) - fc.pendingData) - fc.limit
    loan(delta)
}

Reply (Contributor, Author):
Still haven't addressed this, but I might revisit it.

@@ -495,6 +495,9 @@ func (f *framer) writeSettingsAck(forceFlush bool) error {
}

func (f *framer) writeWindowUpdate(forceFlush bool, streamID, incr uint32) error {
if incr > http2MaxWindowUpdate {
grpclog.Fatalf("attempted window update too large. have %v; max is %v", incr, http2MaxWindowUpdate)
Review comment (@MakMukhi, Contributor, Apr 24, 2017):
Again Fatalf seems too harsh.

@@ -161,34 +171,51 @@ type inFlow struct {
limit uint32

mu sync.Mutex
// pendingData is the overall data which have been received but not been
// PendingData is the overall data which have been received but not been
Review comment (Member):
You changed the capitalization here but the field is still unexported. Maybe just remove the "PendingData is" part?

// consumed by applications.
pendingData uint32
// The amount of data the application has consumed but grpc has not sent
// window update for them. Used to reduce window update frequency.
pendingUpdate uint32

// This is temporary space in the incoming flow control that can be granted at convenient times
// to prevent the sender from stalling for lack flow control space.
Review comment (Member):
nit: lack of flow control

f.mu.Lock()
defer f.mu.Unlock()
if f.loanedWindowSpace > 0 {
grpclog.Fatalf("pre-consuming window space while there is pre-consumed window space still outstanding")
Review comment (Member):
Nit: extra space between still & outstanding.

}
f.loanedWindowSpace = n

if f.loanedWindowSpace+f.pendingUpdate >= f.limit/4 {
Review comment (Member):
Minor improvement:

wu := f.pendingUpdate + f.loanedWindowSpace
if wu >= f.limit/4 {
  f.pendingUpdate = 0
  return wu
}
return 0

}
f.loanedWindowSpace = n

if f.loanedWindowSpace+f.pendingUpdate >= f.limit/4 {
Review comment (@dfawley, Member, Apr 24, 2017):
What is the significance of the 25% here? Maybe make that a const if it's arbitrary?

if s.readFullErr != nil {
return 0, s.readFullErr
}
defer func() { s.readFullErr = err }()
Review comment (Member):
Do you know if the compiler optimizes this?

If not, this would be better:

n, err = io.ReadFull()
s.readFullErr = err
return n, err

if r.readFullErr != nil {
return 0, r.readFullErr
}
defer func() { r.readFullErr = err }()
Review comment (Member):
Similar comment regarding defer vs. straight-line.

// before server starts reading.
time.Sleep(2 * time.Second)
_, err := s.ReadFull(p)
if err != nil {
Review comment (Contributor):
Perhaps log the error

}
p := make([]byte, len(req))
_, err := s.ReadFull(p)
if err != nil {
Review comment (Contributor):
same thing

p := make([]byte, len(expectedResponseLarge))

// Give time to server to begin sending before client starts reading.
time.Sleep(2 * time.Second)
Review comment (Contributor):
Is this needed on both sides? The server will sleep for 2 seconds before reading and sending response back. Could we just call s.ReadFull and that'll wait for server to send all p bytes?

Reply (Contributor, Author):
I think it was needed on both sides only because I wanted a good chance that the reader would have started trying to read before any bytes arrived, on both client and server.

That said, this is racy, and I'm wondering if it would be better replaced by unit tests on an inFlow instance... doing some experimentation.

@@ -1120,10 +1257,12 @@ func TestClientWithMisbehavedServer(t *testing.T) {
if err := ct.Write(s, d, &Options{Last: true, Delay: false}); err != nil && err != io.EOF {
t.Fatalf("Failed to write: %v", err)
}
// reflect to get the inner recvBufferReader, which reads without doing window updates
Review comment (Contributor):
s/reflect/type assert/

Reply (Contributor, Author):
done

@@ -1182,7 +1321,9 @@ func TestEncodingRequiredStatus(t *testing.T) {
t.Fatalf("Failed to write the request: %v", err)
}
p := make([]byte, http2MaxFrameLen)
if _, err := s.dec.Read(p); err != io.EOF {
// reflect to get the plain recvBufferReader from the stream's stream reader, which doesn't do window updates
Review comment (Contributor):
same thing here and below

@apolcyn force-pushed the revise_flow_control_master branch from b9639e3 to edfb984 on April 26, 2017
@apolcyn (Contributor, Author) commented Jun 21, 2017

superseded by #1248

@apolcyn apolcyn closed this Jun 21, 2017
@lock lock bot locked as resolved and limited conversation to collaborators Jan 18, 2019