
Commit fbc419b

First commit: Benchmarks that run server and client in separate processes.
1 parent bdb0727 commit fbc419b

File tree

4 files changed: +405 -143 lines changed


benchmark/benchmark.go

+6 -1
@@ -227,9 +227,14 @@ func DoByteBufStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallCli
 
 // NewClientConn creates a gRPC client connection to addr.
 func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn {
+    return NewClientConnWithContext(context.Background(), addr, opts...)
+}
+
+// NewClientConnWithContext creates a gRPC client connection to addr using ctx.
+func NewClientConnWithContext(ctx context.Context, addr string, opts ...grpc.DialOption) *grpc.ClientConn {
     opts = append(opts, grpc.WithWriteBufferSize(128*1024))
     opts = append(opts, grpc.WithReadBufferSize(128*1024))
-    conn, err := grpc.Dial(addr, opts...)
+    conn, err := grpc.DialContext(ctx, addr, opts...)
     if err != nil {
         grpclog.Fatalf("NewClientConn(%q) failed to create a ClientConn %v", addr, err)
     }
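
For reference, a minimal sketch (not part of this commit) of how a caller might use the new NewClientConnWithContext helper with a connect deadline; the address, 5-second timeout, and dial options here are illustrative assumptions that mirror how the rewritten client below dials:

```go
package main

import (
    "time"

    "golang.org/x/net/context"
    "google.golang.org/grpc"
    "google.golang.org/grpc/benchmark"
)

func main() {
    // Bound connection establishment to an assumed 5 seconds. With
    // grpc.WithBlock, DialContext waits until the connection is ready or
    // the context deadline expires; NewClientConnWithContext calls
    // grpclog.Fatalf if dialing fails.
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    cc := benchmark.NewClientConnWithContext(ctx, "localhost:50051",
        grpc.WithInsecure(), grpc.WithBlock())
    defer cc.Close()
}
```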

benchmark/client/main.go

+139 -123
@@ -20,14 +20,15 @@ package main
 
 import (
     "flag"
-    "math"
-    "net"
-    "net/http"
-    _ "net/http/pprof"
+    "fmt"
+    "os"
+    "runtime"
+    "runtime/pprof"
     "sync"
     "time"
 
     "golang.org/x/net/context"
+    "golang.org/x/sys/unix"
     "google.golang.org/grpc"
     "google.golang.org/grpc/benchmark"
     testpb "google.golang.org/grpc/benchmark/grpc_testing"
@@ -36,145 +37,160 @@ import (
 )
 
 var (
-    server            = flag.String("server", "", "The server address")
-    maxConcurrentRPCs = flag.Int("max_concurrent_rpcs", 1, "The max number of concurrent RPCs")
-    duration          = flag.Int("duration", math.MaxInt32, "The duration in seconds to run the benchmark client")
-    trace             = flag.Bool("trace", true, "Whether tracing is on")
-    rpcType           = flag.Int("rpc_type", 0,
+    port     = flag.String("port", "50051", "Localhost port to connect to.")
+    r        = flag.Int("r", 1, "The number of concurrent RPCs on each connection.")
+    c        = flag.Int("c", 1, "The number of parallel connections.")
+    w        = flag.Int("w", 10, "Warm-up duration in seconds")
+    d        = flag.Int("d", 60, "Benchmark duration in seconds")
+    rqSize   = flag.Int("req", 1, "Request message size in bytes.")
+    rspSize  = flag.Int("resp", 1, "Response message size in bytes.")
+    rpcType  = flag.String("rpc_type", "unary",
         `Configure different client rpc type. Valid options are:
-           0 : unary call;
-           1 : streaming call.`)
+           unary;
+           streaming.`)
+    testName = flag.String("test_name", "", "Name of the test used for creating profiles.")
+    wg       sync.WaitGroup
+    hopts    = stats.HistogramOptions{
+        NumBuckets:   2495,
+        GrowthFactor: .01,
+    }
+    mu    sync.Mutex
+    hists []*stats.Histogram
 )
 
-func unaryCaller(client testpb.BenchmarkServiceClient) {
-    benchmark.DoUnaryCall(client, 1, 1)
-}
-
-func streamCaller(stream testpb.BenchmarkService_StreamingCallClient) {
-    benchmark.DoStreamingRoundTrip(stream, 1, 1)
-}
-
-func buildConnection() (s *stats.Stats, conn *grpc.ClientConn, tc testpb.BenchmarkServiceClient) {
-    s = stats.NewStats(256)
-    conn = benchmark.NewClientConn(*server)
-    tc = testpb.NewBenchmarkServiceClient(conn)
-    return s, conn, tc
+func main() {
+    flag.Parse()
+    if *testName == "" {
+        grpclog.Fatalf("test_name not set")
+    }
+    req := &testpb.SimpleRequest{
+        ResponseType: testpb.PayloadType_COMPRESSABLE,
+        ResponseSize: int32(*rspSize),
+        Payload: &testpb.Payload{
+            Type: testpb.PayloadType_COMPRESSABLE,
+            Body: make([]byte, *rqSize),
+        },
+    }
+    connectCtx, connectCancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
+    defer connectCancel()
+    ccs := buildConnections(connectCtx)
+    warmDeadline := time.Now().Add(time.Duration(*w) * time.Second)
+    endDeadline := warmDeadline.Add(time.Duration(*d) * time.Second)
+    cf, err := os.Create("/tmp/" + *testName + ".cpu")
+    if err != nil {
+        grpclog.Fatalf("Error creating file: %v", err)
+    }
+    defer cf.Close()
+    pprof.StartCPUProfile(cf)
+    cpuBeg := getCPUTime()
+    for _, cc := range ccs {
+        runWithConn(cc, req, warmDeadline, endDeadline)
+    }
+    wg.Wait()
+    cpu := time.Duration(getCPUTime() - cpuBeg)
+    pprof.StopCPUProfile()
+    mf, err := os.Create("/tmp/" + *testName + ".mem")
+    if err != nil {
+        grpclog.Fatalf("Error creating file: %v", err)
+    }
+    defer mf.Close()
+    runtime.GC() // materialize all statistics
+    if err := pprof.WriteHeapProfile(mf); err != nil {
+        grpclog.Fatalf("Error writing memory profile: %v", err)
+    }
+    hist := stats.NewHistogram(hopts)
+    for _, h := range hists {
+        hist.Merge(h)
+    }
+    parseHist(hist)
+    fmt.Println("Client CPU utilization:", cpu)
+    fmt.Println("Client CPU profile:", cf.Name())
+    fmt.Println("Client Mem Profile:", mf.Name())
 }
 
-func closeLoopUnary() {
-    s, conn, tc := buildConnection()
-
-    for i := 0; i < 100; i++ {
-        unaryCaller(tc)
+func buildConnections(ctx context.Context) []*grpc.ClientConn {
+    ccs := make([]*grpc.ClientConn, *c)
+    for i := range ccs {
+        ccs[i] = benchmark.NewClientConnWithContext(ctx, "localhost:"+*port, grpc.WithInsecure(), grpc.WithBlock())
     }
-    ch := make(chan int, *maxConcurrentRPCs*4)
-    var (
-        mu sync.Mutex
-        wg sync.WaitGroup
-    )
-    wg.Add(*maxConcurrentRPCs)
+    return ccs
+}
 
-    for i := 0; i < *maxConcurrentRPCs; i++ {
+func runWithConn(cc *grpc.ClientConn, req *testpb.SimpleRequest, warmDeadline, endDeadline time.Time) {
+    for i := 0; i < *r; i++ {
+        wg.Add(1)
         go func() {
-            for range ch {
+            defer wg.Done()
+            caller := makeCaller(cc, req)
+            hist := stats.NewHistogram(hopts)
+            for {
                 start := time.Now()
-                unaryCaller(tc)
-                elapse := time.Since(start)
-                mu.Lock()
-                s.Add(elapse)
-                mu.Unlock()
+                if start.After(endDeadline) {
+                    mu.Lock()
+                    hists = append(hists, hist)
+                    mu.Unlock()
+                    return
+                }
+                caller()
+                elapsed := time.Since(start)
+                if start.After(warmDeadline) {
+                    hist.Add(elapsed.Nanoseconds())
+                }
             }
-            wg.Done()
         }()
     }
-    // Stop the client when time is up.
-    done := make(chan struct{})
-    go func() {
-        <-time.After(time.Duration(*duration) * time.Second)
-        close(done)
-    }()
-    ok := true
-    for ok {
-        select {
-        case ch <- 0:
-        case <-done:
-            ok = false
-        }
-    }
-    close(ch)
-    wg.Wait()
-    conn.Close()
-    grpclog.Println(s.String())
-
 }
 
-func closeLoopStream() {
-    s, conn, tc := buildConnection()
-    ch := make(chan int, *maxConcurrentRPCs*4)
-    var (
-        mu sync.Mutex
-        wg sync.WaitGroup
-    )
-    wg.Add(*maxConcurrentRPCs)
-    // Distribute RPCs over maxConcurrentCalls workers.
-    for i := 0; i < *maxConcurrentRPCs; i++ {
-        go func() {
-            stream, err := tc.StreamingCall(context.Background())
-            if err != nil {
-                grpclog.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
-            }
-            // Do some warm up.
-            for i := 0; i < 100; i++ {
-                streamCaller(stream)
-            }
-            for range ch {
-                start := time.Now()
-                streamCaller(stream)
-                elapse := time.Since(start)
-                mu.Lock()
-                s.Add(elapse)
-                mu.Unlock()
+func makeCaller(cc *grpc.ClientConn, req *testpb.SimpleRequest) func() {
+    client := testpb.NewBenchmarkServiceClient(cc)
+    if *rpcType == "unary" {
+        return func() {
+            if _, err := client.UnaryCall(context.Background(), req); err != nil {
+                grpclog.Fatalf("RPC failed: %v", err)
             }
-            wg.Done()
-        }()
+        }
+    }
+    stream, err := client.StreamingCall(context.Background())
+    if err != nil {
+        grpclog.Fatalf("RPC failed: %v", err)
     }
-    // Stop the client when time is up.
-    done := make(chan struct{})
-    go func() {
-        <-time.After(time.Duration(*duration) * time.Second)
-        close(done)
-    }()
-    ok := true
-    for ok {
-        select {
-        case ch <- 0:
-        case <-done:
-            ok = false
+    return func() {
+        if err := stream.Send(req); err != nil {
+            grpclog.Fatalf("Streaming RPC failed to send: %v", err)
+        }
+        if _, err := stream.Recv(); err != nil {
+            grpclog.Fatalf("Streaming RPC failed to read: %v", err)
         }
     }
-    close(ch)
-    wg.Wait()
-    conn.Close()
-    grpclog.Println(s.String())
+
 }
 
-func main() {
-    flag.Parse()
-    grpc.EnableTracing = *trace
-    go func() {
-        lis, err := net.Listen("tcp", ":0")
-        if err != nil {
-            grpclog.Fatalf("Failed to listen: %v", err)
-        }
-        grpclog.Println("Client profiling address: ", lis.Addr().String())
-        if err := http.Serve(lis, nil); err != nil {
-            grpclog.Fatalf("Failed to serve: %v", err)
+func parseHist(hist *stats.Histogram) {
+    fmt.Println("qps:", float64(hist.Count)/float64(*d))
+    fmt.Printf("Latency: (50/90/99 %%ile): %v/%v/%v\n",
+        time.Duration(median(.5, hist)),
+        time.Duration(median(.9, hist)),
+        time.Duration(median(.99, hist)))
+}
+
+func median(percentile float64, h *stats.Histogram) int64 {
+    need := int64(float64(h.Count) * percentile)
+    have := int64(0)
+    for _, bucket := range h.Buckets {
+        count := bucket.Count
+        if have+count >= need {
+            percent := float64(need-have) / float64(count)
+            return int64((1.0-percent)*bucket.LowBound + percent*bucket.LowBound*(1.0+hopts.GrowthFactor))
         }
-    }()
-    switch *rpcType {
-    case 0:
-        closeLoopUnary()
-    case 1:
-        closeLoopStream()
+        have += bucket.Count
+    }
+    panic("should have found a bound")
+}
+
+func getCPUTime() int64 {
+    var ts unix.Timespec
+    if err := unix.ClockGettime(unix.CLOCK_PROCESS_CPUTIME_ID, &ts); err != nil {
+        grpclog.Fatal(err)
     }
+    return ts.Nano()
 }
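
As a side note, a small self-contained sketch (not part of this commit) of the bucket interpolation that median performs on the exponentially growing histogram; the bucket bound, growth factor, and fractional position below are made-up numbers used only to show the formula:

```go
package main

import "fmt"

func main() {
    // Hypothetical bucket with lower bound 1000ns and growth factor 0.01,
    // i.e. the bucket spans [1000ns, 1010ns). Assume the requested
    // percentile falls 40% of the way into this bucket's count, which is
    // the (need-have)/count ratio computed in median().
    const (
        lowBound     = 1000.0 // ns, illustrative
        growthFactor = .01    // same value as hopts.GrowthFactor in the diff
        percent      = 0.4    // illustrative (need-have)/count
    )
    // Linear interpolation between the bucket's lower and upper bounds,
    // matching the return expression in median().
    latency := (1.0-percent)*lowBound + percent*lowBound*(1.0+growthFactor)
    fmt.Printf("interpolated latency: %.1fns\n", latency) // prints 1004.0ns
}
```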
