diff --git a/examples/benchtool/config/global.go b/examples/benchtool/config/global.go new file mode 100644 index 0000000000..c668f719ab --- /dev/null +++ b/examples/benchtool/config/global.go @@ -0,0 +1,152 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "fmt" + "os" + "strconv" + "sync" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/spf13/cobra" + "github.com/tikv/client-go/v2/config" + "go.uber.org/zap" +) + +var initOnce = sync.Once{} + +const ( + WorkloadColumnFamilyDefault = "default" + WorkloadColumnFamilyWrite = "write" + WorkloadColumnFamilyLock = "lock" +) + +type GlobalConfig struct { + ips []string + port int + host string + + Threads int + TotalTime time.Duration + TotalCount int + DropData bool + IgnoreError bool + OutputInterval time.Duration + Silence bool + OutputStyle string + + Targets []string + Security config.Security + + // for log + LogLevel string + LogFile string +} + +func (c *GlobalConfig) ParsePdAddrs() error { + if len(c.ips) == 0 && c.host == "" { + return fmt.Errorf("PD address is empty") + } + targets := make([]string, 0, len(c.ips)) + for _, host := range c.ips { + targets = append(targets, host+":"+strconv.Itoa(c.port)) + } + if c.host != "" { + targets = append(targets, c.host) + } + c.Targets = targets + return nil +} + +func (c *GlobalConfig) Format() string { + return fmt.Sprintf("Host: %s, IPs: %v, Port: %d, Threads: %d, TotalTime: %v, TotalCount: %d, DropData: %t, IgnoreError: %t, OutputInterval: %v, Silence: %t, OutputStyle: %s", + c.host, c.ips, c.port, c.Threads, c.TotalTime, c.TotalCount, c.DropData, c.IgnoreError, c.OutputInterval, c.Silence, c.OutputStyle) +} + +func (c *GlobalConfig) InitLogger() (err error) { + initOnce.Do(func() { + // Initialize the logger. + conf := &log.Config{ + Level: c.LogLevel, + File: log.FileLogConfig{ + Filename: c.LogFile, + MaxSize: 256, + }, + } + lg, p, e := log.InitLogger(conf) + if e != nil { + err = e + return + } + log.ReplaceGlobals(lg, p) + }) + return errors.Trace(err) +} + +type CommandLineParser struct { + command *cobra.Command + config *GlobalConfig +} + +func NewCommandLineParser() *CommandLineParser { + return &CommandLineParser{} +} + +func (p *CommandLineParser) Initialize() { + var globalCfg = &GlobalConfig{} + var rootCmd = &cobra.Command{ + Use: "bench-tool", + Short: "Benchmark tikv with different workloads", + PersistentPreRun: func(cmd *cobra.Command, args []string) { + if err := globalCfg.InitLogger(); err != nil { + log.Error("InitLogger failed", zap.Error(err)) + } + }, + } + + rootCmd.PersistentFlags().StringSliceVarP(&globalCfg.ips, "ip", "I", []string{"127.0.0.1"}, "PD ips") + rootCmd.PersistentFlags().IntVarP(&globalCfg.port, "port", "P", 2379, "PD port") + rootCmd.PersistentFlags().StringVar(&globalCfg.host, "host", "127.0.0.1:2379", "PD address") + + rootCmd.PersistentFlags().IntVarP(&globalCfg.Threads, "threads", "T", 1, "Thread concurrency") + rootCmd.PersistentFlags().DurationVar(&globalCfg.TotalTime, "time", 1<<63-1, "Total execution time") + rootCmd.PersistentFlags().IntVar(&globalCfg.TotalCount, "count", 0, "Total execution count, 0 means infinite") + rootCmd.PersistentFlags().BoolVar(&globalCfg.DropData, "dropdata", false, "Cleanup data before prepare") + rootCmd.PersistentFlags().BoolVar(&globalCfg.IgnoreError, "ignore-error", false, "Ignore error when running workload") + rootCmd.PersistentFlags().BoolVar(&globalCfg.Silence, "silence", false, "Don't print error when running workload") + rootCmd.PersistentFlags().DurationVar(&globalCfg.OutputInterval, "interval", 10*time.Second, "Output interval time") + rootCmd.PersistentFlags().StringVar(&globalCfg.OutputStyle, "output", "plain", "output style, valid values can be { plain | table | json }") + + rootCmd.PersistentFlags().StringVar(&globalCfg.LogFile, "log-file", "record.log", "filename of the log file") + rootCmd.PersistentFlags().StringVar(&globalCfg.LogLevel, "log-level", "info", "log level { debug | info | warn | error | fatal }") + + rootCmd.SetOut(os.Stdout) + + cobra.EnablePrefixMatching = true + + p.command = rootCmd + p.config = globalCfg +} + +func (p *CommandLineParser) GetConfig() *GlobalConfig { + return p.config +} + +func (p *CommandLineParser) GetCommand() *cobra.Command { + return p.command +} diff --git a/examples/benchtool/config/pattern.go b/examples/benchtool/config/pattern.go new file mode 100644 index 0000000000..1877386d1a --- /dev/null +++ b/examples/benchtool/config/pattern.go @@ -0,0 +1,154 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "fmt" + "os" + + "gopkg.in/yaml.v3" +) + +const ( + WorkloadTypeHybrid = "hybrid" +) + +// TODO: convert the txnConfig and rawkvConfig to interfaces +type SubPatternConfig struct { + txnConfig *TxnKVConfig + rawkvConfig *RawKVConfig + workloads []string + name string +} + +func (s *SubPatternConfig) GetName() string { + return s.name +} + +func (s *SubPatternConfig) GetWorkloads() []string { + return s.workloads +} + +func (s *SubPatternConfig) GetTxnKVConfig() *TxnKVConfig { + return s.txnConfig +} + +func (s *SubPatternConfig) GetRawKVConfig() *RawKVConfig { + return s.rawkvConfig +} + +type SubPattern struct { + Name string `yaml:"name,omitempty"` + WorkloadType string `yaml:"workload_type,omitempty"` + Workloads []string `yaml:"workloads,omitempty"` + + // for txnkv + Mode string `yaml:"mode,omitempty"` + LockTimeout int `yaml:"lock_timeout"` + ColumnSize int `yaml:"column_size"` + TxnSize int `yaml:"txn_size"` + // common + Count int `yaml:"count"` + KeyPrefix string `yaml:"key_prefix,omitempty"` + KeySize int `yaml:"key_size"` + ValueSize int `yaml:"value_size"` + BatchSize int `yaml:"batch_size"` + Threads int `yaml:"threads"` + Randomize bool `yaml:"random"` +} + +func (s *SubPattern) ConvertBasedOn(global *GlobalConfig) *SubPatternConfig { + // Invalid workloads + if s.Workloads == nil { + return nil + } + + globalCfg := &GlobalConfig{} + if global != nil { + globalCfg = global + } + globalCfg.TotalCount = s.Count + globalCfg.Threads = s.Threads + + switch s.WorkloadType { + case WorkloadTypeTxnKV: + config := &TxnKVConfig{ + TxnMode: s.Mode, + LockTimeout: s.LockTimeout, + // KeyPrefix: s.key_prefix, + KeySize: s.KeySize, + ValueSize: s.ValueSize, + ColumnSize: s.ColumnSize, + TxnSize: s.TxnSize, + } + config.Global = globalCfg + return &SubPatternConfig{ + txnConfig: config, + workloads: s.Workloads, + name: s.Name, + } + case WorkloadTypeRawKV: + config := &RawKVConfig{ + // KeyPrefix: s.key_prefix, + KeySize: s.KeySize, + BatchSize: s.BatchSize, + ValueSize: s.ValueSize, + Randomize: s.Randomize, + } + config.Global = globalCfg + return &SubPatternConfig{ + rawkvConfig: config, + workloads: s.Workloads, + name: s.Name, + } + } + return nil +} + +type PatternsConfig struct { + Items []*SubPattern `yaml:"patterns"` + + FilePath string + + Plans []*SubPatternConfig + Global *GlobalConfig +} + +// Parse parses the yaml file. +func (p *PatternsConfig) Parse() error { + data, err := os.ReadFile(p.FilePath) + if err != nil { + return err + } + err = yaml.Unmarshal(data, p) + if err != nil { + return err + } + p.Plans = make([]*SubPatternConfig, 0, len(p.Items)) + for _, item := range p.Items { + p.Plans = append(p.Plans, item.ConvertBasedOn(p.Global)) + } + return nil +} + +func (p *PatternsConfig) Validate() error { + if p.Global == nil { + return fmt.Errorf("global config is missing") + } + if p.Items == nil { + return fmt.Errorf("patterns config is missing") + } + return p.Global.ParsePdAddrs() +} diff --git a/examples/benchtool/config/rawkv.go b/examples/benchtool/config/rawkv.go new file mode 100644 index 0000000000..6572841cb0 --- /dev/null +++ b/examples/benchtool/config/rawkv.go @@ -0,0 +1,66 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "fmt" + "time" +) + +const ( + WorkloadTypeRawKV = "rawkv" +) + +const ( + RawKVCommandTypePut = "put" + RawKVCommandTypeGet = "get" + RawKVCommandTypeDel = "del" + RawKVCommandTypeBatchPut = "batch_put" + RawKVCommandTypeBatchGet = "batch_get" + RawKVCommandTypeBatchDel = "batch_del" + RawKVCommandTypeScan = "scan" + RawKVCommandTypeReverseScan = "reverse_scan" + RawKVCommandTypeCAS = "cas" + + RawKVCommandDefaultKey = "rawkv_key" + RawKVCommandDefaultEndKey = "rawkv_key`" + RawKVCommandDefaultValue = "rawkv_value" +) + +type RawKVConfig struct { + KeySize int + ValueSize int + BatchSize int + + ColumnFamily string + CommandType string + PrepareRetryCount int + PrepareRetryInterval time.Duration + Randomize bool + + Global *GlobalConfig +} + +func (c *RawKVConfig) Validate() error { + if c.KeySize <= 0 || c.ValueSize <= 0 { + return fmt.Errorf("key size or value size must be greater than 0") + } + if c.ColumnFamily != WorkloadColumnFamilyDefault && + c.ColumnFamily != WorkloadColumnFamilyWrite && + c.ColumnFamily != WorkloadColumnFamilyLock { + return fmt.Errorf("invalid column family: %s", c.ColumnFamily) + } + return c.Global.ParsePdAddrs() +} diff --git a/examples/benchtool/config/txnkv.go b/examples/benchtool/config/txnkv.go new file mode 100644 index 0000000000..86d8e4c0fc --- /dev/null +++ b/examples/benchtool/config/txnkv.go @@ -0,0 +1,54 @@ +package config + +import ( + "benchtool/utils" + "fmt" + "time" +) + +const ( + WorkloadTypeTxnKV = "txnkv" +) + +const ( + TxnKVCommandTypeBegin = "begin" + TxnKVCommandTypeCommit = "commit" + TxnKVCommandTypeRollback = "rollback" + TxnKVCommandTypeWrite = "write" + TxnKVCommandTypeSet = "set" + TxnKVCommandTypeDel = "delete" + TxnKVCommandTypeRead = "read" + + TxnKVCommandDefaultKey = "txnkv_key" + TxnKVCommandDefaultEndKey = "txnkv_key`" + TxnKVCommandDefaultValue = "txnkv_value" + + TxnKVModeDefault = "2PC" + TxnKVMode1PC = "1PC" + TxnKVModeAsyncCommit = "async-commit" +) + +type TxnKVConfig struct { + KeySize int + ValueSize int + ColumnSize int + TxnSize int + + PrepareRetryCount int + PrepareRetryInterval time.Duration + ReadWriteRatio *utils.ReadWriteRatio + TxnMode string + LockTimeout int + + Global *GlobalConfig +} + +func (c *TxnKVConfig) Validate() error { + if c.KeySize <= 0 || c.ValueSize <= 0 { + return fmt.Errorf("key size or value size must be greater than 0") + } + if err := c.ReadWriteRatio.ParseRatio(); err != nil { + return fmt.Errorf("parse read-write-ratio failed: %v", err) + } + return c.Global.ParsePdAddrs() +} diff --git a/examples/benchtool/go.mod b/examples/benchtool/go.mod new file mode 100644 index 0000000000..ddcbebd62c --- /dev/null +++ b/examples/benchtool/go.mod @@ -0,0 +1,64 @@ +module benchtool + +go 1.21.0 + +require ( + github.com/HdrHistogram/hdrhistogram-go v1.1.2 + github.com/olekukonko/tablewriter v0.0.5 + github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c + github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 + github.com/spf13/cobra v1.8.0 + github.com/stretchr/testify v1.9.0 + github.com/tikv/client-go/v2 v2.0.7 + go.uber.org/zap v1.24.0 + gopkg.in/yaml.v3 v3.0.1 + gotest.tools/v3 v3.5.1 +) + +require ( + github.com/benbjohnson/clock v1.3.0 // indirect + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/coreos/go-semver v0.3.0 // indirect + github.com/coreos/go-systemd/v22 v22.3.2 // indirect + github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect + github.com/elastic/gosigar v0.14.2 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/protobuf v1.5.3 // indirect + github.com/google/btree v1.1.2 // indirect + github.com/google/go-cmp v0.5.9 // indirect + github.com/google/uuid v1.3.0 // indirect + github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/mattn/go-runewidth v0.0.9 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect + github.com/opentracing/opentracing-go v1.2.0 // indirect + github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect + github.com/pingcap/kvproto v0.0.0-20230403051650-e166ae588106 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_golang v1.14.0 // indirect + github.com/prometheus/client_model v0.3.0 // indirect + github.com/prometheus/common v0.39.0 // indirect + github.com/prometheus/procfs v0.9.0 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect + github.com/spf13/pflag v1.0.5 // indirect + github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect + github.com/tikv/pd/client v0.0.0-20230329114254-1948c247c2b1 // indirect + github.com/twmb/murmur3 v1.1.3 // indirect + go.etcd.io/etcd/api/v3 v3.5.2 // indirect + go.etcd.io/etcd/client/pkg/v3 v3.5.2 // indirect + go.etcd.io/etcd/client/v3 v3.5.2 // indirect + go.uber.org/atomic v1.10.0 // indirect + go.uber.org/multierr v1.9.0 // indirect + golang.org/x/net v0.8.0 // indirect + golang.org/x/sync v0.1.0 // indirect + golang.org/x/sys v0.6.0 // indirect + golang.org/x/text v0.8.0 // indirect + google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633 // indirect + google.golang.org/grpc v1.54.0 // indirect + google.golang.org/protobuf v1.30.0 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect +) diff --git a/examples/benchtool/main.go b/examples/benchtool/main.go new file mode 100644 index 0000000000..05384305a3 --- /dev/null +++ b/examples/benchtool/main.go @@ -0,0 +1,74 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "benchtool/config" + "benchtool/workloads" + "benchtool/workloads/patterns" + "benchtool/workloads/rawkv" + "benchtool/workloads/txnkv" + "context" + "fmt" + "os" + "os/signal" + "syscall" + "time" + + "github.com/spf13/cobra" +) + +func main() { + cobra.EnablePrefixMatching = true + + commandLineParser := config.NewCommandLineParser() + commandLineParser.Initialize() + + // Register all workloads + rawkv.Register(commandLineParser) + txnkv.Register(commandLineParser) + patterns.Register(commandLineParser) + + var cancel context.CancelFunc + workloads.GlobalContext, cancel = context.WithCancel(context.Background()) + + sc := make(chan os.Signal, 1) + signal.Notify(sc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + + // Capture signals to cancel the context + go func() { + sig := <-sc + fmt.Printf("\nGot signal [%v] to exit.\n", sig) + cancel() + + select { + case <-sc: + // send signal again, return directly + fmt.Printf("\nGot signal [%v] again to exit.\n", sig) + os.Exit(1) + case <-time.After(10 * time.Second): + fmt.Print("\nWait 10s for closed, force exit\n") + os.Exit(1) + default: + return + } + }() + commandLineParser.GetCommand().Execute() + cancel() +} diff --git a/examples/benchtool/utils/statistics/histogram.go b/examples/benchtool/utils/statistics/histogram.go new file mode 100644 index 0000000000..1dcbf77e2b --- /dev/null +++ b/examples/benchtool/utils/statistics/histogram.go @@ -0,0 +1,163 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package statistics + +import ( + "fmt" + "sort" + "strings" + "sync" + "time" + + "benchtool/utils" + + "github.com/HdrHistogram/hdrhistogram-go" +) + +// Format: "Elapsed" - "Sum" - "Count" - "Ops" - "Avg" - "P50" - "P90" - "P95" - "P99" - "P999" - "P9999" - "Min" - "Max +var WorkloadFormat = []string{"Elapsed(s)", "Sum", "Count", "Ops", "Avg(ms)", "50th(ms)", "90th(ms)", "95th(ms)", "99th(ms)", "99.9th(ms)", "99.99th(ms)", "Min(ms)", "Max(ms)"} + +type RuntimeStatistics struct { + elapsed float64 + + // Operation statistics + sum float64 + count int64 + ops float64 + + // Execution time statistics + p50 float64 + p90 float64 + p95 float64 + p99 float64 + p999 float64 + p9999 float64 + avg float64 + min float64 + max float64 +} + +type PerfHistogram struct { + m sync.RWMutex + + startTime time.Time + sum int64 + *hdrhistogram.Histogram +} + +func NewPerfHistogram(minLat, maxLat time.Duration, sf int) *PerfHistogram { + return &PerfHistogram{Histogram: hdrhistogram.New(minLat.Nanoseconds(), maxLat.Nanoseconds(), sf), startTime: time.Now()} +} + +func (h *PerfHistogram) Record(rawLatency time.Duration) { + latency := rawLatency + low := time.Duration(h.LowestTrackableValue()) + high := time.Duration(h.HighestTrackableValue()) + if latency < low { + latency = low + } else if latency > high { + latency = high + } + + h.m.Lock() + err := h.RecordValue(latency.Nanoseconds()) + h.sum += rawLatency.Nanoseconds() + h.m.Unlock() + if err != nil { + panic(fmt.Sprintf(`recording value error: %s`, err)) + } +} + +func (h *PerfHistogram) Empty() bool { + h.m.Lock() + defer h.m.Unlock() + return h.TotalCount() == 0 +} + +func (h *PerfHistogram) Format() []string { + res := h.GetRuntimeStatistics() + + // Define the regular expression pattern + // pattern => `([\w\s]+)\s+-\sElapsed\(s\):\s([\d.]+),\sSum:\s([\d.]+),\sCount:\s(\d+),\sOps:\s([\d.]+),\sAvg\(ms\):\s([\d.]+),\s50th\(ms\):\s([\d.]+),\s90th\(ms\):\s([\d.]+),\s95th\(ms\):\s([\d.]+),\s99th\(ms\):\s([\d.]+),\s99.9th\(ms\):\s([\d.]+),\s99.99th\(ms\):\s([\d.]+),\sMin\(ms\):\s([\d.]+),\sMax\(ms\):\s([\d.]+)` + // Format: "Elapsed(s)" - "Sum" - "Count" - "Ops" - "Avg" - "P50" - "P90" - "P95" - "P99" - "P999" - "P9999" - "Min" - "Max + return []string{ + utils.FloatToString(res.elapsed), + utils.FloatToString(res.sum), + utils.IntToString(res.count), + utils.FloatToString(res.ops * 60), + utils.FloatToString(res.avg), + utils.FloatToString(res.p50), + utils.FloatToString(res.p90), + utils.FloatToString(res.p95), + utils.FloatToString(res.p99), + utils.FloatToString(res.p999), + utils.FloatToString(res.p9999), + utils.FloatToString(res.min), + utils.FloatToString(res.max), + } +} + +func (h *PerfHistogram) GetRuntimeStatistics() RuntimeStatistics { + h.m.RLock() + defer h.m.RUnlock() + sum := time.Duration(h.sum).Seconds() * 1000 + avg := time.Duration(h.Mean()).Seconds() * 1000 + elapsed := time.Since(h.startTime).Seconds() + count := h.TotalCount() + ops := float64(count) / elapsed + info := RuntimeStatistics{ + elapsed: elapsed, + sum: sum, + count: count, + ops: ops, + avg: avg, + p50: time.Duration(h.ValueAtQuantile(50)).Seconds() * 1000, + p90: time.Duration(h.ValueAtQuantile(90)).Seconds() * 1000, + p95: time.Duration(h.ValueAtQuantile(95)).Seconds() * 1000, + p99: time.Duration(h.ValueAtQuantile(99)).Seconds() * 1000, + p999: time.Duration(h.ValueAtQuantile(99.9)).Seconds() * 1000, + p9999: time.Duration(h.ValueAtQuantile(99.99)).Seconds() * 1000, + min: time.Duration(h.Min()).Seconds() * 1000, + max: time.Duration(h.Max()).Seconds() * 1000, + } + return info +} + +func HistogramOutputFunc(outputStyle string, prefix string, perfHist map[string]*PerfHistogram) { + keys := make([]string, 0, len(perfHist)) + for k := range perfHist { + keys = append(keys, k) + } + sort.Strings(keys) + + lines := [][]string{} + for _, op := range keys { + hist := perfHist[op] + if !hist.Empty() { + op = strings.ToUpper(op) + line := []string{prefix, op} + line = append(line, hist.Format()...) + lines = append(lines, line) + } + } + switch outputStyle { + case utils.OutputStylePlain: + utils.RenderString("%s%-6s - %s\n", WorkloadFormat, lines) + case utils.OutputStyleTable: + utils.RenderTable(WorkloadFormat, lines) + case utils.OutputStyleJson: + utils.RenderJson(WorkloadFormat, lines) + } +} diff --git a/examples/benchtool/utils/statistics/historgram_test.go b/examples/benchtool/utils/statistics/historgram_test.go new file mode 100644 index 0000000000..5aca6e0776 --- /dev/null +++ b/examples/benchtool/utils/statistics/historgram_test.go @@ -0,0 +1,32 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package statistics + +import ( + "math/rand" + "testing" + "time" +) + +func TestHist(t *testing.T) { + h := NewPerfHistogram(1*time.Millisecond, 20*time.Minute, 1) + for i := 0; i < 10000; i++ { + n := rand.Intn(15020) + h.Record(time.Millisecond * time.Duration(n)) + } + h.Record(time.Minute * 9) + h.Record(time.Minute * 100) + t.Logf("%+v", h.Format()) +} diff --git a/examples/benchtool/utils/statistics/misc.go b/examples/benchtool/utils/statistics/misc.go new file mode 100644 index 0000000000..3357280360 --- /dev/null +++ b/examples/benchtool/utils/statistics/misc.go @@ -0,0 +1,121 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package statistics + +import ( + "sync" + "time" +) + +const ( + DefaultMinLatency = 1 * time.Millisecond + DefaultMaxLatency = 16 * time.Second + + DefaultHistogramSize = 16 +) + +type PerfProfile struct { + sync.RWMutex + + MinLatency time.Duration + MaxLatency time.Duration + SigFigs int + PeriodicalPerfHist map[string]*PerfHistogram + SummaryPerfHist map[string]*PerfHistogram +} + +func NewPerfProfile() *PerfProfile { + return &PerfProfile{ + MinLatency: DefaultMinLatency, + MaxLatency: DefaultMaxLatency, + SigFigs: 1, + PeriodicalPerfHist: make(map[string]*PerfHistogram, DefaultHistogramSize), + SummaryPerfHist: make(map[string]*PerfHistogram, DefaultHistogramSize), + } +} + +func (p *PerfProfile) Record(op string, latency time.Duration) { + p.Lock() + defer p.Unlock() + + if _, ok := p.PeriodicalPerfHist[op]; !ok { + p.PeriodicalPerfHist[op] = NewPerfHistogram(p.MinLatency, p.MaxLatency, p.SigFigs) + } + p.PeriodicalPerfHist[op].Record(latency) + if _, ok := p.SummaryPerfHist[op]; !ok { + p.SummaryPerfHist[op] = NewPerfHistogram(p.MinLatency, p.MaxLatency, p.SigFigs) + } + p.SummaryPerfHist[op].Record(latency) +} + +func (p *PerfProfile) Get(op string, sum bool) *PerfHistogram { + histMap := p.PeriodicalPerfHist + if sum { + histMap = p.SummaryPerfHist + } + + p.RLock() + hist, ok := histMap[op] + p.RUnlock() + if !ok { + perfHist := NewPerfHistogram(p.MinLatency, p.MaxLatency, p.SigFigs) + p.Lock() + histMap[op] = perfHist + hist = histMap[op] + p.Unlock() + } + return hist +} + +func (p *PerfProfile) TakePeriodHist() map[string]*PerfHistogram { + p.Lock() + defer p.Unlock() + periodicalHist := make(map[string]*PerfHistogram, len(p.PeriodicalPerfHist)) + swapOutHist := p.PeriodicalPerfHist + p.PeriodicalPerfHist = periodicalHist + return swapOutHist +} + +// Prints the PerfProfile. +func (p *PerfProfile) PrintFmt(ifSummaryReport bool, outputStyle string, outputFunc func(string, string, map[string]*PerfHistogram)) { + if ifSummaryReport { + p.RLock() + defer p.RUnlock() + outputFunc(outputStyle, "[Summary] ", p.SummaryPerfHist) + return + } + // Clear current PerfHistogram and print current PerfHistogram. + periodicalHist := p.TakePeriodHist() + p.RLock() + defer p.RUnlock() + outputFunc(outputStyle, "[Current] ", periodicalHist) +} + +func (p *PerfProfile) Clear() { + p.Lock() + defer p.Unlock() + + perfHist := p.PeriodicalPerfHist + for k := range perfHist { + delete(perfHist, k) + } + perfHist = p.SummaryPerfHist + for k := range perfHist { + delete(perfHist, k) + } + + p.PeriodicalPerfHist = make(map[string]*PerfHistogram, DefaultHistogramSize) + p.SummaryPerfHist = make(map[string]*PerfHistogram, DefaultHistogramSize) +} diff --git a/examples/benchtool/utils/util.go b/examples/benchtool/utils/util.go new file mode 100644 index 0000000000..aa993ab9b0 --- /dev/null +++ b/examples/benchtool/utils/util.go @@ -0,0 +1,192 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "bytes" + "encoding/json" + "fmt" + "math/rand" + "os" + "strconv" + "strings" + + "github.com/olekukonko/tablewriter" +) + +const ( + DEFAULT_PRECISION = 2 + MAX_INT64 = 1<<63 - 1 +) + +const ( + OutputStylePlain = "plain" + OutputStyleTable = "table" + OutputStyleJson = "json" +) + +const ( + ReadPercent = "read" + WritePercent = "write" +) + +// ReadWriteRatio is used to parse the read-write ratio. +type ReadWriteRatio struct { + Ratio string + readPercent int + writePercent int +} + +func NewReadWriteRatio(ratio string) *ReadWriteRatio { + return &ReadWriteRatio{Ratio: ratio, readPercent: -1, writePercent: -1} +} + +func (r *ReadWriteRatio) ParseRatio() error { + if r.Ratio == "" { + return fmt.Errorf("empty read-write-ratio") + } + ratios := strings.Split(r.Ratio, ":") + if len(ratios) == 2 { + readRatio := 0 + writeRatio := 0 + + readRatio, _ = strconv.Atoi(ratios[0]) + writeRatio, _ = strconv.Atoi(ratios[1]) + if readRatio < 0 || writeRatio < 0 { + return fmt.Errorf("invalid read-write-ratio format") + } + + sumRatio := readRatio + writeRatio + r.readPercent = readRatio * 100 / sumRatio + r.writePercent = 100 - r.readPercent + } else { + return fmt.Errorf("invalid read-write-ratio format") + } + return nil +} + +func (r *ReadWriteRatio) GetPercent(choice string) int { + if r.Ratio == "" { + return 0 + } + // Not parsed yet. + if r.readPercent == -1 || r.writePercent == -1 { + if r.ParseRatio() != nil { + return 0 + } + } + if choice == ReadPercent { + return r.readPercent + } else if choice == WritePercent { + return r.writePercent + } + return 0 +} + +// Converting functions. + +func FloatToString(num float64) string { + return strconv.FormatFloat(num, 'f', DEFAULT_PRECISION, 64) +} + +func IntToString(num int64) string { + return strconv.FormatInt(num, 10) +} + +func StrArrsToByteArrs(strArrs []string) [][]byte { + byteArrs := make([][]byte, 0, len(strArrs)) + for _, strArr := range strArrs { + byteArrs = append(byteArrs, []byte(strArr)) + } + return byteArrs +} + +func GenRandomStr(prefix string, keySize int) string { + return fmt.Sprintf("%s@%0*d", prefix, keySize, rand.Intn(MAX_INT64)) +} + +func GenRandomStrArrs(prefix string, keySize, count int) []string { + keys := make([]string, 0, count) + for i := 0; i < count; i++ { + keys = append(keys, GenRandomStr(prefix, keySize)) + } + return keys +} + +func GenRandomByteArrs(prefix string, keySize, count int) [][]byte { + keys := make([][]byte, 0, count) + for i := 0; i < count; i++ { + keys = append(keys, []byte(GenRandomStr(prefix, keySize))) + } + return keys +} + +// Output formatting functions. + +func RenderString(format string, headers []string, values [][]string) { + if len(values) == 0 { + return + } + if len(headers) == 0 { + for _, value := range values { + args := make([]interface{}, len(value)) + for i, v := range value { + args[i] = v + } + fmt.Printf(format, args...) + } + return + } + + buf := new(bytes.Buffer) + for _, value := range values { + args := make([]string, len(headers)) + for i, header := range headers { + args[i] = header + ": " + value[i+2] + } + buf.WriteString(fmt.Sprintf(format, value[0], value[1], strings.Join(args, ", "))) + } + fmt.Print(buf.String()) +} + +func RenderTable(headers []string, values [][]string) { + if len(values) == 0 { + return + } + tb := tablewriter.NewWriter(os.Stdout) + tb.SetHeader(headers) + tb.AppendBulk(values) + tb.Render() +} + +func RenderJson(headers []string, values [][]string) { + if len(values) == 0 { + return + } + data := make([]map[string]string, 0, len(values)) + for _, value := range values { + line := make(map[string]string, 0) + for i, header := range headers { + line[header] = value[i] + } + data = append(data, line) + } + outStr, err := json.Marshal(data) + if err != nil { + fmt.Println(err) + return + } + fmt.Println(string(outStr)) +} diff --git a/examples/benchtool/utils/util_test.go b/examples/benchtool/utils/util_test.go new file mode 100644 index 0000000000..32d373d24a --- /dev/null +++ b/examples/benchtool/utils/util_test.go @@ -0,0 +1,51 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestReadWriteRatio(t *testing.T) { + r := NewReadWriteRatio("100:0") + assert.Nil(t, r.ParseRatio()) + assert.Equal(t, 100, r.readPercent) + assert.Equal(t, 0, r.writePercent) + + r = NewReadWriteRatio("90:10") + assert.Nil(t, r.ParseRatio()) + assert.Equal(t, 90, r.readPercent) + assert.Equal(t, 10, r.writePercent) + + r = NewReadWriteRatio("-10:110") + assert.Error(t, r.ParseRatio()) +} + +func TestBasics(t *testing.T) { + prefix := "test" + str := GenRandomStr(prefix, 256) + assert.True(t, len(str) > 256+len(prefix)) + + str = GenRandomStr(prefix, 0) + assert.True(t, len(str) > len(prefix)) + + strs := GenRandomStrArrs(prefix, 256, 10) + assert.Equal(t, 10, len(strs)) + + byteArrs := GenRandomByteArrs(prefix, 256, 10) + assert.Equal(t, 10, len(byteArrs)) +} diff --git a/examples/benchtool/workloads/base.go b/examples/benchtool/workloads/base.go new file mode 100644 index 0000000000..aa10068db2 --- /dev/null +++ b/examples/benchtool/workloads/base.go @@ -0,0 +1,78 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package workloads + +import ( + "context" + "database/sql" + "fmt" + "time" +) + +const ( + DefaultDriver = "mysql" +) + +// WorkloadInterface is the interface for running customized workload +type WorkloadInterface interface { + Name() string + InitThread(ctx context.Context, threadID int) error + CleanupThread(ctx context.Context, threadID int) + Prepare(ctx context.Context, threadID int) error + CheckPrepare(ctx context.Context, threadID int) error + Run(ctx context.Context, threadID int) error + Cleanup(ctx context.Context, threadID int) error + Check(ctx context.Context, threadID int) error + OutputStats(ifSummaryReport bool) +} + +var GlobalContext context.Context +var GlobalDB *sql.DB // Maybe useless, as the tikv.Client is the only enter to access the TiKV. + +func DispatchExecution(timeoutCtx context.Context, w WorkloadInterface, action string, count int, threadIdx int, silence bool, ignoreError bool) error { + if err := w.InitThread(context.Background(), threadIdx); err != nil { + return err + } + defer w.CleanupThread(timeoutCtx, threadIdx) + + switch action { + case "prepare": + return w.Prepare(timeoutCtx, threadIdx) + case "cleanup": + return w.Cleanup(timeoutCtx, threadIdx) + case "check": + return w.Check(timeoutCtx, threadIdx) + } + + if count > 0 { + for i := 0; i < count || count <= 0; i++ { + err := w.Run(timeoutCtx, threadIdx) + if err != nil { + if !silence { + fmt.Printf("[%s] execute %s failed, err %v\n", time.Now().Format("2006-01-02 15:04:05"), action, err) + } + if !ignoreError { + return err + } + } + select { + case <-timeoutCtx.Done(): + return nil + default: + } + } + } + return nil +} diff --git a/examples/benchtool/workloads/patterns/pattern.go b/examples/benchtool/workloads/patterns/pattern.go new file mode 100644 index 0000000000..3842c57c9b --- /dev/null +++ b/examples/benchtool/workloads/patterns/pattern.go @@ -0,0 +1,454 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package patterns + +import ( + "benchtool/config" + "benchtool/utils" + "benchtool/utils/statistics" + "benchtool/workloads" + "benchtool/workloads/rawkv" + "context" + "fmt" + "sync" + "time" + + "github.com/pingcap/log" + "github.com/spf13/cobra" + clientConfig "github.com/tikv/client-go/v2/config" + clientRawKV "github.com/tikv/client-go/v2/rawkv" + clientTxnKV "github.com/tikv/client-go/v2/txnkv" + "go.uber.org/zap" +) + +func getPatternsConfig(ctx context.Context) *config.PatternsConfig { + c := ctx.Value(config.WorkloadTypeHybrid).(*config.PatternsConfig) + return c +} + +// Assistants for TxnKV workload +func prepareLockKeyWithTimeout(ctx context.Context, txn *clientTxnKV.KVTxn, key []byte, timeout int64) error { + if timeout > 0 { + return txn.LockKeysWithWaitTime(ctx, timeout, key) + } + return nil +} + +func execPatternsWorkloads(cmd string) { + if cmd == "" { + return + } + patternsConfig := getPatternsConfig(workloads.GlobalContext) + + var workload *WorkloadImpl + var err error + if workload, err = NewPatternWorkload(patternsConfig); err != nil { + fmt.Printf("create Patterns workload failed: %v\n", err) + return + } + + timeoutCtx, cancel := context.WithTimeout(workloads.GlobalContext, patternsConfig.Global.TotalTime) + workloads.GlobalContext = timeoutCtx + defer cancel() + + for { + if !workload.ContinueToExecute() { + break + } + if err = workload.BeforeExecute(); err != nil { + fmt.Println("BeforeExecute failed:", err) + break + } + workload.Execute(cmd) + workload.AfterExecute() + } +} + +// Register registers the workload to the command line parser +func Register(command *config.CommandLineParser) *config.PatternsConfig { + if command == nil { + return nil + } + patternsConfig := &config.PatternsConfig{ + Global: command.GetConfig(), + } + + cmd := &cobra.Command{ + Use: config.WorkloadTypeHybrid, + PersistentPreRun: func(cmd *cobra.Command, args []string) { + if err := patternsConfig.Global.InitLogger(); err != nil { + log.Error("InitLogger failed", zap.Error(err)) + } + workloads.GlobalContext = context.WithValue(workloads.GlobalContext, config.WorkloadTypeHybrid, patternsConfig) + }, + } + cmd.PersistentFlags().StringVar(&patternsConfig.FilePath, "file-path", "", "The path of the pattern file") + + var cmdPrepare = &cobra.Command{ + Use: "prepare", + Short: "Prepare data for workload", + Run: func(cmd *cobra.Command, _ []string) { + execPatternsWorkloads("prepare") + }, + } + var cmdRun = &cobra.Command{ + Use: "run", + Short: "Run workload", + Run: func(cmd *cobra.Command, _ []string) { + execPatternsWorkloads("run") + }, + } + var cmdCleanup = &cobra.Command{ + Use: "cleanup", + Short: "Cleanup data for the workload", + Run: func(cmd *cobra.Command, _ []string) { + execPatternsWorkloads("cleanup") + }, + } + var cmdCheck = &cobra.Command{ + Use: "check", + Short: "Check data consistency for the workload", + Run: func(cmd *cobra.Command, _ []string) { + execPatternsWorkloads("check") + }, + } + cmd.AddCommand(cmdRun, cmdPrepare, cmdCleanup, cmdCheck) + + command.GetCommand().AddCommand(cmd) + + return patternsConfig +} + +// Workload is the implementation of WorkloadInterface +type WorkloadImpl struct { + // Pointer to the next execution plan + patternIdx int + // workload pattern + config *config.PatternsConfig + + rawClients []*clientRawKV.Client + txnClients []*clientTxnKV.Client + + stats *statistics.PerfProfile + + wait sync.WaitGroup +} + +func NewPatternWorkload(cfg *config.PatternsConfig) (*WorkloadImpl, error) { + if err := cfg.Parse(); err != nil { + return nil, err + } + if err := cfg.Validate(); err != nil { + return nil, err + } + w := &WorkloadImpl{ + patternIdx: 0, // start from 0 + config: cfg, + stats: statistics.NewPerfProfile(), + } + return w, nil +} + +func (w *WorkloadImpl) Name() string { + return config.WorkloadTypeHybrid +} + +func (w *WorkloadImpl) isValid() bool { + return w.config != nil && w.config.Global != nil && (len(w.rawClients) > 0 || len(w.txnClients) > 0) +} + +func (w *WorkloadImpl) isValidThread(threadID int) bool { + return w.isValid() && threadID < max(len(w.rawClients), len(w.txnClients)) +} + +// InitThread implements WorkloadInterface +func (w *WorkloadImpl) InitThread(ctx context.Context, threadID int) error { + // Nothing to do + return nil +} + +// CleanupThread implements WorkloadInterface +func (w *WorkloadImpl) CleanupThread(ctx context.Context, threadID int) { + if w.isValidThread(threadID) { + if len(w.rawClients) > 0 { + client := w.rawClients[threadID] + if client != nil { + client.Close() + } + } else { + client := w.txnClients[threadID] + if client != nil { + client.Close() + } + } + } +} + +// Prepare implements WorkloadInterface +func (w *WorkloadImpl) Prepare(ctx context.Context, threadID int) error { + if !w.isValidThread(threadID) { + return fmt.Errorf("no valid clients for patterns workloads") + } + + // return prepareWorkloadImpl(ctx, w, w.cfg.Threads, w.cfg.Warehouses, threadID) + // TODO: add prepare stage + return nil +} + +// CheckPrepare implements WorkloadInterface +func (w *WorkloadImpl) CheckPrepare(ctx context.Context, threadID int) error { + return nil +} + +func (w *WorkloadImpl) Run(ctx context.Context, threadID int) error { + if !w.isValidThread(threadID) { + return fmt.Errorf("no valid clients for pattern workload") + } + + if len(w.rawClients) > 0 { + return w.RunRawkKvWorkloads(ctx, threadID) + } else if len(w.txnClients) > 0 { + return w.RunTxnKvWorkloads(ctx, threadID) + } + return fmt.Errorf("invalid pattern workload") +} + +// RunRawkKvWorkloads implements the executing details on RawKV part. +func (w *WorkloadImpl) RunRawkKvWorkloads(ctx context.Context, threadID int) error { + if !w.isValidThread(threadID) { + return fmt.Errorf("no valid RawKV clients") + } + + plan := w.config.Plans[w.patternIdx] + rawConfig := plan.GetRawKVConfig() + + for _, workload := range plan.GetWorkloads() { + rawkv.RunRawKVCommand(ctx, w.rawClients[threadID], workload, rawConfig.KeySize, rawConfig.ValueSize, rawConfig.BatchSize, rawConfig.Randomize, w.stats, w.config.Global.IgnoreError) + } + return nil +} + +// RunTxnKvWorkloads implements the executing details on TxnKV part. +func (w *WorkloadImpl) RunTxnKvWorkloads(ctx context.Context, threadID int) error { + if !w.isValidThread(threadID) { + return fmt.Errorf("no valid TxnKV clients") + } + + plan := w.config.Plans[w.patternIdx] + { + // Check the current plan is valid or not + workloads := plan.GetWorkloads() + if len(workloads) < 2 || workloads[0] != config.TxnKVCommandTypeBegin { + return fmt.Errorf("invalid plan, idx %d", w.patternIdx) + } + } + txnConfig := plan.GetTxnKVConfig() + // Prepare the key value pairs + key := config.TxnKVCommandDefaultKey + val := utils.GenRandomStr(config.TxnKVCommandDefaultValue, txnConfig.ValueSize) + lockTimeout := int64(txnConfig.LockTimeout) + // Constructs the txn client and sets the txn mode + client := w.txnClients[threadID] + txn, err := client.Begin() + if err != nil { + return fmt.Errorf("txn begin failed, err %v", err) + } + switch txnConfig.TxnMode { + case config.TxnKVMode1PC: + txn.SetEnable1PC(true) + case config.TxnKVModeAsyncCommit: + txn.SetEnableAsyncCommit(true) + } + // Default is optimistic lock mode. + txn.SetPessimistic(lockTimeout > 0) + // Tranverse each command + hasUncommitted := true // mark the previous txn has been committed or not + for idx, workload := range plan.GetWorkloads() { + if (workload == config.TxnKVCommandTypeCommit) || (workload == config.TxnKVCommandTypeBegin && idx > 0) { + hasUncommitted = false + start := time.Now() + if txnErr := txn.Commit(ctx); txnErr != nil { + return fmt.Errorf("txn commit failed, err %v", txnErr) + } + w.stats.Record(txnConfig.TxnMode, time.Since(start)) + // Create a new txn. + txn, err = client.Begin() + if err != nil { + return fmt.Errorf("txn begin failed, err %v", err) + } + continue + } else if workload == config.TxnKVCommandTypeRollback { + hasUncommitted = true + if err = txn.Rollback(); err != nil { + return fmt.Errorf("txn rollback failed, err %v", err) + } + continue + } + hasUncommitted = true + for row := 0; row < txnConfig.TxnSize; row++ { + key = fmt.Sprintf("%s@col_", utils.GenRandomStr(key, txnConfig.KeySize)) + // Lock the key with timeout if necessary. + if err = prepareLockKeyWithTimeout(ctx, txn, []byte(key), lockTimeout); err != nil { + fmt.Printf("txn lock key failed, err %v", err) + continue + } + for col := 0; col < txnConfig.ColumnSize; col++ { + colKey := fmt.Sprintf("%s%d", key, col) + if workload == config.TxnKVCommandTypeRead { + _, err = txn.Get(ctx, []byte(colKey)) + } else if workload == config.TxnKVCommandTypeWrite || workload == config.TxnKVCommandTypeSet { + err = txn.Set([]byte(colKey), []byte(val)) + } else if workload == config.TxnKVCommandTypeDel { + err = txn.Delete([]byte(colKey)) + } + if err != nil { + return fmt.Errorf("txn set / get failed, err %v", err) + } + } + } + } + // If the previous txn is not committed, commit it. + if hasUncommitted { + start := time.Now() + if txnErr := txn.Commit(ctx); txnErr != nil { + return fmt.Errorf("txn commit failed, err %v", txnErr) + } + w.stats.Record(txnConfig.TxnMode, time.Since(start)) + } + return nil +} + +// Check implements WorkloadInterface +func (w *WorkloadImpl) Check(ctx context.Context, threadID int) error { + return nil +} + +// Cleanup implements WorkloadInterface +func (w *WorkloadImpl) Cleanup(ctx context.Context, threadID int) error { + if !w.isValidThread(threadID) { + return fmt.Errorf("no valid clients for pattern workload") + } + // delete all keys + if threadID == 0 { + if len(w.rawClients) > 0 { + client := w.rawClients[threadID] + client.DeleteRange(ctx, []byte(config.RawKVCommandDefaultKey), []byte(config.RawKVCommandDefaultEndKey)) + } else { + client := w.txnClients[threadID] + client.DeleteRange(ctx, []byte(config.TxnKVCommandDefaultKey), []byte(config.TxnKVCommandDefaultEndKey), len(w.txnClients)) + } + } + return nil +} + +func (w *WorkloadImpl) OutputStats(ifSummaryReport bool) { + w.stats.PrintFmt(ifSummaryReport, w.config.Global.OutputStyle, statistics.HistogramOutputFunc) +} + +func (w *WorkloadImpl) IsTxnKVPattern() bool { + plan := w.config.Plans[w.patternIdx] + return plan.GetTxnKVConfig() != nil +} + +func (w *WorkloadImpl) ContinueToExecute() bool { + return w.patternIdx < len(w.config.Plans) +} + +func (w *WorkloadImpl) BeforeExecute() error { + plan := w.config.Plans[w.patternIdx] + txnConfig := plan.GetTxnKVConfig() + rawConfig := plan.GetRawKVConfig() + if txnConfig != nil { + clientConfig.UpdateGlobal(func(conf *clientConfig.Config) { + conf.TiKVClient.MaxBatchSize = (uint)(txnConfig.TxnSize + 10) + }) + w.txnClients = make([]*clientTxnKV.Client, 0, txnConfig.Global.Threads) + for i := 0; i < txnConfig.Global.Threads; i++ { + client, err := clientTxnKV.NewClient(txnConfig.Global.Targets) + if err != nil { + return err + } + w.txnClients = append(w.txnClients, client) + } + } else if rawConfig != nil { + w.rawClients = make([]*clientRawKV.Client, 0, rawConfig.Global.Threads) + for i := 0; i < rawConfig.Global.Threads; i++ { + client, err := clientRawKV.NewClient(workloads.GlobalContext, rawConfig.Global.Targets, rawConfig.Global.Security) + if err != nil { + return err + } + w.rawClients = append(w.rawClients, client) + } + } + fmt.Println("Start to execute pattern", plan.GetName()) + return nil +} + +func (w *WorkloadImpl) AfterExecute() { + plan := w.config.Plans[w.patternIdx] + w.OutputStats(true) + fmt.Println("Finish executing pattern", plan.GetName()) + // Release the resources + w.rawClients = nil + w.txnClients = nil + w.patternIdx += 1 + w.stats.Clear() +} + +func (w *WorkloadImpl) Execute(cmd string) { + plan := w.config.Plans[w.patternIdx] + txnConfig := plan.GetTxnKVConfig() + rawConfig := plan.GetRawKVConfig() + var globalConfig *config.GlobalConfig + if txnConfig != nil { + globalConfig = txnConfig.Global + } else { + globalConfig = rawConfig.Global + } + + w.wait.Add(globalConfig.Threads) + + ctx, cancel := context.WithCancel(workloads.GlobalContext) + ch := make(chan struct{}, 1) + go func() { + ticker := time.NewTicker(globalConfig.OutputInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + ch <- struct{}{} + return + case <-ticker.C: + w.OutputStats(false) + } + } + }() + + count := globalConfig.TotalCount / globalConfig.Threads + for i := 0; i < globalConfig.Threads; i++ { + go func(index int) { + defer w.wait.Done() + if err := workloads.DispatchExecution(ctx, w, cmd, count, index, globalConfig.Silence, globalConfig.IgnoreError); err != nil { + fmt.Printf("[%s] execute %s failed, err %v\n", time.Now().Format("2006-01-02 15:04:05"), cmd, err) + return + } + }(i) + } + + w.wait.Wait() + cancel() + <-ch +} diff --git a/examples/benchtool/workloads/patterns/pattern_test.go b/examples/benchtool/workloads/patterns/pattern_test.go new file mode 100644 index 0000000000..7078379a0f --- /dev/null +++ b/examples/benchtool/workloads/patterns/pattern_test.go @@ -0,0 +1,46 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package patterns + +import ( + "testing" + + "benchtool/config" + + "gotest.tools/v3/assert" +) + +func TestPatterns(t *testing.T) { + // Read the YAML file + filePath := "./test.yaml" + patternsConfig := &config.PatternsConfig{ + FilePath: filePath, + } + err := patternsConfig.Parse() + assert.Equal(t, err == nil, true) + + err = patternsConfig.Validate() + assert.Equal(t, err != nil, true) // PdAddrs is empty + + for _, pattern := range patternsConfig.Plans { + if pattern.GetRawKVConfig() == nil { + assert.Equal(t, pattern.GetTxnKVConfig() == nil, false) + } else { + assert.Equal(t, pattern.GetRawKVConfig() == nil, false) + } + workloads := pattern.GetWorkloads() + assert.Equal(t, len(workloads) > 0, true) + } +} diff --git a/examples/benchtool/workloads/patterns/template.yaml b/examples/benchtool/workloads/patterns/template.yaml new file mode 100644 index 0000000000..be926a9d47 --- /dev/null +++ b/examples/benchtool/workloads/patterns/template.yaml @@ -0,0 +1,15 @@ +patterns: + - name: # sub pattern name 1 + workload_type: # workload type, [rawkv | txnkv] + workloads: # list of workloads [batch_get | batch_write | get | put | scan | update | begin | commit | rollback] + - # workload name 1 + - # workload name 2 + mode: # transaction mode, only valid for txnkv, [async_commit | 1PC | 2PC] + lock: # lock type, only valid for txnkv, [optimistic | pessimistic] + key_prefix: # prefix of the key, default is ${name} + count: # number of operations + key_size: # size of the key + value_size: # size of the value + threads: # number of threads + random: # whether to use random key, default is false + - name: # sub pattern name 2 \ No newline at end of file diff --git a/examples/benchtool/workloads/patterns/test.yaml b/examples/benchtool/workloads/patterns/test.yaml new file mode 100644 index 0000000000..089b9ad53b --- /dev/null +++ b/examples/benchtool/workloads/patterns/test.yaml @@ -0,0 +1,29 @@ +patterns: + - name: codecov + workload_type: rawkv + workloads: + - batch_get + - batch_write + - get + - put + - scan + - update + key_prefix: "codecov" # default is ${name} + count: 10000 + key_size: 1024 + value_size: 1024 + threads: 10 + random: true # default is false + - name: txn + workload_type: txnkv + workloads: + - begin + - write + - commit + mode: 2PC # (async_commit | 1PC | 2PC) + lock: pessimistic # (optimistic | pessimistic) + count: 10000 + key_size: 1024 + value_size: 1024 + threads: 10 + random: true # default is false \ No newline at end of file diff --git a/examples/benchtool/workloads/rawkv/raw.go b/examples/benchtool/workloads/rawkv/raw.go new file mode 100644 index 0000000000..5bc271d9ea --- /dev/null +++ b/examples/benchtool/workloads/rawkv/raw.go @@ -0,0 +1,356 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rawkv + +import ( + "benchtool/config" + "benchtool/utils" + "benchtool/utils/statistics" + "benchtool/workloads" + "context" + "fmt" + "sync" + "time" + + "github.com/pingcap/log" + "github.com/spf13/cobra" + "github.com/tikv/client-go/v2/rawkv" + "go.uber.org/zap" +) + +func isReadCommand(cmd string) bool { + return cmd == config.RawKVCommandTypeGet || cmd == config.RawKVCommandTypeBatchGet +} + +func getRawKvConfig(ctx context.Context) *config.RawKVConfig { + c := ctx.Value(config.WorkloadTypeRawKV).(*config.RawKVConfig) + return c +} + +func convertCfName(cf string) string { + switch cf { + case "default": + return config.WorkloadColumnFamilyDefault + case "write": + case "lock": + fmt.Printf("Column family %s is not supported, use default instead\n", cf) + return config.WorkloadColumnFamilyDefault + default: + return cf + } + return config.WorkloadColumnFamilyDefault +} + +func execRawKV(cmd string) { + if cmd == "" { + return + } + rawKVConfig := getRawKvConfig(workloads.GlobalContext) + + var workload *WorkloadImpl + var err error + if workload, err = NewRawKVWorkload(rawKVConfig); err != nil { + fmt.Printf("create RawKV workload failed: %v\n", err) + return + } + + timeoutCtx, cancel := context.WithTimeout(workloads.GlobalContext, rawKVConfig.Global.TotalTime) + workloads.GlobalContext = timeoutCtx + defer cancel() + + workload.Execute(cmd) + fmt.Println("RawKV workload finished") + workload.OutputStats(true) +} + +// Register registers the workload to the command line parser +func Register(command *config.CommandLineParser) *config.RawKVConfig { + if command == nil { + return nil + } + rawKVConfig := &config.RawKVConfig{ + Global: command.GetConfig(), + } + + cmd := &cobra.Command{ + Use: config.WorkloadTypeRawKV, + PersistentPreRun: func(cmd *cobra.Command, args []string) { + if err := rawKVConfig.Global.InitLogger(); err != nil { + log.Error("InitLogger failed", zap.Error(err)) + } + rawKVConfig.ColumnFamily = convertCfName(rawKVConfig.ColumnFamily) + workloads.GlobalContext = context.WithValue(workloads.GlobalContext, config.WorkloadTypeRawKV, rawKVConfig) + }, + } + cmd.PersistentFlags().StringVar(&rawKVConfig.ColumnFamily, "cf", "default", "Column family name (default|write|lock)") + cmd.PersistentFlags().StringVar(&rawKVConfig.CommandType, "cmd", "put", "Type of command to execute (put|get|del|batch_put|batch_get|batch_del|scan|reserve_scan|cas)") + cmd.PersistentFlags().IntVar(&rawKVConfig.KeySize, "key-size", 1, "Size of key in bytes") + cmd.PersistentFlags().IntVar(&rawKVConfig.ValueSize, "value-size", 1, "Size of value in bytes") + cmd.PersistentFlags().IntVar(&rawKVConfig.BatchSize, "batch-size", 1, "Size of batch for batch operations") + cmd.PersistentFlags().BoolVar(&rawKVConfig.Randomize, "random", false, "Whether to randomize each value") + + var cmdPrepare = &cobra.Command{ + Use: "prepare", + Short: "Prepare data for RawKV workload", + Run: func(cmd *cobra.Command, _ []string) { + execRawKV("prepare") + }, + } + cmdPrepare.PersistentFlags().IntVar(&rawKVConfig.PrepareRetryCount, "retry-count", 50, "Retry count when errors occur") + cmdPrepare.PersistentFlags().DurationVar(&rawKVConfig.PrepareRetryInterval, "retry-interval", 10*time.Millisecond, "The interval for each retry") + + var cmdRun = &cobra.Command{ + Use: "run", + Short: "Run workload", + Run: func(cmd *cobra.Command, _ []string) { + execRawKV("run") + }, + } + + var cmdCleanup = &cobra.Command{ + Use: "cleanup", + Short: "Cleanup data for the workload", + Run: func(cmd *cobra.Command, _ []string) { + execRawKV("cleanup") + }, + } + + var cmdCheck = &cobra.Command{ + Use: "check", + Short: "Check data consistency for the workload", + Run: func(cmd *cobra.Command, _ []string) { + execRawKV("check") + }, + } + + cmd.AddCommand(cmdRun, cmdPrepare, cmdCleanup, cmdCheck) + + command.GetCommand().AddCommand(cmd) + + return rawKVConfig +} + +func RunRawKVCommand(ctx context.Context, client *rawkv.Client, commandType string, keySize int, valueSize int, batchSize int, randomize bool, stats *statistics.PerfProfile, ignoreErr bool) error { + // For unary operations. + key := config.RawKVCommandDefaultKey + val := config.RawKVCommandDefaultValue + + // For batch operations. + var ( + keys [][]byte + vals [][]byte + err error + ) + switch commandType { + case config.RawKVCommandTypePut, config.RawKVCommandTypeGet, config.RawKVCommandTypeDel, config.RawKVCommandTypeCAS, config.RawKVCommandTypeScan, config.RawKVCommandTypeReverseScan: + if randomize { + key = utils.GenRandomStr(config.RawKVCommandDefaultKey, keySize) + if !isReadCommand(commandType) { + val = utils.GenRandomStr(config.RawKVCommandDefaultValue, valueSize) + } + } + case config.RawKVCommandTypeBatchPut, config.RawKVCommandTypeBatchGet, config.RawKVCommandTypeBatchDel: + if randomize { + keys = utils.GenRandomByteArrs(config.RawKVCommandDefaultKey, keySize, batchSize) + if !isReadCommand(commandType) { + vals = utils.GenRandomByteArrs(config.RawKVCommandDefaultValue, valueSize, batchSize) + } + } + } + + start := time.Now() + switch commandType { + case config.RawKVCommandTypePut: + err = client.Put(ctx, []byte(key), []byte(val)) + case config.RawKVCommandTypeGet: + _, err = client.Get(ctx, []byte(key)) + case config.RawKVCommandTypeDel: + err = client.Delete(ctx, []byte(key)) + case config.RawKVCommandTypeBatchPut: + err = client.BatchPut(ctx, keys, vals) + case config.RawKVCommandTypeBatchGet: + _, err = client.BatchGet(ctx, keys) + case config.RawKVCommandTypeBatchDel: + err = client.BatchDelete(ctx, keys) + case config.RawKVCommandTypeCAS: + var oldVal []byte + oldVal, _ = client.Get(ctx, []byte(key)) + _, _, err = client.CompareAndSwap(ctx, []byte(key), []byte(oldVal), []byte(val)) // Experimental + case config.RawKVCommandTypeScan: + _, _, err = client.Scan(ctx, []byte(key), []byte(config.RawKVCommandDefaultEndKey), batchSize) + case config.RawKVCommandTypeReverseScan: + _, _, err = client.ReverseScan(ctx, []byte(key), []byte(config.RawKVCommandDefaultKey), batchSize) + } + if err != nil && !ignoreErr { + return fmt.Errorf("execute %s failed: %v", commandType, err) + } + stats.Record(commandType, time.Since(start)) + return nil +} + +type WorkloadImpl struct { + cfg *config.RawKVConfig + clients []*rawkv.Client + + wait sync.WaitGroup + + stats *statistics.PerfProfile +} + +func NewRawKVWorkload(cfg *config.RawKVConfig) (*WorkloadImpl, error) { + if err := cfg.Validate(); err != nil { + return nil, err + } + w := &WorkloadImpl{ + cfg: cfg, + stats: statistics.NewPerfProfile(), + } + + w.clients = make([]*rawkv.Client, 0, w.cfg.Global.Threads) + for i := 0; i < w.cfg.Global.Threads; i++ { + client, err := rawkv.NewClient(workloads.GlobalContext, w.cfg.Global.Targets, w.cfg.Global.Security) + if err != nil { + return nil, err + } + w.clients = append(w.clients, client) + } + return w, nil +} + +func (w *WorkloadImpl) Name() string { + return config.WorkloadTypeRawKV +} + +func (w *WorkloadImpl) isValid() bool { + return w.cfg != nil && w.cfg.Global != nil && len(w.clients) > 0 +} + +func (w *WorkloadImpl) isValidThread(threadID int) bool { + return w.isValid() && threadID < len(w.clients) +} + +// InitThread implements WorkloadInterface +func (w *WorkloadImpl) InitThread(ctx context.Context, threadID int) error { + // Nothing to do + if !w.isValidThread(threadID) { + return fmt.Errorf("no valid RawKV clients") + } + client := w.clients[threadID] + client.SetAtomicForCAS(w.cfg.CommandType == config.RawKVCommandTypeCAS) + client.SetColumnFamily(w.cfg.ColumnFamily) + return nil +} + +// CleanupThread implements WorkloadInterface +func (w *WorkloadImpl) CleanupThread(ctx context.Context, threadID int) { + if w.isValidThread(threadID) { + client := w.clients[threadID] + if client != nil { + client.Close() + } + } +} + +// Prepare implements WorkloadInterface +func (w *WorkloadImpl) Prepare(ctx context.Context, threadID int) error { + if !w.isValidThread(threadID) { + return fmt.Errorf("no valid RawKV clients") + } + + // return prepareWorkloadImpl(ctx, w, w.cfg.Threads, w.cfg.Warehouses, threadID) + // TODO: add prepare stage + return nil +} + +// CheckPrepare implements WorkloadInterface +func (w *WorkloadImpl) CheckPrepare(ctx context.Context, threadID int) error { + return nil +} + +func (w *WorkloadImpl) Run(ctx context.Context, threadID int) error { + if !w.isValidThread(threadID) { + return fmt.Errorf("no valid RawKV clients") + } + return RunRawKVCommand(ctx, w.clients[threadID], w.cfg.CommandType, w.cfg.KeySize, w.cfg.ValueSize, w.cfg.BatchSize, w.cfg.Randomize, w.stats, w.cfg.Global.IgnoreError) +} + +// Check implements WorkloadInterface +func (w *WorkloadImpl) Check(ctx context.Context, threadID int) error { + if !w.isValidThread(threadID) { + return fmt.Errorf("no valid RawKV clients") + } + if threadID == 0 { + client := w.clients[threadID] + checksum, err := client.Checksum(ctx, []byte(config.RawKVCommandDefaultKey), []byte(config.RawKVCommandDefaultEndKey)) + if err != nil { + return nil + } else { + fmt.Printf("RawKV checksum: %d\n", checksum) + } + } + return nil +} + +// Cleanup implements WorkloadInterface +func (w *WorkloadImpl) Cleanup(ctx context.Context, threadID int) error { + if !w.isValidThread(threadID) { + return fmt.Errorf("no valid RawKV clients") + } + if threadID == 0 { + client := w.clients[threadID] + client.DeleteRange(ctx, []byte(config.RawKVCommandDefaultKey), []byte(config.RawKVCommandDefaultEndKey)) // delete all keys + } + return nil +} + +func (w *WorkloadImpl) OutputStats(ifSummaryReport bool) { + w.stats.PrintFmt(ifSummaryReport, w.cfg.Global.OutputStyle, statistics.HistogramOutputFunc) +} + +func (w *WorkloadImpl) Execute(cmd string) { + w.wait.Add(w.cfg.Global.Threads) + + ctx, cancel := context.WithCancel(workloads.GlobalContext) + ch := make(chan struct{}, 1) + go func() { + ticker := time.NewTicker(w.cfg.Global.OutputInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + ch <- struct{}{} + return + case <-ticker.C: + w.OutputStats(false) + } + } + }() + + count := w.cfg.Global.TotalCount / w.cfg.Global.Threads + for i := 0; i < w.cfg.Global.Threads; i++ { + go func(index int) { + defer w.wait.Done() + if err := workloads.DispatchExecution(ctx, w, cmd, count, index, w.cfg.Global.Silence, w.cfg.Global.IgnoreError); err != nil { + fmt.Printf("[%s] execute %s failed, err %v\n", time.Now().Format("2006-01-02 15:04:05"), cmd, err) + return + } + }(i) + } + + w.wait.Wait() + cancel() + <-ch +} diff --git a/examples/benchtool/workloads/txnkv/txn.go b/examples/benchtool/workloads/txnkv/txn.go new file mode 100644 index 0000000000..3555a08fc5 --- /dev/null +++ b/examples/benchtool/workloads/txnkv/txn.go @@ -0,0 +1,344 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package txnkv + +import ( + "benchtool/config" + "benchtool/utils" + "benchtool/utils/statistics" + "benchtool/workloads" + "context" + "fmt" + "math/rand" + "sync" + "time" + + "github.com/pingcap/log" + "github.com/spf13/cobra" + clientConfig "github.com/tikv/client-go/v2/config" + tikverr "github.com/tikv/client-go/v2/error" + clientTxnKV "github.com/tikv/client-go/v2/txnkv" + "go.uber.org/zap" +) + +func getTxnKVConfig(ctx context.Context) *config.TxnKVConfig { + c := ctx.Value(config.WorkloadTypeTxnKV).(*config.TxnKVConfig) + return c +} + +// Assistants for TxnKV workload +func prepareLockKeyWithTimeout(ctx context.Context, txn *clientTxnKV.KVTxn, key []byte, timeout int64) error { + if timeout > 0 { + return txn.LockKeysWithWaitTime(ctx, timeout, key) + } + return nil +} + +func execTxnKV(cmd string) { + if cmd == "" { + return + } + TxnKVConfig := getTxnKVConfig(workloads.GlobalContext) + + var workload *WorkloadImpl + var err error + if workload, err = NewTxnKVWorkload(TxnKVConfig); err != nil { + fmt.Printf("create TxnKV workload failed: %v\n", err) + return + } + + timeoutCtx, cancel := context.WithTimeout(workloads.GlobalContext, TxnKVConfig.Global.TotalTime) + workloads.GlobalContext = timeoutCtx + defer cancel() + + workload.Execute(cmd) + fmt.Println("TxnKV workload finished") + workload.OutputStats(true) +} + +// Register registers the workload to the command line parser +func Register(command *config.CommandLineParser) *config.TxnKVConfig { + if command == nil { + return nil + } + txnKVConfig := &config.TxnKVConfig{ + Global: command.GetConfig(), + ReadWriteRatio: utils.NewReadWriteRatio("1:1"), // TODO: generate workloads meeting the read-write ratio + } + + cmd := &cobra.Command{ + Use: config.WorkloadTypeTxnKV, + PersistentPreRun: func(cmd *cobra.Command, args []string) { + if err := txnKVConfig.Global.InitLogger(); err != nil { + log.Error("InitLogger failed", zap.Error(err)) + } + workloads.GlobalContext = context.WithValue(workloads.GlobalContext, config.WorkloadTypeTxnKV, txnKVConfig) + }, + } + cmd.PersistentFlags().StringVar(&txnKVConfig.ReadWriteRatio.Ratio, "read-write-ratio", "1:1", "Read write ratio") + cmd.PersistentFlags().IntVar(&txnKVConfig.KeySize, "key-size", 1, "Size of key in bytes") + cmd.PersistentFlags().IntVar(&txnKVConfig.ValueSize, "value-size", 1, "Size of value in bytes") + cmd.PersistentFlags().IntVar(&txnKVConfig.ColumnSize, "column-size", 1, "Size of column") + cmd.PersistentFlags().IntVar(&txnKVConfig.TxnSize, "txn-size", 1, "Size of transaction (normally, the lines of kv pairs)") + cmd.PersistentFlags().StringVar(&txnKVConfig.TxnMode, "txn-mode", "2PC", "Mode of transaction (2PC/1PC/async-commit)") + cmd.PersistentFlags().IntVar(&txnKVConfig.LockTimeout, "lock-timeout", 0, "Lock timeout for each key in txn (>0 means pessimistic mode, 0 means optimistic mode)") + // TODO: add more flags on txn, such as pessimistic/optimistic lock, etc. + + var cmdPrepare = &cobra.Command{ + Use: "prepare", + Short: "Prepare data for TxnKV workload", + Run: func(cmd *cobra.Command, _ []string) { + execTxnKV("prepare") + }, + } + cmdPrepare.PersistentFlags().IntVar(&txnKVConfig.PrepareRetryCount, "retry-count", 50, "Retry count when errors occur") + cmdPrepare.PersistentFlags().DurationVar(&txnKVConfig.PrepareRetryInterval, "retry-interval", 10*time.Millisecond, "The interval for each retry") + + var cmdRun = &cobra.Command{ + Use: "run", + Short: "Run workload", + Run: func(cmd *cobra.Command, _ []string) { + execTxnKV("run") + }, + } + + var cmdCleanup = &cobra.Command{ + Use: "cleanup", + Short: "Cleanup data for the workload", + Run: func(cmd *cobra.Command, _ []string) { + execTxnKV("cleanup") + }, + } + + var cmdCheck = &cobra.Command{ + Use: "check", + Short: "Check data consistency for the workload", + Run: func(cmd *cobra.Command, _ []string) { + execTxnKV("check") + }, + } + + cmd.AddCommand(cmdRun, cmdPrepare, cmdCleanup, cmdCheck) + + command.GetCommand().AddCommand(cmd) + + return txnKVConfig +} + +// Workload is the implementation of WorkloadInterface +type WorkloadImpl struct { + cfg *config.TxnKVConfig + clients []*clientTxnKV.Client + + wait sync.WaitGroup + + stats *statistics.PerfProfile +} + +func NewTxnKVWorkload(cfg *config.TxnKVConfig) (*WorkloadImpl, error) { + if err := cfg.Validate(); err != nil { + return nil, err + } + w := &WorkloadImpl{ + cfg: cfg, + stats: statistics.NewPerfProfile(), + } + + clientConfig.UpdateGlobal(func(conf *clientConfig.Config) { + conf.TiKVClient.MaxBatchSize = (uint)(cfg.TxnSize + 10) + }) + // TODO: setting batch. + // defer config.UpdateGlobal(func(conf *config.Config) { + // conf.TiKVClient.MaxBatchSize = 0 + // conf.TiKVClient.GrpcConnectionCount = 1 + // })() + + w.clients = make([]*clientTxnKV.Client, 0, w.cfg.Global.Threads) + for i := 0; i < w.cfg.Global.Threads; i++ { + client, err := clientTxnKV.NewClient(w.cfg.Global.Targets) + if err != nil { + return nil, err + } + w.clients = append(w.clients, client) + } + return w, nil +} + +func (w *WorkloadImpl) Name() string { + return config.WorkloadTypeTxnKV +} + +func (w *WorkloadImpl) isValid() bool { + return w.cfg != nil && w.cfg.Global != nil && len(w.clients) > 0 +} + +func (w *WorkloadImpl) isValidThread(threadID int) bool { + return w.isValid() && threadID < len(w.clients) +} + +// InitThread implements WorkloadInterface +func (w *WorkloadImpl) InitThread(ctx context.Context, threadID int) error { + // Nothing to do + return nil +} + +// CleanupThread implements WorkloadInterface +func (w *WorkloadImpl) CleanupThread(ctx context.Context, threadID int) { + if w.isValidThread(threadID) { + client := w.clients[threadID] + if client != nil { + client.Close() + } + } +} + +// Prepare implements WorkloadInterface +func (w *WorkloadImpl) Prepare(ctx context.Context, threadID int) error { + if !w.isValidThread(threadID) { + return fmt.Errorf("no valid TxnKV clients") + } + + // return prepareWorkloadImpl(ctx, w, w.cfg.Threads, w.cfg.Warehouses, threadID) + // TODO: add prepare stage + return nil +} + +// CheckPrepare implements WorkloadInterface +func (w *WorkloadImpl) CheckPrepare(ctx context.Context, threadID int) error { + return nil +} + +func (w *WorkloadImpl) Run(ctx context.Context, threadID int) error { + if !w.isValidThread(threadID) { + return fmt.Errorf("no valid TxnKV clients") + } + + client := w.clients[threadID] + key := config.TxnKVCommandDefaultKey + val := utils.GenRandomStr(config.TxnKVCommandDefaultValue, w.cfg.ValueSize) + lockTimeout := int64(w.cfg.LockTimeout) + + // Constructs the txn client and sets the txn mode + txn, err := client.Begin() + if err != nil { + return fmt.Errorf("txn begin failed, err %v", err) + } + switch w.cfg.TxnMode { + case config.TxnKVMode1PC: + txn.SetEnable1PC(true) + case config.TxnKVModeAsyncCommit: + txn.SetEnableAsyncCommit(true) + } + + // Default is optimistic lock mode. + txn.SetPessimistic(lockTimeout > 0) + + sum := w.cfg.TxnSize * w.cfg.ColumnSize + readCount := sum * w.cfg.ReadWriteRatio.GetPercent(utils.ReadPercent) / 100 + writeCount := sum - readCount + canRead := func(sum, readCount, writeCount int) bool { + return readCount > 0 && (writeCount <= 0 || rand.Intn(sum)/2 == 0) + } + + for row := 0; row < w.cfg.TxnSize; row++ { + key = fmt.Sprintf("%s@col_", utils.GenRandomStr(key, w.cfg.KeySize)) + // Lock the key with timeout if necessary. + if err = prepareLockKeyWithTimeout(ctx, txn, []byte(key), lockTimeout); err != nil { + fmt.Printf("txn lock key failed, err %v", err) + continue + } + for col := 0; col < w.cfg.ColumnSize; col++ { + colKey := fmt.Sprintf("%s%d", key, col) + if canRead(sum, readCount, writeCount) { + _, err = txn.Get(ctx, []byte(colKey)) + if tikverr.IsErrNotFound(err) { + err = txn.Set([]byte(colKey), []byte(val)) + writeCount -= 1 + } + readCount -= 1 + } else { + err = txn.Set([]byte(colKey), []byte(val)) + writeCount -= 1 + } + if err != nil { + return fmt.Errorf("txn set / get failed, err %v", err) + } + } + } + start := time.Now() + err = txn.Commit(ctx) + if err != nil { + return fmt.Errorf("txn commit failed, err %v", err) + } + w.stats.Record(w.cfg.TxnMode, time.Since(start)) + return nil +} + +// Check implements WorkloadInterface +func (w *WorkloadImpl) Check(ctx context.Context, threadID int) error { + return nil +} + +// Cleanup implements WorkloadInterface +func (w *WorkloadImpl) Cleanup(ctx context.Context, threadID int) error { + if !w.isValidThread(threadID) { + return fmt.Errorf("no valid TxnKV clients") + } + if threadID == 0 { + client := w.clients[threadID] + client.DeleteRange(ctx, []byte(config.TxnKVCommandDefaultKey), []byte(config.TxnKVCommandDefaultEndKey), w.cfg.Global.Threads) // delete all keys + } + return nil +} + +func (w *WorkloadImpl) OutputStats(ifSummaryReport bool) { + w.stats.PrintFmt(ifSummaryReport, w.cfg.Global.OutputStyle, statistics.HistogramOutputFunc) +} + +func (w *WorkloadImpl) Execute(cmd string) { + w.wait.Add(w.cfg.Global.Threads) + + ctx, cancel := context.WithCancel(workloads.GlobalContext) + ch := make(chan struct{}, 1) + go func() { + ticker := time.NewTicker(w.cfg.Global.OutputInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + ch <- struct{}{} + return + case <-ticker.C: + w.OutputStats(false) + } + } + }() + + count := w.cfg.Global.TotalCount / w.cfg.Global.Threads + for i := 0; i < w.cfg.Global.Threads; i++ { + go func(index int) { + defer w.wait.Done() + if err := workloads.DispatchExecution(ctx, w, cmd, count, index, w.cfg.Global.Silence, w.cfg.Global.IgnoreError); err != nil { + fmt.Printf("[%s] execute %s failed, err %v\n", time.Now().Format("2006-01-02 15:04:05"), cmd, err) + return + } + }(i) + } + + w.wait.Wait() + cancel() + <-ch +}