Skip to content

Commit ede983e

Browse files
committed
First commit:Benchmarks that runs server and client and separate processes.
1 parent bdb0727 commit ede983e

File tree

6 files changed

+495
-237
lines changed

6 files changed

+495
-237
lines changed

benchmark/benchmark.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -227,9 +227,14 @@ func DoByteBufStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallCli
227227

228228
// NewClientConn creates a gRPC client connection to addr.
229229
func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn {
230+
return NewClientConnWithContext(context.Background(), addr, opts...)
231+
}
232+
233+
// NewClientConnWithContext creates a gRPC client connection to addr using ctx.
234+
func NewClientConnWithContext(ctx context.Context, addr string, opts ...grpc.DialOption) *grpc.ClientConn {
230235
opts = append(opts, grpc.WithWriteBufferSize(128*1024))
231236
opts = append(opts, grpc.WithReadBufferSize(128*1024))
232-
conn, err := grpc.Dial(addr, opts...)
237+
conn, err := grpc.DialContext(ctx, addr, opts...)
233238
if err != nil {
234239
grpclog.Fatalf("NewClientConn(%q) failed to create a ClientConn %v", addr, err)
235240
}

benchmark/client/benchclient.go

+196
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
/*
2+
*
3+
* Copyright 2017 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package main
20+
21+
import (
22+
"flag"
23+
"fmt"
24+
"os"
25+
"runtime"
26+
"runtime/pprof"
27+
"sync"
28+
"time"
29+
30+
"golang.org/x/net/context"
31+
"golang.org/x/sys/unix"
32+
"google.golang.org/grpc"
33+
"google.golang.org/grpc/benchmark"
34+
testpb "google.golang.org/grpc/benchmark/grpc_testing"
35+
"google.golang.org/grpc/benchmark/stats"
36+
"google.golang.org/grpc/grpclog"
37+
)
38+
39+
var (
40+
port = flag.String("port", "50051", "Localhost port to connect to.")
41+
r = flag.Int("r", 1, "The number of concurrent RPCs on each connection.")
42+
c = flag.Int("c", 1, "The number of parallel connections.")
43+
w = flag.Int("w", 10, "Warm-up duration in seconds")
44+
d = flag.Int("d", 60, "Benchmark duration in seconds")
45+
rqSize = flag.Int("req", 1, "Request message size in bytes.")
46+
rspSize = flag.Int("resp", 1, "Response message size in bytes.")
47+
rpcType = flag.String("rpc_type", "unary",
48+
`Configure different client rpc type. Valid options are:
49+
unary;
50+
streaming.`)
51+
testName = flag.String("test_name", "", "Name of the test used for creating profiles.")
52+
wg sync.WaitGroup
53+
hopts = stats.HistogramOptions{
54+
NumBuckets: 2495,
55+
GrowthFactor: .01,
56+
}
57+
mu sync.Mutex
58+
hists []*stats.Histogram
59+
)
60+
61+
func main() {
62+
flag.Parse()
63+
if *testName == "" {
64+
grpclog.Fatalf("test_name not set")
65+
}
66+
req := &testpb.SimpleRequest{
67+
ResponseType: testpb.PayloadType_COMPRESSABLE,
68+
ResponseSize: int32(*rspSize),
69+
Payload: &testpb.Payload{
70+
Type: testpb.PayloadType_COMPRESSABLE,
71+
Body: make([]byte, *rqSize),
72+
},
73+
}
74+
connectCtx, connectCancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
75+
defer connectCancel()
76+
ccs := buildConnections(connectCtx)
77+
warmDeadline := time.Now().Add(time.Duration(*w) * time.Second)
78+
endDeadline := warmDeadline.Add(time.Duration(*d) * time.Second)
79+
cf, err := os.Create("/tmp/" + *testName + ".cpu")
80+
if err != nil {
81+
grpclog.Fatalf("Error creating file: %v", err)
82+
}
83+
defer cf.Close()
84+
pprof.StartCPUProfile(cf)
85+
cpuBeg := getCPUTime()
86+
for _, cc := range ccs {
87+
runWithConn(cc, req, warmDeadline, endDeadline)
88+
}
89+
wg.Wait()
90+
cpu := time.Duration(getCPUTime() - cpuBeg)
91+
pprof.StopCPUProfile()
92+
mf, err := os.Create("/tmp/" + *testName + ".mem")
93+
if err != nil {
94+
grpclog.Fatalf("Error creating file: %v", err)
95+
}
96+
defer mf.Close()
97+
runtime.GC() // materialize all statistics
98+
if err := pprof.WriteHeapProfile(mf); err != nil {
99+
grpclog.Fatalf("Error writing memory profile: %v", err)
100+
}
101+
hist := stats.NewHistogram(hopts)
102+
for _, h := range hists {
103+
hist.Merge(h)
104+
}
105+
parseHist(hist)
106+
fmt.Println("Client CPU utilization:", cpu)
107+
fmt.Println("Client CPU profile:", cf.Name())
108+
fmt.Println("Client Mem Profile:", mf.Name())
109+
}
110+
111+
func buildConnections(ctx context.Context) []*grpc.ClientConn {
112+
ccs := make([]*grpc.ClientConn, *c)
113+
for i := range ccs {
114+
ccs[i] = benchmark.NewClientConnWithContext(ctx, "localhost:"+*port, grpc.WithInsecure(), grpc.WithBlock())
115+
}
116+
return ccs
117+
}
118+
119+
func runWithConn(cc *grpc.ClientConn, req *testpb.SimpleRequest, warmDeadline, endDeadline time.Time) {
120+
for i := 0; i < *r; i++ {
121+
wg.Add(1)
122+
go func() {
123+
defer wg.Done()
124+
caller := makeCaller(cc, req)
125+
hist := stats.NewHistogram(hopts)
126+
for {
127+
start := time.Now()
128+
if start.After(endDeadline) {
129+
mu.Lock()
130+
hists = append(hists, hist)
131+
mu.Unlock()
132+
return
133+
}
134+
caller()
135+
elapsed := time.Since(start)
136+
if start.After(warmDeadline) {
137+
hist.Add(elapsed.Nanoseconds())
138+
}
139+
}
140+
}()
141+
}
142+
}
143+
144+
func makeCaller(cc *grpc.ClientConn, req *testpb.SimpleRequest) func() {
145+
client := testpb.NewBenchmarkServiceClient(cc)
146+
if *rpcType == "unary" {
147+
return func() {
148+
if _, err := client.UnaryCall(context.Background(), req); err != nil {
149+
grpclog.Fatalf("RPC failed: %v", err)
150+
}
151+
}
152+
}
153+
stream, err := client.StreamingCall(context.Background())
154+
if err != nil {
155+
grpclog.Fatalf("RPC failed: %v", err)
156+
}
157+
return func() {
158+
if err := stream.Send(req); err != nil {
159+
grpclog.Fatalf("Streaming RPC failed to send: %v", err)
160+
}
161+
if _, err := stream.Recv(); err != nil {
162+
grpclog.Fatalf("Streaming RPC failed to read: %v", err)
163+
}
164+
}
165+
166+
}
167+
168+
func parseHist(hist *stats.Histogram) {
169+
fmt.Println("qps:", float64(hist.Count)/float64(*d))
170+
fmt.Printf("Latency: (50/90/99 %%ile): %v/%v/%v\n",
171+
time.Duration(median(.5, hist)),
172+
time.Duration(median(.9, hist)),
173+
time.Duration(median(.99, hist)))
174+
}
175+
176+
func median(percentile float64, h *stats.Histogram) int64 {
177+
need := int64(float64(h.Count) * percentile)
178+
have := int64(0)
179+
for _, bucket := range h.Buckets {
180+
count := bucket.Count
181+
if have+count >= need {
182+
percent := float64(need-have) / float64(count)
183+
return int64((1.0-percent)*bucket.LowBound + percent*bucket.LowBound*(1.0+hopts.GrowthFactor))
184+
}
185+
have += bucket.Count
186+
}
187+
panic("should have found a bound")
188+
}
189+
190+
func getCPUTime() int64 {
191+
var ts unix.Timespec
192+
if err := unix.ClockGettime(unix.CLOCK_PROCESS_CPUTIME_ID, &ts); err != nil {
193+
grpclog.Fatal(err)
194+
}
195+
return ts.Nano()
196+
}

benchmark/client/main.go

-180
This file was deleted.

0 commit comments

Comments
 (0)