Skip to content

Commit

Permalink
Support fault injection and multi-keyspace in pd-tso-bench
Browse files Browse the repository at this point in the history
Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing committed Jun 15, 2023
1 parent 6afbcec commit cbfe681
Showing 1 changed file with 67 additions and 24 deletions.
91 changes: 67 additions & 24 deletions tools/pd-tso-bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"flag"
"fmt"
"io"
"math/rand"
"net/http"
"net/http/httptest"
"os"
Expand Down Expand Up @@ -49,6 +50,9 @@ var (
keyPath = flag.String("key", "", "path of file that contains X509 key in PEM format")
maxBatchWaitInterval = flag.Duration("batch-interval", 0, "the max batch wait interval")
enableTSOFollowerProxy = flag.Bool("enable-tso-follower-proxy", false, "whether enable the TSO Follower Proxy")
enableFaultInjection = flag.Bool("enable-fault-injection", false, "whether enable fault injection")
faultInjectionRate = flag.Float64("fault-injection-rate", 0.01, "the failure rate [0.0001, 1]. 0.01 means 1% failure rate")
keyspace = flag.Uint("keyspace", 0, "the id of the keyspac to access")
wg sync.WaitGroup
)

Expand Down Expand Up @@ -90,19 +94,7 @@ func bench(mainCtx context.Context) {
fmt.Printf("Create %d client(s) for benchmark\n", *clientNumber)
pdClients := make([]pd.Client, *clientNumber)
for idx := range pdClients {
var (
pdCli pd.Client
err error
)

pdCli, err = pd.NewClientWithContext(mainCtx, []string{*pdAddrs}, pd.SecurityOption{
CAPath: *caPath,
CertPath: *certPath,
KeyPath: *keyPath,
})

pdCli.UpdateOption(pd.MaxTSOBatchWaitInterval, *maxBatchWaitInterval)
pdCli.UpdateOption(pd.EnableTSOFollowerProxy, *enableTSOFollowerProxy)
pdCli, err := createPDClient(mainCtx)
if err != nil {
log.Fatal(fmt.Sprintf("create pd client #%d failed: %v", idx, err))
}
Expand All @@ -120,10 +112,18 @@ func bench(mainCtx context.Context) {

durCh := make(chan time.Duration, 2*(*concurrency)*(*clientNumber))

wg.Add((*concurrency) * (*clientNumber))
for _, pdCli := range pdClients {
for i := 0; i < *concurrency; i++ {
go reqWorker(ctx, pdCli, durCh)
if *enableFaultInjection {
fmt.Printf("Enable fault injection, failure rate: %f\n", *faultInjectionRate)
wg.Add(*clientNumber)
for i := 0; i < *clientNumber; i++ {
go reqWorker(ctx, pdClients, i, durCh)
}
} else {
wg.Add((*concurrency) * (*clientNumber))
for i := 0; i < *clientNumber; i++ {
for i := 0; i < *concurrency; i++ {
go reqWorker(ctx, pdClients, i, durCh)
}
}
}

Expand Down Expand Up @@ -338,22 +338,41 @@ func (s *stats) calculate(count int) float64 {
return float64(count) * 100 / float64(s.count)
}

func reqWorker(ctx context.Context, pdCli pd.Client, durCh chan time.Duration) {
func reqWorker(ctx context.Context, pdClients []pd.Client, clientIdx int, durCh chan time.Duration) {
defer wg.Done()

reqCtx, cancel := context.WithCancel(ctx)
defer cancel()
var (
err error
maxRetryTime int = 50
maxTSOSendIntervalSeconds int = 3
sleepIntervalOnFailure time.Duration = 100 * time.Millisecond
)
pdCli := pdClients[clientIdx]

for {
if *enableFaultInjection && shouldInjectFault() {
pdCli.Close()
pdCli, err = createPDClient(ctx)
if err != nil {
log.Error(fmt.Sprintf("re-create pd client #%d failed: %v", clientIdx, err))
select {
case <-reqCtx.Done():
case <-time.After(100 * time.Millisecond):
}
continue
}
pdClients[clientIdx] = pdCli
}

start := time.Now()

var (
i int32
err error
maxRetryTime int32 = 50
sleepIntervalOnFailure time.Duration = 100 * time.Millisecond
)
i := 0
for ; i < maxRetryTime; i++ {
if shouldInjectFault() {
time.Sleep(time.Duration(rand.Intn(maxTSOSendIntervalSeconds)) * time.Second)
}
_, _, err = pdCli.GetLocalTS(reqCtx, *dcLocation)
if errors.Cause(err) == context.Canceled {
return
Expand All @@ -376,3 +395,27 @@ func reqWorker(ctx context.Context, pdCli pd.Client, durCh chan time.Duration) {
}
}
}

func createPDClient(ctx context.Context) (pd.Client, error) {
var (
pdCli pd.Client
err error
)

pdCli, err = pd.NewClientWithKeyspace(ctx, uint32(*keyspace), []string{*pdAddrs}, pd.SecurityOption{
CAPath: *caPath,
CertPath: *certPath,
KeyPath: *keyPath,
})
if err != nil {
return nil, err
}

pdCli.UpdateOption(pd.MaxTSOBatchWaitInterval, *maxBatchWaitInterval)
pdCli.UpdateOption(pd.EnableTSOFollowerProxy, *enableTSOFollowerProxy)
return pdCli, err
}

func shouldInjectFault() bool {
return rand.Intn(10000) < int(*faultInjectionRate*10000)
}

0 comments on commit cbfe681

Please sign in to comment.