Dedicated goroutine for writing. #1498
Conversation
Force-pushed 5e48328 to de64f47
Force-pushed cd360b6 to d855a19
Force-pushed 8b852e3 to b6eeb31
This looks really good for such a big change. Just a few minor things.
.travis.yml
Outdated
@@ -18,4 +18,4 @@ before_install:

 script:
   - if [[ "$TRAVIS_GO_VERSION" = 1.9* && "$ARCH" != "386" ]]; then ./vet.sh || exit 1; fi
-  - make test testrace
+  - travis_wait make test testrace
Revert in this PR?
@@ -891,9 +891,6 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
 		trInfo: trInfo,
 		statsHandler: sh,
 	}
-	if ss.cp != nil {
I don't think we want this change in this PR (?). This buffer is still relevant and I think this is beneficial for streaming RPCs.
This is actually needed: a SendMsg call can now be made again before the previous message has been written on the wire. In that case, if we share a common buffer, we run into a race: concurrent access to the underlying buffer by encode and the writer goroutine.
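For readers following along, here is a minimal, hypothetical sketch (the `writer`, `sendShared`, and `sendOwned` names are illustrative, not grpc-go's types) of why a shared encode buffer stops being safe once frames are handed off to a dedicated writer goroutine:

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

type writer struct {
	frames chan []byte // drained by a single dedicated writer goroutine
	wg     sync.WaitGroup
}

func (w *writer) run() {
	defer w.wg.Done()
	for f := range w.frames {
		// Stands in for the wire write; the goroutine is still reading f here.
		fmt.Printf("wrote %d bytes\n", len(f))
	}
}

// sendShared reuses one buffer across sends. Once writes are asynchronous,
// the writer goroutine may still be reading buf.Bytes() when the next send
// resets and refills the buffer: a data race. Shown only for contrast.
func sendShared(w *writer, buf *bytes.Buffer, msg []byte) {
	buf.Reset()
	buf.Write(msg)
	w.frames <- buf.Bytes()
}

// sendOwned hands the writer a slice it exclusively owns, so back-to-back
// sends cannot trample an in-flight frame.
func sendOwned(w *writer, msg []byte) {
	w.frames <- append([]byte(nil), msg...)
}

func main() {
	w := &writer{frames: make(chan []byte, 16)}
	w.wg.Add(1)
	go w.run()

	sendOwned(w, []byte("first"))
	sendOwned(w, []byte("second")) // safe even before "first" hits the wire

	close(w.frames)
	w.wg.Wait()
}
```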
Interesting. Did you run the benchmark for streaming large messages with only one concurrent call/connection? (Does that benchmark do repeated sends on the stream or just one send and one receive?) That is the only place where this change should show much impact, so it would be interesting to see how much, if at all, it was impacted.
The streaming benchmark does send multiple messages on a stream.
Run on master:
Stream-noTrace-latency_0s-kbps_0-MTU_0-maxConcurrentCalls_1-reqSize_1048576B-respSize_1048576B-Compressor_false
latency_0s-kbps_0-MTU_0-maxConcurrentCalls_1-reqSize_1048576B-respSize_1048576B-Compressor_false
2116 4727531 ns/op 10686814 B/op 2390 allocs/op
Histogram (unit: ms)
Count: 2116 Min: 3.9 Max: 6.1 Avg: 4.73
------------------------------------------------------------
[ 3.892775, 3.892776) 1 0.0% 0.0%
[ 3.892776, 3.892780) 0 0.0% 0.0%
[ 3.892780, 3.892801) 0 0.0% 0.0%
[ 3.892801, 3.892905) 0 0.0% 0.0%
[ 3.892905, 3.893434) 0 0.0% 0.0%
[ 3.893434, 3.896116) 0 0.0% 0.0%
[ 3.896116, 3.909704) 1 0.0% 0.1%
[ 3.909704, 3.978559) 7 0.3% 0.4%
[ 3.978559, 4.327462) 240 11.3% 11.8% #
[ 4.327462, 6.095440) 1866 88.2% 100.0% #########
[ 6.095440, inf) 1 0.0% 100.0%
On loopy writer:
Stream-noTrace-latency_0s-kbps_0-MTU_0-maxConcurrentCalls_1-reqSize_1048576B-respSize_1048576B-Compressor_false
latency_0s-kbps_0-MTU_0-maxConcurrentCalls_1-reqSize_1048576B-respSize_1048576B-Compressor_false
2615 3824685 ns/op 10669306 B/op 1798 allocs/op
Histogram (unit: ms)
Count: 2615 Min: 3.2 Max: 4.7 Avg: 3.82
------------------------------------------------------------
[ 3.182852, 3.182853) 1 0.0% 0.0%
[ 3.182853, 3.182857) 0 0.0% 0.0%
[ 3.182857, 3.182876) 0 0.0% 0.0%
[ 3.182876, 3.182968) 0 0.0% 0.0%
[ 3.182968, 3.183415) 0 0.0% 0.0%
[ 3.183415, 3.185592) 0 0.0% 0.0%
[ 3.185592, 3.196198) 2 0.1% 0.1%
[ 3.196198, 3.247853) 15 0.6% 0.7%
[ 3.247853, 3.499428) 283 10.8% 11.5% #
[ 3.499428, 4.724684) 2313 88.5% 100.0% #########
[ 4.724684, inf) 1 0.0% 100.0%
test/end2end_test.go
Outdated
	bl := l.beLazy
	l.mu.Unlock()
	if bl {
		// The sleep duration here needs to less that the leakCheck deadline.
that->than
test/end2end_test.go
Outdated
}

func (l *lazyConn) Write(b []byte) (int, error) {
	l.mu.Lock()
Atomic instead of lock?
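A hedged sketch of what this suggestion could look like, assuming an int32 flag stands in for the mutex-guarded `beLazy` bool (the surrounding `lazyConn` shape is approximated from the test, not copied):

```go
package main

import (
	"net"
	"sync/atomic"
	"time"
)

type lazyConn struct {
	net.Conn
	beLazy int32 // 1 means "delay writes"; accessed only with atomic ops
}

func (l *lazyConn) Write(b []byte) (int, error) {
	if atomic.LoadInt32(&l.beLazy) == 1 {
		// Must stay shorter than the test's leakCheck deadline.
		time.Sleep(time.Second)
	}
	return l.Conn.Write(b)
}

func (l *lazyConn) makeLazy() {
	atomic.StoreInt32(&l.beLazy, 1)
}

func main() {}
```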
test/end2end_test.go
Outdated
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, context.DeadlineExceeded", err)
	}
	if time.Now().After(t1.Add(2 * timeout)) {
if time.Since(t1) > 2 * time.Second {
?
transport/control.go
Outdated
func (qb *quotaPool) acquireWithVersion() (<-chan int, uint64) {
	qb.mu.Lock()
	version := qb.version
Maybe an atomic instead of a lock? You still need the lock for incrementing the version at the same time as the add(), and for compareAndExecute, but that is presumably a more-rare operation than just getting the channel.
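Roughly what that could look like; the `quotaPool` shape here is approximated from the PR and the channel-refill logic is elided, so treat it as a sketch of the suggestion rather than the final code:

```go
package main

import (
	"sync"
	"sync/atomic"
)

type quotaPool struct {
	c       chan int
	mu      sync.Mutex
	version uint64
	quota   int
}

// acquireWithVersion no longer takes the lock just to read the version.
func (qb *quotaPool) acquireWithVersion() (<-chan int, uint64) {
	return qb.c, atomic.LoadUint64(&qb.version)
}

// add still holds the lock so the quota update and the version bump are
// observed together by compareAndExecute.
func (qb *quotaPool) add(v int) {
	qb.mu.Lock()
	defer qb.mu.Unlock()
	atomic.AddUint64(&qb.version, 1)
	qb.quota += v
	// Real code would also try to move available quota into qb.c here.
}

func main() {}
```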
@@ -270,44 +262,45 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
 		return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
 	}
 	if t.initialWindowSize != defaultWindowSize {
-		err = t.framer.writeSettings(true, http2.Setting{
+		err = t.framer.fr.WriteSettings(http2.Setting{
These are still being done without a deadline?
Offline discussion. Follow-up PR will take care of this case.
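As noted, this PR leaves that alone. Purely to illustrate the follow-up idea, a deadline could bound these handshake writes along these lines (the helper and its plumbing are assumptions, not the eventual fix):

```go
package main

import (
	"net"
	"time"
)

// writeWithDeadline runs a handshake write with the dial deadline applied to
// the connection, then clears it so later writes are unaffected.
func writeWithDeadline(conn net.Conn, deadline time.Time, write func() error) error {
	if !deadline.IsZero() {
		if err := conn.SetWriteDeadline(deadline); err != nil {
			return err
		}
		defer conn.SetWriteDeadline(time.Time{})
	}
	return write()
}

func main() {}
```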
	s.localSendQuota.add(ltq - ps) // It's ok if we make it negative.
	var endStream bool
	// See if this is the last frame to be written.
	if opts.Last {
Is it possible to combine these conditions and still keep them understandable?
Offline discussion. Leaving it as is. Let's chat more on it if needed.
transport/transport.go
Outdated
		case <-done:
			return
		}
	subloop:
How about "hasData" or something to explain what distinguishes this from the above?
Don't block a writer on the underlying syscall: the context may expire while the writer sits blocked waiting for the syscall to finish.
This also brings performance benefits.
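For illustration, a stripped-down sketch of that pattern (hypothetical `controlBuf`/`loopyWriter` names echoing the PR, not its actual code): callers only wait to enqueue a frame, and a single goroutine owns the connection and issues all write syscalls.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"os"
)

type controlBuf struct {
	frames chan []byte
}

// put returns as soon as the frame is queued, or when the caller's context
// expires; it never waits on the underlying write syscall.
func (c *controlBuf) put(ctx context.Context, frame []byte) error {
	select {
	case c.frames <- frame:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// loopyWriter is the dedicated goroutine: the only code path doing writes.
func loopyWriter(w io.Writer, c *controlBuf) {
	for f := range c.frames {
		if _, err := w.Write(f); err != nil {
			fmt.Println("write error:", err)
			return
		}
	}
}

func main() {
	c := &controlBuf{frames: make(chan []byte, 64)}
	done := make(chan struct{})
	go func() {
		loopyWriter(os.Stdout, c)
		close(done)
	}()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	_ = c.put(ctx, []byte("hello\n")) // returns once queued, not once written

	close(c.frames) // let the writer drain and exit
	<-done
}
```

With this split, a caller whose context expires simply abandons the enqueue, while the writer goroutine keeps draining whatever was already queued.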