-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add package stats, and export stats. #961
Conversation
e18a430
to
78d4f0d
Compare
func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer) ([]byte, error) { | ||
var b []byte | ||
var length uint | ||
func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outgoingPayloadStats *stats.OutgoingPayloadStats) ([]byte, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/outgoingPayloadStats/outStats/
@@ -311,11 +324,15 @@ func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) er | |||
return nil | |||
} | |||
|
|||
func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int) error { | |||
func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int, incomingPayloadStats *stats.IncomingPayloadStats) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/incomingPayloadStats/inStats/
var cbuf *bytes.Buffer | ||
var ( | ||
cbuf *bytes.Buffer | ||
outgoingPayloadStats *stats.OutgoingPayloadStats |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change "outgoing/Outgoing" to "out/Out" and "incoming/Incoming" to "in/In" in all the places.
* | ||
*/ | ||
|
||
// Package stats reports stats for gRPC. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Package stats is for collecting and reporting various network and RPC stats.
|
||
// Stats contains stats information about RPCs. | ||
// All stats types in this package implements this interface. | ||
type Stats interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
name it RPCStats since you will add connection stats later?
|
||
// OutgoingHeaderStats indicates a header is sent. | ||
// Method, addresses and Encryption are only valid if IsClient is true. | ||
type OutgoingHeaderStats struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OutHeader
func (s *OutgoingHeaderStats) isStats() {} | ||
|
||
// OutgoingTrailerStats indicates a trailer is sent. | ||
type OutgoingTrailerStats struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OutTrailer
func (s *OutgoingTrailerStats) isStats() {} | ||
|
||
// ErrorStats indicates an error happens. | ||
type ErrorStats struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RPCErr
} | ||
|
||
// RegisterHandler registers the user handler function and starts the stats collection. | ||
// This handler function will be called to process the stats. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add something to prevent it from calling multiple times?
start() | ||
} | ||
|
||
// start starts the stats collection. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we export this too?
d8d33f8
to
ca1e7b9
Compare
IsClient() bool | ||
} | ||
|
||
// InPayload contains the information for a incoming payload. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
an incoming
// RPCStats contains stats information about RPCs. | ||
// All stats types in this package implements this interface. | ||
type RPCStats interface { | ||
// IsClient indicates if the stats is a client stats. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IsClient is true if this RPCStats is from client side.
// InPayload contains the information for a incoming payload. | ||
type InPayload struct { | ||
// Client indicates if this stats is a client stats. | ||
Client bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Client is true if this InPayload is from client side.
} | ||
|
||
// IsClient indicates if the stats is a client stats. | ||
func (s *InPayload) IsClient() bool { return s.Client } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IsClient indicates if this is from client side.
// IsClient indicates if the stats is a client stats. | ||
func (s *InPayload) IsClient() bool { return s.Client } | ||
|
||
// InHeader indicates a header is received. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
InHeader contains stats when a header is received.
type OutTrailer struct { | ||
// Client indicates if this stats is a client stats. | ||
Client bool | ||
// WireLength is the wire length of header. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/header/trailer
WireLength int | ||
} | ||
|
||
// IsClient indicates if the stats is a client stats. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
// IsClient indicates if the stats is a client stats. | ||
func (s *OutTrailer) IsClient() bool { return s.Client } | ||
|
||
// RPCErr indicates an error happens. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
|
||
// RPCErr indicates an error happens. | ||
type RPCErr struct { | ||
// Client indicates if this stats is a client stats. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
Error error | ||
} | ||
|
||
// IsClient indicates if the stats is a client stats. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
Thanks for the review. All fixed. PTAL. |
|
||
// On indicates whether stats is started. | ||
func On() bool { | ||
return atomic.LoadInt32(on) == 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better use CompareAndSwapInt32.
return atomic.LoadInt32(on) == 1 | ||
} | ||
|
||
// Handle returns the call back function registered by user to process the stats. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handle returns the function registered to process the stats.
} | ||
|
||
// Start starts the stats collection. | ||
// Stats will only be started if handler is not nil. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Start starts the stats collection and reporting if there is a registered stats handle.
} | ||
|
||
// Stop stops the collection of any further stats. | ||
// Stop won't unregister handler. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stop does not
atomic.StoreInt32(on, 1) | ||
} | ||
|
||
// Stop stops the collection of any further stats. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stop stops the stats collection and processing.
@@ -63,14 +64,24 @@ func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, s | |||
return | |||
} | |||
p := &parser{r: stream} | |||
var inStats *stats.InPayload |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
inPayload?
if err == io.EOF { | ||
break | ||
} | ||
return | ||
} | ||
} | ||
if err == io.EOF && stream.StatusCode() == codes.OK && inStats != nil { | ||
// TODO in the current implementation, inTrailer is handled before inStats. Fix the order if necessary. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
deterministic?
if err == io.EOF { | ||
break | ||
} | ||
return | ||
} | ||
} | ||
if err == io.EOF && stream.StatusCode() == codes.OK && inStats != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
put "inStats != nil" in the beginning of the condition.
var cbuf *bytes.Buffer | ||
var ( | ||
cbuf *bytes.Buffer | ||
outStats *stats.OutPayload |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
outPayload?
"google.golang.org/grpc/transport" | ||
) | ||
|
||
// recvResponse receives and parses an RPC response. | ||
// On error, it returns the error and indicates whether the call should be retried. | ||
// | ||
// TODO(zhaoq): Check whether the received message sequence is valid. | ||
func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) { | ||
// TODO ctx is userCtx, not stream.Context. It is used for stats handling. Change this later if necessary. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ctx is used for stats collection and processing. It is the context passed from the application.
// LocalAddr returns the local network address. | ||
LocalAddr() net.Addr | ||
// RemoteAddr returns the remote network address. | ||
RemoteAddr() net.Addr |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These 2 APIs are not needed.
err = t.Write(stream, outBuf, opts) | ||
if outPayload != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we check the value of "err"? what if it is not nil?
Host: cc.authority, | ||
Method: method, | ||
Flush: desc.ServerStreams && desc.ClientStreams, | ||
FailFast: c.failFast, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it okay to put fail fast into stats.Begin so that we do not need to pass it into the transport layer? You can refactor NewClientStream and newClientStream somehow (e.g., move opt processing to NewClientStream).
@@ -275,6 +311,16 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { | |||
cs.mu.Unlock() | |||
} | |||
defer func() { | |||
if err != nil && stats.On() { | |||
// Only handle end stats if err != nil. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if err == nil?
Method: method, | ||
Host: cc.authority, | ||
Method: method, | ||
FailFast: c.failFast, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we put fail fast into Begin stats so that it is not needed to pass it into transport?
@@ -333,6 +350,11 @@ func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{ | |||
if err := c.Unmarshal(d, m); err != nil { | |||
return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err) | |||
} | |||
if inPayload != nil { | |||
inPayload.Payload = m | |||
inPayload.Data = d |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a TODO to truncate the large payload. Also for outPayout.
@@ -547,10 +554,36 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str | |||
// the optimal option. | |||
grpclog.Fatalf("grpc: Server failed to encode response %v", err) | |||
} | |||
return t.Write(stream, p, opts) | |||
if outPayload != nil { | |||
outPayload.SentTime = time.Now() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this happen after write returns?
@@ -569,6 +600,13 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. | |||
p := &parser{r: stream} | |||
for { | |||
pf, req, err := p.recvMsg(s.opts.maxMsgSize) | |||
var inPayload *stats.InPayload |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if err is not nil?
} | ||
statusCode := codes.OK | ||
statusDesc := "" | ||
df := func(v interface{}) error { | ||
if inPayload != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consider the error case
} | ||
return Errorf(codes.Internal, err.Error()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this?
if pf == compressionMade { | ||
var err error | ||
req, err = s.opts.dc.Do(bytes.NewReader(req)) | ||
if err != nil { | ||
if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil { | ||
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err) | ||
} | ||
return err | ||
return Errorf(codes.Internal, err.Error()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason for this change?
} | ||
stats.Handle(stream.Context(), end) | ||
} | ||
}() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we move this part (from line 736-754 to the caller of processStreamingRPC and processUnaryRPC) since they are common for both cases?
ed61dca
to
433d594
Compare
99efc01
to
69b1142
Compare
69b1142
to
3de821b
Compare
@@ -274,6 +300,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { | |||
} | |||
cs.mu.Unlock() | |||
} | |||
// TODO generate stats.End if err != nil && err != io.EOF. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: Investigate how to signal the stats handling party.
Client: true, | ||
EndTime: time.Now(), | ||
Error: e, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if stats.On() {
end := &stats.End{
Client: true,
EndTime. time.Now(),
}
if err != nil && err != io.EOF {
end.Error = toRPCErr(err)
}
}
inHeader := &stats.InHeader{ | ||
FullMethod: s.method, | ||
RemoteAddr: t.conn.RemoteAddr(), | ||
LocalAddr: t.conn.LocalAddr(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a chance the t.conn is broken when these are called so that they cannot get the values. You should probably cache them when the connection is setup.
package grpc.testing; | ||
|
||
// Unary request. | ||
message SimpleRequest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not a unary request since it is also used in FullDuplexCall.
} | ||
|
||
// Unary response, as configured by the request. | ||
message SimpleResponse { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
) | ||
tc := testpb.NewTestServiceClient(te.clientConn()) | ||
if c.success { | ||
req = &testpb.SimpleRequest{Id: 1} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use errorID+1 instead?
outheader | ||
outtrailer | ||
errors | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
capitalize the 1st letter of the second word in the strings.
} | ||
} | ||
|
||
func TestServerStatsUnaryRPC(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a way we can verify the contexts from the stats of a particular rpc are same?
afb1a30
to
6445ded
Compare
if !ok { | ||
grpclog.Fatalf("cannot get trace from context while EnableTracing == true") | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Due the latest change, it seems this function can be simplified as:
tr, ok := trace.FromContext(stream.Context())
if !ok {
return nil
}
...
And you need to fix transport_test.go. |
- cloud.google.com/go: nothing of note. - github.com/cockroachdb/c-jemalloc: - Update upstream to 4.4.0 (cockroachdb/c-jemalloc@910bc3a) - github.com/cockroachdb/cmux: nothing of note. - github.com/coreos/etcd: nothing of note. - github.com/elastic/gosigar: nothing of note. - github.com/gogo/protobuf: - Allow usage of message types with customtype (gogo/protobuf@f9114da) - github.com/golang/protobuf: nothing of note. - github.com/google/go-github: nothing of note. - github.com/grpc-ecosystem/grpc-gateway: nothing of note. - github.com/lib/pq: - Use space to separate date and time portions of Time literals (lib/pq@8af01c1) - github.com/lightstep/lightstep-tracer-go: - Remove vendor directory (lightstep/lightstep-tracer-go@70a2e8a) - github.com/opentracing/opentracing-go: - Avoid panic marshaling nil error (opentracing/opentracing-go@5e5abf8) - github.com/prometheus/common: nothing of note. - golang.org/x/net: nothing of note. - golang.org/x/tools: nothing of note. - google.golang.org/grpc: - Add stats (grpc/grpc-go#961) - Add transport tap (grpc/grpc-go#968) - Add FailOnNonTempDialError (grpc/grpc-go#974 & grpc/grpc-go#985) - honnef.co/go/staticcheck: nothing of note. - honnef.co/go/unused: nothing of note.
Package stats exports RPC stats, including payload length and wire-length.
Package stats is for monitoring purpose only.
All stats APIs are experimental.