-
Notifications
You must be signed in to change notification settings - Fork 4.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Expand stream's flow control in case of an active read. #1248
Conversation
transport/control.go
Outdated
} | ||
f.mu.Lock() | ||
defer f.mu.Unlock() | ||
senderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
small wording nit: rename to estimatedSenderQuota
and estimatedUntransmittedData
, for clarity here?
transport/control.go
Outdated
@@ -167,14 +169,37 @@ type inFlow struct { | |||
// The amount of data the application has consumed but grpc has not sent | |||
// window update for them. Used to reduce window update frequency. | |||
pendingUpdate uint32 | |||
// delta is the extra window update given by receiver when an application | |||
// is reading data bigger in size than the inFlow limit. | |||
delta int32 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be unsigned since it can't be negative? (Also, you cast it to uint32 when reading and cast to int32 when assigning.)
transport/control.go
Outdated
@@ -189,6 +214,14 @@ func (f *inFlow) onRead(n uint32) uint32 { | |||
return 0 | |||
} | |||
f.pendingData -= n | |||
if f.delta > 0 { | |||
f.delta -= int32(n) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this would be easier to read without the casts and taking the negative of a negative number:
if n > f.delta {
n -= f.delta
f.delta = 0
} else {
f.delta -= n
n = 0
}
(Also would allow f.delta to be uint32 like it seems like it wants to be.)
transport/http2_client.go
Outdated
@@ -173,9 +173,9 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( | |||
conn, err := dial(ctx, opts.Dialer, addr.Addr) | |||
if err != nil { | |||
if opts.FailOnNonTempDialError { | |||
return nil, connectionErrorf(isTemporary(err), err, "transport: %v", err) | |||
return nil, connectionErrorf(isTemporary(err), err, "transport: Error while dialing %v", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: lowercase "e" in "error", and add a colon before the error being appended.
transport/http2_client.go
Outdated
if s.state == streamDone { | ||
return | ||
} | ||
if w := s.fc.maybeAdjust(n); n > 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should the if
be checking w
?
If not, check n
first.
transport/http2_server.go
Outdated
if s.state == streamDone { | ||
return | ||
} | ||
if w := s.fc.maybeAdjust(n); n > 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above re: w
and n
.
Can this function be shared somehow? Should it be a method on the Stream instead of server/client?
transport/transport.go
Outdated
// Read reads all the data available for this Stream from the transport and | ||
// Read reads all p bytes from the wire for this stream. | ||
func (s *Stream) Read(p []byte) (n int, err error) { | ||
// Don't request a read if there's was an error earlier |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: "there was"
test/end2end_test.go
Outdated
te := newTest(t, e) | ||
te.startServer(&testServer{security: e.security}) | ||
defer te.tearDown() | ||
tc := testpb.NewTestServiceClient(te.clientConn()) | ||
|
||
stream, err := tc.StreamingInputCall(te.ctx) | ||
ctx, _ := context.WithTimeout(te.ctx, time.Minute*3) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: 30s
@@ -67,7 +75,7 @@ func TestSimpleParsing(t *testing.T) { | |||
// Check that messages with length >= 2^24 are parsed. | |||
{append([]byte{0, 1, 0, 0, 0}, bigMsg...), nil, bigMsg, compressionNone}, | |||
} { | |||
buf := bytes.NewReader(test.p) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think the use of fullReader
and other changes to this file can be reverted, since the transport is still an io.Reader
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These test here rely on recvMsg's behavior of reading the full message, (which is true when it interacts with the transport stream). However, here the parser is given a "fake" buffer instead of the stream. Given that we changed recvMsg to go from io.ReadFull to p.r.Read, the "fake" buffer here needs to read the full message just like the transport stream does.
transport/control.go
Outdated
} | ||
f.mu.Lock() | ||
defer f.mu.Unlock() | ||
estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
optional nit, sorry one more naming suggestion:
change to maxPossibleSenderQuota
and (keep minPossibleUnTransmittedData
estUnTransmitted
)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about I add comments and leave the names as is? I fear long names take us away from Go-styling.
transport/transport_test.go
Outdated
inBuf := make([]byte, 1) | ||
actualCount, actualErr := s.Read(inBuf) | ||
if actualCount != 0 { | ||
t.Errorf("actualCount, _ := s.Read(_) differs; want %v; got %v", 0, actualCount) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: s/want %v/want 0
Building on original PR by apolcyn@ #1073
Benchmark on scenario:
Results before the changes: QPS: 0.1
Results after the changes: QPS: 0.9