Skip to content

Commit

Permalink
add failfast option to stats
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Nov 3, 2016
1 parent d12c110 commit ca1e7b9
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 20 deletions.
5 changes: 3 additions & 2 deletions call.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,9 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
for {
// TODO(zhaoq): Need a formal spec of fail-fast.
callHdr := &transport.CallHdr{
Host: cc.authority,
Method: method,
Host: cc.authority,
Method: method,
FailFast: c.failFast,
}
if cc.dopts.cp != nil {
callHdr.SendCompress = cc.dopts.cp.Type()
Expand Down
2 changes: 2 additions & 0 deletions stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ type OutHeader struct {
LocalAddr net.Addr
// Encryption is encrypt method used in the RPC.
Encryption string
// Failfast indicates if this RPC is failfast.
FailFast bool
}

func (s *OutHeader) isStats() {}
Expand Down
48 changes: 33 additions & 15 deletions stats/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,44 +221,50 @@ func (te *test) clientConn() *grpc.ClientConn {
return te.cc
}

func (te *test) doUnaryCall(success bool) (*testpb.SimpleRequest, *testpb.SimpleResponse, error) {
type rpcConfig struct {
count int // Number of requests and responses for streaming RPCs.
success bool // Whether the RPC should succeed or return error.
failfast bool
}

func (te *test) doUnaryCall(c *rpcConfig) (*testpb.SimpleRequest, *testpb.SimpleResponse, error) {
var (
resp *testpb.SimpleResponse
req *testpb.SimpleRequest
err error
)
tc := testpb.NewTestServiceClient(te.clientConn())
if success {
if c.success {
req = &testpb.SimpleRequest{Id: 1}
} else {
req = &testpb.SimpleRequest{Id: errorID}
}
ctx := metadata.NewContext(context.Background(), testMetadata)

resp, err = tc.UnaryCall(ctx, req, grpc.FailFast(false))
resp, err = tc.UnaryCall(ctx, req, grpc.FailFast(c.failfast))
if err != nil {
return req, resp, err
}

return req, resp, err
}

func (te *test) doFullDuplexCallRoundtrip(count int, success bool) ([]*testpb.SimpleRequest, []*testpb.SimpleResponse, error) {
func (te *test) doFullDuplexCallRoundtrip(c *rpcConfig) ([]*testpb.SimpleRequest, []*testpb.SimpleResponse, error) {
var (
reqs []*testpb.SimpleRequest
resps []*testpb.SimpleResponse
err error
)
tc := testpb.NewTestServiceClient(te.clientConn())
stream, err := tc.FullDuplexCall(metadata.NewContext(context.Background(), testMetadata))
stream, err := tc.FullDuplexCall(metadata.NewContext(context.Background(), testMetadata), grpc.FailFast(c.failfast))
if err != nil {
return reqs, resps, err
}
var startID int32
if !success {
if !c.success {
startID = errorID
}
for i := 0; i < count; i++ {
for i := 0; i < c.count; i++ {
req := &testpb.SimpleRequest{
Id: int32(i) + startID,
}
Expand Down Expand Up @@ -291,6 +297,7 @@ type expectedData struct {
respIdx int
responses []*testpb.SimpleResponse
err error
failfast bool
}

type gotData struct {
Expand Down Expand Up @@ -428,6 +435,9 @@ func checkOutHeader(t *testing.T, d *gotData, e *expectedData) {
if st.Encryption != e.encryption {
t.Fatalf("st.Encryption = %v, want %v", st.Encryption, e.encryption)
}
if st.FailFast != e.failfast {
t.Fatalf("st.FailFast = %v, want %v", st.FailFast, e.failfast)
}
}
}

Expand Down Expand Up @@ -534,7 +544,7 @@ func TestServerStatsUnaryRPC(t *testing.T) {
te.startServer(&testServer{})
defer te.tearDown()

req, resp, err := te.doUnaryCall(true)
req, resp, err := te.doUnaryCall(&rpcConfig{success: true})
if err != nil {
t.Fatalf(err.Error())
}
Expand Down Expand Up @@ -585,7 +595,7 @@ func TestServerStatsUnaryRPCError(t *testing.T) {
te.startServer(&testServer{})
defer te.tearDown()

req, resp, err := te.doUnaryCall(false)
req, resp, err := te.doUnaryCall(&rpcConfig{success: false})
if err == nil {
t.Fatalf("got error <nil>; want <non-nil>")
}
Expand Down Expand Up @@ -638,7 +648,7 @@ func TestServerStatsStreamingRPC(t *testing.T) {
defer te.tearDown()

count := 5
reqs, resps, err := te.doFullDuplexCallRoundtrip(count, true)
reqs, resps, err := te.doFullDuplexCallRoundtrip(&rpcConfig{count: count, success: true})
if err == nil {
t.Fatalf(err.Error())
}
Expand Down Expand Up @@ -696,7 +706,7 @@ func TestServerStatsStreamingRPCError(t *testing.T) {
defer te.tearDown()

count := 5
reqs, resps, err := te.doFullDuplexCallRoundtrip(count, false)
reqs, resps, err := te.doFullDuplexCallRoundtrip(&rpcConfig{count: count, success: false})
if err == nil {
t.Fatalf("got error <nil>; want <non-nil>")
}
Expand Down Expand Up @@ -754,7 +764,8 @@ func TestClientStatsUnaryRPC(t *testing.T) {
te.startServer(&testServer{})
defer te.tearDown()

req, resp, err := te.doUnaryCall(true)
failfast := false
req, resp, err := te.doUnaryCall(&rpcConfig{success: true, failfast: failfast})
if err != nil {
t.Fatalf(err.Error())
}
Expand All @@ -765,6 +776,7 @@ func TestClientStatsUnaryRPC(t *testing.T) {
serverAddr: te.srvAddr,
requests: []*testpb.SimpleRequest{req},
responses: []*testpb.SimpleResponse{resp},
failfast: failfast,
}

checkFuncs := map[int]*checkFuncWithCount{
Expand Down Expand Up @@ -842,7 +854,8 @@ func TestClientStatsUnaryRPCError(t *testing.T) {
te.startServer(&testServer{})
defer te.tearDown()

req, resp, err := te.doUnaryCall(false)
failfast := true
req, resp, err := te.doUnaryCall(&rpcConfig{success: false, failfast: failfast})
if err == nil {
t.Fatalf("got error <nil>; want <non-nil>")
}
Expand All @@ -854,6 +867,7 @@ func TestClientStatsUnaryRPCError(t *testing.T) {
requests: []*testpb.SimpleRequest{req},
responses: []*testpb.SimpleResponse{resp},
err: err,
failfast: failfast,
}

checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
Expand Down Expand Up @@ -895,7 +909,8 @@ func TestClientStatsStreamingRPC(t *testing.T) {
defer te.tearDown()

count := 5
reqs, resps, err := te.doFullDuplexCallRoundtrip(count, true)
failfast := false
reqs, resps, err := te.doFullDuplexCallRoundtrip(&rpcConfig{count: count, success: true, failfast: failfast})
if err == nil {
t.Fatalf(err.Error())
}
Expand All @@ -907,6 +922,7 @@ func TestClientStatsStreamingRPC(t *testing.T) {
encryption: "gzip",
requests: reqs,
responses: resps,
failfast: failfast,
}

checkFuncs := map[int]*checkFuncWithCount{
Expand Down Expand Up @@ -985,7 +1001,8 @@ func TestClientStatsStreamingRPCError(t *testing.T) {
defer te.tearDown()

count := 5
reqs, resps, err := te.doFullDuplexCallRoundtrip(count, false)
failfast := true
reqs, resps, err := te.doFullDuplexCallRoundtrip(&rpcConfig{count: count, success: false, failfast: failfast})
if err == nil {
t.Fatalf("got error <nil>; want <non-nil>")
}
Expand All @@ -998,6 +1015,7 @@ func TestClientStatsStreamingRPCError(t *testing.T) {
requests: reqs,
responses: resps,
err: err,
failfast: failfast,
}

checkFuncs := map[int]*checkFuncWithCount{
Expand Down
7 changes: 4 additions & 3 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,10 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
}
}
callHdr := &transport.CallHdr{
Host: cc.authority,
Method: method,
Flush: desc.ServerStreams && desc.ClientStreams,
Host: cc.authority,
Method: method,
Flush: desc.ServerStreams && desc.ClientStreams,
FailFast: c.failFast,
}
if cc.dopts.cp != nil {
callHdr.SendCompress = cc.dopts.cp.Type()
Expand Down
1 change: 1 addition & 0 deletions transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
RemoteAddr: t.RemoteAddr(),
LocalAddr: t.LocalAddr(),
Encryption: callHdr.SendCompress,
FailFast: callHdr.FailFast,
}
stats.Handle(s.Context(), outHeader)
}
Expand Down
3 changes: 3 additions & 0 deletions transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,9 @@ type CallHdr struct {
// only a hint. The transport may modify the flush decision
// for performance purposes.
Flush bool

// FailFast indicates whether the RPC is failfast.
FailFast bool
}

// ClientTransport is the common interface for all gRPC client-side transport
Expand Down

0 comments on commit ca1e7b9

Please sign in to comment.