From 2d73331ac18d3382abb36625122f464c22fd2931 Mon Sep 17 00:00:00 2001 From: SJC <88550136+sjcsjc123@users.noreply.github.com> Date: Fri, 22 Dec 2023 07:48:54 +0800 Subject: [PATCH] feat: add big key tool (#2195) * feat: add big key tool Signed-off-by: sjcsjc123 <1401189096@qq.com> * modify heap to maxHeap Signed-off-by: sjcsjc123 <1401189096@qq.com> * modify comment Signed-off-by: sjcsjc123 <1401189096@qq.com> * add ci test Signed-off-by: sjcsjc123 <1401189096@qq.com> * add compress and decompress Signed-off-by: sjcsjc123 <1401189096@qq.com> * fix ci Signed-off-by: sjcsjc123 <1401189096@qq.com> * fix ci Signed-off-by: sjcsjc123 <1401189096@qq.com> * fix ci Signed-off-by: sjcsjc123 <1401189096@qq.com> * fix ci Signed-off-by: sjcsjc123 <1401189096@qq.com> * fix ci Signed-off-by: sjcsjc123 <1401189096@qq.com> --------- Signed-off-by: sjcsjc123 <1401189096@qq.com> --- .github/workflows/pika.yml | 12 +- tools/pika_keys_analysis/app.go | 138 ++++++ tools/pika_keys_analysis/cli/Readme.md | 39 ++ tools/pika_keys_analysis/cli/config.yaml | 21 + tools/pika_keys_analysis/cli/main.go | 24 + tools/pika_keys_analysis/compress.go | 72 +++ tools/pika_keys_analysis/config.go | 56 +++ tools/pika_keys_analysis/go.mod | 28 ++ tools/pika_keys_analysis/go.sum | 93 ++++ tools/pika_keys_analysis/max_heap.go | 80 ++++ tools/pika_keys_analysis/max_heap_test.go | 33 ++ tools/pika_keys_analysis/pika.go | 520 ++++++++++++++++++++++ tools/pika_keys_analysis/pika_test.go | 148 ++++++ 13 files changed, 1261 insertions(+), 3 deletions(-) create mode 100644 tools/pika_keys_analysis/app.go create mode 100644 tools/pika_keys_analysis/cli/Readme.md create mode 100644 tools/pika_keys_analysis/cli/config.yaml create mode 100644 tools/pika_keys_analysis/cli/main.go create mode 100644 tools/pika_keys_analysis/compress.go create mode 100644 tools/pika_keys_analysis/config.go create mode 100644 tools/pika_keys_analysis/go.mod create mode 100644 tools/pika_keys_analysis/go.sum create mode 100644 tools/pika_keys_analysis/max_heap.go create mode 100644 tools/pika_keys_analysis/max_heap_test.go create mode 100644 tools/pika_keys_analysis/pika.go create mode 100644 tools/pika_keys_analysis/pika_test.go diff --git a/.github/workflows/pika.yml b/.github/workflows/pika.yml index 8541f15c42..6e817fdd3d 100644 --- a/.github/workflows/pika.yml +++ b/.github/workflows/pika.yml @@ -84,7 +84,9 @@ jobs: - name: Run Go E2E Tests working-directory: ${{ github.workspace }}/build run: | - cd ../tests/integration/ + cd ../tools/pika_keys_analysis/ + go test -v ./... + cd ../../tests/integration/ chmod +x integrate_test.sh sh integrate_test.sh @@ -152,7 +154,9 @@ jobs: - name: Run Go E2E Tests working-directory: ${{ github.workspace }}/build run: | - cd ../tests/integration/ + cd ../tools/pika_keys_analysis/ + go test -v ./... + cd ../../tests/integration/ chmod +x integrate_test.sh sh integrate_test.sh @@ -210,7 +214,9 @@ jobs: - name: Run Go E2E Tests working-directory: ${{ github.workspace }}/build run: | - cd ../tests/integration/ + cd ../tools/pika_keys_analysis/ + go test -v ./... + cd ../../tests/integration/ chmod +x integrate_test.sh sh integrate_test.sh diff --git a/tools/pika_keys_analysis/app.go b/tools/pika_keys_analysis/app.go new file mode 100644 index 0000000000..a078b84172 --- /dev/null +++ b/tools/pika_keys_analysis/app.go @@ -0,0 +1,138 @@ +package pika_keys_analysis + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/desertbit/grumble" + "github.com/fatih/color" +) + +var App = grumble.New(&grumble.Config{ + Name: "pika_keys_analysis", + Description: "A tool for analyzing keys in Pika", + HistoryFile: "/tmp/.pika_keys_analysis_history", + Prompt: "pika_keys_analysis > ", + HistoryLimit: 100, + ErrorColor: color.New(color.FgRed, color.Bold, color.Faint), + HelpHeadlineColor: color.New(color.FgGreen), + HelpHeadlineUnderline: false, + HelpSubCommands: true, + PromptColor: color.New(color.FgBlue, color.Bold), + Flags: func(f *grumble.Flags) {}, +}) + +func init() { + App.OnInit(func(a *grumble.App, fm grumble.FlagMap) error { + return nil + }) + App.SetPrintASCIILogo(func(a *grumble.App) { + fmt.Println(strings.Join([]string{` + ............. .... ..... ..... ..... + ################# #### ##### ##### ####### + #### ##### #### ##### ##### ######### + #### ##### #### ##### ##### #### ##### + #### ##### #### ##### ##### #### ##### + ################ #### ##### ##### #### ##### + #### #### ##### ##### ################# + #### #### ##### ###### ##### ##### + #### #### ##### ###### ##### ##### +`}, "\r\n")) + }) + register(App) +} + +func register(app *grumble.App) { + app.AddCommand(&grumble.Command{ + Name: "bigKey", + Help: "list the big keys", + LongHelp: "list the big keys", + Run: func(c *grumble.Context) error { + listBigKeys, err := PikaInstance.ListBigKeysByScan(context.Background()) + if err != nil { + return err + } + start := time.Now() + for keyType, data := range listBigKeys { + fmt.Printf("Type: %s, Head: %d\n", keyType, Head) + if len(data.GetTopN(Head)) == 0 { + fmt.Println("No big key found") + } + for _, v := range data.GetTopN(Head) { + fmt.Printf("Key : %s, Size: %d, From: %s\n", v.Key, v.UsedSize, v.Client) + } + } + end := time.Now() + if PrintKeyNum { + fmt.Println("Total Key Number:", PikaInstance.GetTotalKeyNumber()) + } + fmt.Println("Cost Time:", end.Sub(start)) + return nil + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "apply", + Help: "Apply the settings to Pika", + LongHelp: "Apply the settings to Pika", + Args: func(a *grumble.Args) { + a.String("filename", "The configuration file") + }, + Run: func(c *grumble.Context) error { + filename := c.Args.String("filename") + return Init(filename) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "compress", + Help: "Compress the big keys", + LongHelp: "Compress the big keys and store them to pika", + Args: func(a *grumble.Args) { + a.String("key", "The key to compress") + }, + Run: func(c *grumble.Context) error { + key := c.Args.String("key") + return PikaInstance.CompressKey(context.Background(), key) + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "decompress", + Help: "Decompress the big keys", + LongHelp: "Decompress the big keys and store them to pika", + Args: func(a *grumble.Args) { + a.String("key", "The key to decompress") + }, + Flags: func(f *grumble.Flags) { + f.Bool("s", "save", false, "Save the decompressed value to pika") + }, + Run: func(c *grumble.Context) error { + key := c.Args.String("key") + save := c.Flags.Bool("save") + decompressKey, err := PikaInstance.DecompressKey(context.Background(), key, save) + if err != nil { + return err + } + fmt.Printf("Key: %s, Decompress: %s\n", key, decompressKey) + return nil + }, + }) + + app.AddCommand(&grumble.Command{ + Name: "recover", + Help: "Recover the big keys", + LongHelp: "Recover the big keys and store them to pika", + Args: func(a *grumble.Args) { + a.String("key", "The key to recover") + a.String("newKey", "The new key to store the recovered value") + }, + Run: func(c *grumble.Context) error { + key := c.Args.String("key") + newKey := c.Args.String("newKey") + return PikaInstance.RecoverKey(context.Background(), key, newKey) + }, + }) +} diff --git a/tools/pika_keys_analysis/cli/Readme.md b/tools/pika_keys_analysis/cli/Readme.md new file mode 100644 index 0000000000..ebd37edcad --- /dev/null +++ b/tools/pika_keys_analysis/cli/Readme.md @@ -0,0 +1,39 @@ +# What is this? +This is a tool to analyze the keys of a pika cluster. +# How to use? +## 1. Install +```shell +go build -o pika_keys_analysis main.go +``` +## 2. Start +```shell +./pika_keys_analysis config.yaml +``` +## 3. List big keys +```shell +bigKey +``` +## 4. Apply Config +```shell +apply config.yaml +``` +## 5. Compress Key +```shell +compress +``` +## 6. Decompress Key +- not save to pika +```shell +decompress +``` +- save to pika +```shell +decompress -s +``` +## 7. Recover Key +```shell +recover +``` +# Notice + +When using compression and decompression functions, errors in operation may cause duplicate compression or decompression, and the files used for recovery may be overwritten. If they are overwritten, the decompress command can be used to reach a state where decompression cannot continue, and then continue to compress to use the recover command normally \ No newline at end of file diff --git a/tools/pika_keys_analysis/cli/config.yaml b/tools/pika_keys_analysis/cli/config.yaml new file mode 100644 index 0000000000..e8dfca3588 --- /dev/null +++ b/tools/pika_keys_analysis/cli/config.yaml @@ -0,0 +1,21 @@ +pika: + - addr: 127.0.0.1:9221 + db: 0 + password: "" + + - addr: 127.0.0.1:9221 + db: 1 + password: "" + +scan-size: 1000 # scan size per time +concurrency: 1000 # goroutine num +head: 30 # show top head keys +type: + - string + - hash + - list + - set + - zset +memory: 2000 # Memory limit, unit: MB +print: true # Print key number or not, will use keys command +save: ./save/ # Save dir path, will save key value pairs when compress diff --git a/tools/pika_keys_analysis/cli/main.go b/tools/pika_keys_analysis/cli/main.go new file mode 100644 index 0000000000..449528e2ca --- /dev/null +++ b/tools/pika_keys_analysis/cli/main.go @@ -0,0 +1,24 @@ +package main + +import ( + "fmt" + "os" + "pika/tools/pika_keys_analysis" + + "github.com/desertbit/grumble" +) + +func main() { + if len(os.Args) != 2 { + fmt.Println("Usage: pika_keys_analysis ") + os.Exit(1) + } + err := pika_keys_analysis.Init(os.Args[1]) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + os.Args = os.Args[0:1] + grumble.Main(pika_keys_analysis.App) +} diff --git a/tools/pika_keys_analysis/compress.go b/tools/pika_keys_analysis/compress.go new file mode 100644 index 0000000000..37b9381ce4 --- /dev/null +++ b/tools/pika_keys_analysis/compress.go @@ -0,0 +1,72 @@ +package pika_keys_analysis + +import ( + "bytes" + "compress/gzip" + "io/ioutil" + "os" + "path/filepath" +) + +func compress(data []byte) ([]byte, error) { + var compressedData bytes.Buffer + writer := gzip.NewWriter(&compressedData) + + _, err := writer.Write(data) + if err != nil { + return nil, err + } + + err = writer.Close() + if err != nil { + return nil, err + } + + return compressedData.Bytes(), nil +} + +func decompress(compressedData []byte) ([]byte, error) { + reader, err := gzip.NewReader(bytes.NewReader(compressedData)) + if err != nil { + return nil, err + } + + decompressedData, err := ioutil.ReadAll(reader) + if err != nil { + return nil, err + } + + err = reader.Close() + if err != nil { + return nil, err + } + + return decompressedData, nil +} + +func isCompressed(data []byte) bool { + return len(data) > 2 && data[0] == 0x1f && data[1] == 0x8b +} + +// saveLocal saves the key-value pair to local file system. +func saveLocal(key []byte, value []byte) error { + _, err := os.ReadDir(Save) + if err != nil { + if os.IsNotExist(err) { + err = os.MkdirAll(Save, 0755) + if err != nil { + return err + } + } else { + return err + } + } + filename := filepath.Join(Save, string(key)) + file, err := os.Create(filename) + if err != nil { + return err + } + defer file.Close() + _, err = file.Write(value) + return err +} diff --git a/tools/pika_keys_analysis/config.go b/tools/pika_keys_analysis/config.go new file mode 100644 index 0000000000..40166f2c7b --- /dev/null +++ b/tools/pika_keys_analysis/config.go @@ -0,0 +1,56 @@ +package pika_keys_analysis + +import ( + "os" + + "gopkg.in/yaml.v3" +) + +var ( + PikaInstance *Pika + ScanSize = 1000 + GoroutineNum = 100 + Head = 10 + Type = []string{"string", "hash", "list", "set", "zset"} + MemoryLimit = 1024 * 1024 * 200 + PrintKeyNum = false + Save = "./save/" +) + +type Config struct { + PikaConfig []PikaConfig `yaml:"pika"` + Concurrency int `yaml:"concurrency"` + ScanSize int `yaml:"scan-size"` + Head int `yaml:"head"` + MemoryLimit int `yaml:"memory"` + Type []string `yaml:"type"` + PrintKeyNum bool `yaml:"print"` + Save string `yaml:"save"` +} + +type PikaConfig struct { + Addr string `yaml:"addr"` + Password string `yaml:"password"` + DB int `yaml:"db"` +} + +func Init(filename string) error { + bytes, err := os.ReadFile(filename) + if err != nil { + return err + } + config := Config{} + err = yaml.Unmarshal(bytes, &config) + if err != nil { + return err + } + PikaInstance = NewPika(config.PikaConfig) + ScanSize = config.ScanSize + GoroutineNum = config.Concurrency + Head = config.Head + Type = config.Type + MemoryLimit = config.MemoryLimit * 1024 * 1024 + PrintKeyNum = config.PrintKeyNum + Save = config.Save + return nil +} diff --git a/tools/pika_keys_analysis/go.mod b/tools/pika_keys_analysis/go.mod new file mode 100644 index 0000000000..6b271bffb3 --- /dev/null +++ b/tools/pika_keys_analysis/go.mod @@ -0,0 +1,28 @@ +module pika/tools/pika_keys_analysis + +go 1.19 + +require ( + github.com/desertbit/grumble v1.1.3 + github.com/fatih/color v1.16.0 + github.com/go-redis/redis/v8 v8.11.5 + github.com/google/uuid v1.4.0 + github.com/stretchr/testify v1.8.4 + gopkg.in/yaml.v3 v3.0.1 +) + +require ( + github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/desertbit/closer/v3 v3.1.2 // indirect + github.com/desertbit/columnize v2.1.0+incompatible // indirect + github.com/desertbit/go-shlex v0.1.1 // indirect + github.com/desertbit/readline v1.5.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/hashicorp/go-multierror v1.1.0 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/sys v0.14.0 // indirect +) diff --git a/tools/pika_keys_analysis/go.sum b/tools/pika_keys_analysis/go.sum new file mode 100644 index 0000000000..57a19c5e35 --- /dev/null +++ b/tools/pika_keys_analysis/go.sum @@ -0,0 +1,93 @@ +github.com/Netflix/go-expect v0.0.0-20180615182759-c93bf25de8e8/go.mod h1:oX5x61PbNXchhh0oikYAH+4Pcfw5LKv21+Jnpr6r6Pc= +github.com/Netflix/go-expect v0.0.0-20190729225929-0e00d9168667/go.mod h1:oX5x61PbNXchhh0oikYAH+4Pcfw5LKv21+Jnpr6r6Pc= +github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= +github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/chzyer/logex v1.1.10 h1:Swpa1K6QvQznwJRcfTfQJmTE72DqScAa40E+fbHEXEE= +github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= +github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1 h1:q763qf9huN11kDQavWsoZXJNW3xEE4JJyHa5Q25/sd8= +github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= +github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/desertbit/closer/v3 v3.1.2 h1:a6+2DmwIcNygW04XXWYq+Qp2X9uIk9QbZCP9//qEkb0= +github.com/desertbit/closer/v3 v3.1.2/go.mod h1:AAC4KRd8DC40nwvV967J/kDFhujMEiuwIKQfN0IDxXw= +github.com/desertbit/columnize v2.1.0+incompatible h1:h55rYmdrWoTj7w9aAnCkxzM3C2Eb8zuFa2W41t0o5j0= +github.com/desertbit/columnize v2.1.0+incompatible/go.mod h1:5kPrzQwKbQ8E5D28nvTVPqIBJyj+8jvJzwt6HXZvXgI= +github.com/desertbit/go-shlex v0.1.1 h1:c65HnbgX1QyC6kPL1dMzUpZ4puNUE6ai/eVucWNLNsk= +github.com/desertbit/go-shlex v0.1.1/go.mod h1:Qbb+mJNud5AypgHZ81EL8syOGaWlwvAOTqS7XmWI4pQ= +github.com/desertbit/grumble v1.1.3 h1:gbdgVGWsHmNraJ7Gn6Q4TiUEIHU/UHfbc1KUSbBlgYU= +github.com/desertbit/grumble v1.1.3/go.mod h1:r7j3ShNy5EmOsegRD2DzTutIaGiLiA3M5yBTXXeLwcs= +github.com/desertbit/readline v1.5.1 h1:/wOIZkWYl1s+IvJm/5bOknfUgs6MhS9svRNZpFM53Os= +github.com/desertbit/readline v1.5.1/go.mod h1:pHQgTsCFs9Cpfh5mlSUFi9Xa5kkL4d8L1Jo4UVWzPw0= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= +github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= +github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= +github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568 h1:BHsljHzVlRcyQhjrss6TZTdY2VfCqZPbv5k3iBFa2ZQ= +github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= +github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI= +github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA= +github.com/hinshun/vt10x v0.0.0-20180616224451-1954e6464174/go.mod h1:DqJ97dSdRW1W22yXSB90986pcOyQ7r45iio1KN2ez1A= +github.com/hinshun/vt10x v0.0.0-20180809195222-d55458df857c/go.mod h1:DqJ97dSdRW1W22yXSB90986pcOyQ7r45iio1KN2ez1A= +github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= +github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= +github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= +github.com/nbutton23/zxcvbn-go v0.0.0-20180912185939-ae427f1e4c1d/go.mod h1:o96djdrsSGy3AWPyBgZMAGfxZNfgntdJG+11KU4QvbU= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +golang.org/x/crypto v0.0.0-20190123085648-057139ce5d2b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0= +golang.org/x/sys v0.0.0-20180606202747-9527bec2660b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201009025420-dfb3f7c4e634/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= +golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +gopkg.in/AlecAivazis/survey.v1 v1.8.5/go.mod h1:iBNOmqKz/NUbZx3bA+4hAGLRC7fSK7tgtVDT4tB22XA= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/tools/pika_keys_analysis/max_heap.go b/tools/pika_keys_analysis/max_heap.go new file mode 100644 index 0000000000..fe8affc8ca --- /dev/null +++ b/tools/pika_keys_analysis/max_heap.go @@ -0,0 +1,80 @@ +package pika_keys_analysis + +import ( + "container/heap" + "runtime" + "sync" +) + +var memStats runtime.MemStats + +type KeyWithMemory struct { + Key string + UsedSize int64 + Client string +} + +type MaxHeap struct { + data []KeyWithMemory + mu sync.Mutex +} + +func NewFixedSizeMinHeap() *MaxHeap { + return &MaxHeap{ + data: make([]KeyWithMemory, 0), + mu: sync.Mutex{}, + } +} + +// Add element to heap +func (h *MaxHeap) Add(value KeyWithMemory) { + h.mu.Lock() + defer h.mu.Unlock() + runtime.ReadMemStats(&memStats) + for memStats.Alloc > uint64(MemoryLimit) { + heap.Pop(h) + runtime.ReadMemStats(&memStats) + } + heap.Push(h, value) +} + +// Len implements heap.Interface's Len method +func (h *MaxHeap) Len() int { + return len(h.data) +} + +// Less implements heap.Interface's Less method +func (h *MaxHeap) Less(i, j int) bool { + return h.data[i].UsedSize > h.data[j].UsedSize +} + +// Swap implements heap.Interface's Swap method +func (h *MaxHeap) Swap(i, j int) { + h.data[i], h.data[j] = h.data[j], h.data[i] +} + +// Push implements heap.Interface's Push method +func (h *MaxHeap) Push(x interface{}) { + h.data = append(h.data, x.(KeyWithMemory)) +} + +// Pop implements heap.Interface's Pop method +func (h *MaxHeap) Pop() interface{} { + old := h.data + n := len(old) + x := old[n-1] + h.data = old[0 : n-1] + return x +} + +// GetTopN get top n elements from heap +func (h *MaxHeap) GetTopN(head int) []KeyWithMemory { + res := make([]KeyWithMemory, 0) + for i := 0; i < head; i++ { + if h.Len() == 0 { + break + } + res = append(res, heap.Pop(h).(KeyWithMemory)) + } + return res +} diff --git a/tools/pika_keys_analysis/max_heap_test.go b/tools/pika_keys_analysis/max_heap_test.go new file mode 100644 index 0000000000..325ae96f5f --- /dev/null +++ b/tools/pika_keys_analysis/max_heap_test.go @@ -0,0 +1,33 @@ +package pika_keys_analysis + +import ( + "math/rand" + "sort" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestHeap(t *testing.T) { + keyList := make([]KeyWithMemory, 0) + for i := 0; i < 100; i++ { + keyList = append(keyList, KeyWithMemory{ + Key: "test", + UsedSize: int64(rand.Intn(2000)), + Client: strconv.Itoa(i), + }) + } + maxHeap := NewFixedSizeMinHeap() + for i := 0; i < 100; i++ { + maxHeap.Add(keyList[i]) + } + // data from heap + data := maxHeap.GetTopN(100) + // data from sort + sort.Slice(keyList, func(i, j int) bool { + return keyList[i].UsedSize > keyList[j].UsedSize + }) + // compare + assert.ElementsMatch(t, data, keyList[:100]) +} diff --git a/tools/pika_keys_analysis/pika.go b/tools/pika_keys_analysis/pika.go new file mode 100644 index 0000000000..47a7cf5eaf --- /dev/null +++ b/tools/pika_keys_analysis/pika.go @@ -0,0 +1,520 @@ +package pika_keys_analysis + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + "sync" + + "github.com/go-redis/redis/v8" +) + +// Pika contains all clients +type Pika struct { + clients []*redis.Client +} + +// NewPika create a new pika instance +func NewPika(configs []PikaConfig) *Pika { + pika := &Pika{} + for _, config := range configs { + pika.clients = append(pika.clients, redis.NewClient(&redis.Options{ + Addr: config.Addr, + Password: config.Password, + DB: config.DB, + })) + } + return pika +} + +// SolveSingleClient solve single client +func SolveSingleClient(client *redis.Client, ctx context.Context, wg *sync.WaitGroup, ch chan map[string]*MaxHeap) { + defer wg.Done() + localHeapMap := make(map[string]*MaxHeap) + for k := range Type { + localHeapMap[Type[k]] = NewFixedSizeMinHeap() + } + var cursor int64 = 0 + goroutineCh := make(chan struct{}, GoroutineNum/len(PikaInstance.clients)) + wgClient := sync.WaitGroup{} + startIterator := client.Scan(ctx, 0, "*", 1).Iterator() + endKey := "" + if startIterator.Next(ctx) { + endKey = startIterator.Val() + } else { + fmt.Printf("Current client %s done, waiting for task to finish\n", client) + return + } + start := false + for { + wgClient.Add(1) + go func(curCursor int64) { + defer wgClient.Done() + goroutineCh <- struct{}{} + defer func() { + <-goroutineCh + }() + keys, _, err := client.Scan(ctx, uint64(curCursor), "*", int64(ScanSize)).Result() + if err != nil { + _ = fmt.Errorf("scan error: %s", err) + return + } + for _, key := range keys { + dataType, err := client.Type(ctx, key).Result() + if err != nil { + continue + } + if stringInSlice(dataType, Type) { + var usedSize int64 = 0 + switch dataType { + case "string": + usedSize = client.StrLen(ctx, key).Val() + case "hash": + allMap := client.HGetAll(ctx, key).Val() + for k, v := range allMap { + usedSize += int64(len(k) + len(v)) + } + case "list": + list := client.LRange(ctx, key, 0, -1).Val() + for _, v := range list { + usedSize += int64(len(v)) + } + case "set": + set := client.SMembers(ctx, key).Val() + for _, v := range set { + usedSize += int64(len(v)) + } + case "zset": + zset := client.ZRangeWithScores(ctx, key, 0, -1).Val() + for _, v := range zset { + usedSize += int64(len(v.Member.(string)) + 8) + } + } + localHeapMap[dataType].Add(KeyWithMemory{ + Key: key, + UsedSize: usedSize, + Client: client.String(), + }) + } + } + }(cursor) + keys, _, err := client.Scan(ctx, uint64(cursor), "*", 1).Result() + if err != nil { + _ = fmt.Errorf("scan error: %s", err) + return + } + if start { + if len(keys) == 0 || keys[0] == endKey { + break + } + } + cursor += int64(ScanSize) + start = true + } + fmt.Printf("Current client %s done, waiting for task to finish\n", client) + wgClient.Wait() + ch <- localHeapMap +} + +// ListBigKeysByScan list all keys in all clients +func (pika *Pika) ListBigKeysByScan(ctx context.Context) (map[string]*MaxHeap, error) { + result := make(map[string]*MaxHeap) + wg := sync.WaitGroup{} + resultCh := make(chan map[string]*MaxHeap, len(pika.clients)) + + for _, client := range pika.clients { + wg.Add(1) + go func(c *redis.Client) { + SolveSingleClient(c, ctx, &wg, resultCh) + }(client) + } + + wg.Wait() + close(resultCh) + + for localHeapMap := range resultCh { + for dataType, localHeap := range localHeapMap { + if _, ok := result[dataType]; !ok { + result[dataType] = NewFixedSizeMinHeap() + } + for _, keyWithMemory := range localHeap.data { + result[dataType].Add(keyWithMemory) + } + } + } + + return result, nil +} + +func (pika *Pika) GetTotalKeyNumber() int { + var totalKeyNumber = 0 + for _, client := range pika.clients { + keyNumber := client.Keys(context.Background(), "*").Val() + totalKeyNumber += len(keyNumber) + } + return totalKeyNumber +} + +func (pika *Pika) CompressKey(context context.Context, key string) error { + for _, client := range pika.clients { + exist := client.Exists(context, key).Val() + if exist == 0 { + continue + } + dataType := client.Type(context, key).Val() + switch dataType { + case "string": + val := client.Get(context, key).Val() + err := saveLocal([]byte(key), []byte(val)) + if err != nil { + fmt.Printf("Current client %s cannot save %s to local: %s\n", client, key, err) + continue + } + compressedVal, err := compress([]byte(val)) + if err != nil { + fmt.Printf("Current client %s compress key %s error: %s\n", client, key, err) + continue + } + _, err = client.Set(context, key, compressedVal, 0).Result() + if err != nil { + fmt.Printf("Current client %s compress key %s error: %s\n", client, key, err) + continue + } + fmt.Printf("Original value: %s\n", val) + fmt.Printf("Compressed value: %s\n", compressedVal) + fmt.Printf("Current client %s compress key %s success\n", client, key) + case "hash": + val := client.HGetAll(context, key).Val() + err := saveLocal([]byte(key), []byte(fmt.Sprintf("%v", val))) + if err != nil { + fmt.Printf("Current client %s cannot save %s to local: %s\n", client, key, err) + continue + } + for k, v := range val { + compressedVal, err := compress([]byte(v)) + if err != nil { + fmt.Printf("Current client %s compress key %s error: %s\n", client, key, err) + continue + } + _, err = client.HSet(context, key, k, compressedVal).Result() + if err != nil { + fmt.Printf("Current client %s compress key %s error: %s\n", client, key, err) + continue + } + fmt.Printf("Original value: %s\n", val) + fmt.Printf("Compressed value: %s\n", compressedVal) + } + fmt.Printf("Current client %s compress key %s success\n", client, key) + case "list": + val := client.LRange(context, key, 0, -1).Val() + err := saveLocal([]byte(key), []byte(fmt.Sprintf("%v", val))) + if err != nil { + fmt.Printf("Current client %s cannot save %s to local: %s\n", client, key, err) + continue + } + for _, v := range val { + compressedVal, err := compress([]byte(v)) + if err != nil { + fmt.Printf("Current client %s compress key %s error: %s\n", client, key, err) + continue + } + _, err = client.LPush(context, key, compressedVal).Result() + if err != nil { + fmt.Printf("Current client %s compress key %s error: %s\n", client, key, err) + continue + } + // delete the original value + _, err = client.LRem(context, key, 1, v).Result() + if err != nil { + fmt.Printf("Current client %s compress key %s error: %s\n", client, key, err) + continue + } + fmt.Printf("Original value: %s\n", val) + fmt.Printf("Compressed value: %s\n", compressedVal) + } + fmt.Printf("Current client %s compress key %s success\n", client, key) + case "set": + val := client.SMembers(context, key).Val() + err := saveLocal([]byte(key), []byte(fmt.Sprintf("%v", val))) + if err != nil { + fmt.Printf("Current client %s cannot save %s to local: %s\n", client, key, err) + continue + } + for _, v := range val { + compressedVal, err := compress([]byte(v)) + if err != nil { + fmt.Printf("Current client %s compress key %s error: %s\n", client, key, err) + continue + } + _, err = client.SAdd(context, key, compressedVal).Result() + if err != nil { + fmt.Printf("Current client %s compress key %s error: %s\n", client, key, err) + continue + } + // delete the original value + _, err = client.SRem(context, key, v).Result() + if err != nil { + fmt.Printf("Current client %s compress key %s error: %s\n", client, key, err) + continue + } + fmt.Printf("Original value: %s\n", val) + fmt.Printf("Compressed value: %s\n", compressedVal) + } + fmt.Printf("Current client %s compress key %s success\n", client, key) + case "zset": + val := client.ZRangeWithScores(context, key, 0, -1).Val() + err := saveLocal([]byte(key), []byte(fmt.Sprintf("%v", val))) + if err != nil { + fmt.Printf("Current client %s cannot save %s to local: %s\n", client, key, err) + continue + } + for _, v := range val { + compressedVal, err := compress([]byte(v.Member.(string))) + if err != nil { + fmt.Printf("Current client %s compress key %s error: %s\n", client, key, err) + continue + } + _, err = client.ZAdd(context, key, &redis.Z{ + Score: v.Score, + Member: compressedVal, + }).Result() + if err != nil { + fmt.Printf("Current client %s compress key %s error: %s\n", client, key, err) + continue + } + // delete the original value + _, err = client.ZRem(context, key, v.Member.(string)).Result() + if err != nil { + fmt.Printf("Current client %s compress key %s error: %s\n", client, key, err) + continue + } + fmt.Printf("Original value: %v\n", val) + fmt.Printf("Compressed value: %s\n", compressedVal) + } + fmt.Printf("Current client %s compress key %s success\n", client, key) + } + } + return nil +} + +func (pika *Pika) DecompressKey(context context.Context, key string, save bool) (interface{}, error) { + for _, client := range pika.clients { + exist := client.Exists(context, key).Val() + if exist == 0 { + continue + } + dataType := client.Type(context, key).Val() + switch dataType { + case "string": + val := client.Get(context, key).Val() + decompressedVal, err := decompress([]byte(val)) + if err != nil { + // maybe not compress, just continue + continue + } + if save { + _, err := client.Set(context, key, string(decompressedVal), 0).Result() + if err != nil { + fmt.Printf("Current client %s decompress key %s error: %s\n", client, key, err) + continue + } + } + return string(decompressedVal), nil + case "hash": + val := client.HGetAll(context, key).Val() + type Hash struct { + Field string + Value string + } + var hash []Hash + for k, v := range val { + decompressedVal, err := decompress([]byte(v)) + if err != nil { + // maybe not compress, just continue + continue + } + if save { + _, err := client.HSet(context, key, k, string(decompressedVal)).Result() + if err != nil { + fmt.Printf("Current client %s decompress key %s error: %s\n", client, key, err) + continue + } + } + hash = append(hash, Hash{ + Field: k, + Value: string(decompressedVal), + }) + } + return hash, nil + case "list": + val := client.LRange(context, key, 0, -1).Val() + list := make([]string, 0) + for _, v := range val { + decompressedVal, err := decompress([]byte(v)) + if err != nil { + // maybe not compress, just continue + continue + } + if save { + _, err := client.LPush(context, key, string(decompressedVal)).Result() + if err != nil { + fmt.Printf("Current client %s decompress key %s error: %s\n", client, key, err) + continue + } + } + list = append(list, string(decompressedVal)) + } + return list, nil + case "set": + val := client.SMembers(context, key).Val() + set := make([]string, 0) + for _, v := range val { + decompressedVal, err := decompress([]byte(v)) + if err != nil { + continue + } + if save { + _, err := client.SAdd(context, key, string(decompressedVal)).Result() + if err != nil { + fmt.Printf("Current client %s decompress key %s error: %s\n", client, key, err) + continue + } + } + set = append(set, string(decompressedVal)) + } + return set, nil + case "zset": + val := client.ZRangeWithScores(context, key, 0, -1).Val() + zset := make([]redis.Z, 0) + for _, v := range val { + decompressedVal, err := decompress([]byte(v.Member.(string))) + if err != nil { + continue + } + if save { + _, err := client.ZAdd(context, key, &redis.Z{ + Score: v.Score, + Member: string(decompressedVal), + }).Result() + if err != nil { + fmt.Printf("Current client %s decompress key %s error: %s\n", client, key, err) + continue + } + } + zset = append(zset, redis.Z{ + Score: v.Score, + Member: string(decompressedVal), + }) + } + return zset, nil + } + } + return nil, fmt.Errorf("key %s not found or has been decompressed", key) +} + +func (pika *Pika) RecoverKey(context context.Context, from string, to string) error { + file, err := os.ReadFile(filepath.Join(Save, from)) + if err != nil { + return err + } + for _, client := range pika.clients { + exist := client.Exists(context, from).Val() + if exist == 0 { + continue + } + dataType := client.Type(context, from).Val() + switch dataType { + case "string": + _, err := client.Set(context, to, string(file), 0).Result() + if err != nil { + fmt.Printf("Current client %s recover key %s error: %s\n", client, to, err) + continue + } + fmt.Printf("New Key: %s, Value: %s\n", to, string(file)) + return nil + case "hash": + var hash []map[string]string + err := json.Unmarshal(file, &hash) + if err != nil { + fmt.Printf("Current client %s recover key %s error: %s\n", client, from, err) + continue + } + for _, v := range hash { + for k, v := range v { + _, err := client.HSet(context, to, k, v).Result() + if err != nil { + fmt.Printf("Current client %s recover key %s error: %s\n", client, to, err) + continue + } + } + } + fmt.Printf("New Key: %s, Value: %s\n", to, string(file)) + return nil + case "list": + var list []string + err := json.Unmarshal(file, &list) + if err != nil { + fmt.Printf("Current client %s recover key %s error: %s\n", client, to, err) + continue + } + for _, v := range list { + _, err := client.LPush(context, to, v).Result() + if err != nil { + fmt.Printf("Current client %s recover key %s error: %s\n", client, to, err) + continue + } + } + fmt.Printf("New Key: %s, Value: %s\n", to, string(file)) + return nil + case "set": + var set []string + err := json.Unmarshal(file, &set) + if err != nil { + fmt.Printf("Current client %s recover key %s error: %s\n", client, to, err) + continue + } + for _, v := range set { + _, err := client.SAdd(context, to, v).Result() + if err != nil { + fmt.Printf("Current client %s recover key %s error: %s\n", client, to, err) + continue + } + } + fmt.Printf("New Key: %s, Value: %s\n", to, string(file)) + return nil + case "zset": + var zset []redis.Z + err := json.Unmarshal(file, &zset) + if err != nil { + fmt.Printf("Current client %s recover key %s error: %s\n", client, to, err) + continue + } + for _, v := range zset { + _, err := client.ZAdd(context, to, &redis.Z{ + Score: v.Score, + Member: v.Member, + }).Result() + if err != nil { + fmt.Printf("Current client %s recover key %s error: %s\n", client, to, err) + continue + } + } + fmt.Printf("New Key: %s, Value: %s\n", to, string(file)) + return nil + } + } + return fmt.Errorf("key %s not found", from) +} + +// stringInSlice Helper function to check if a string is in a slice of strings +func stringInSlice(str string, slice []string) bool { + for _, s := range slice { + if strings.ToLower(s) == strings.ToLower(str) { + return true + } + } + return false +} diff --git a/tools/pika_keys_analysis/pika_test.go b/tools/pika_keys_analysis/pika_test.go new file mode 100644 index 0000000000..e73c48d2a2 --- /dev/null +++ b/tools/pika_keys_analysis/pika_test.go @@ -0,0 +1,148 @@ +package pika_keys_analysis + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" +) + +func TestBefore(t *testing.T) { + err := Init("./cli/config.yaml") + assert.Nil(t, err) + for _, client := range PikaInstance.clients { + result, err := client.FlushAll(context.Background()).Result() + assert.Nil(t, err) + t.Log(result) + } +} + +func TestPutKeyToPika(t *testing.T) { + err := Init("./cli/config.yaml") + assert.Nil(t, err) + c1 := PikaInstance.clients[0] + c2 := PikaInstance.clients[1] + for i := 0; i < 5; i++ { + key := "first_big_key_" + uuid.New().String() + value := uuid.New().String() + value += value + value += value + value += value + value += value + value += value + c1.Set(context.Background(), key, value, 0) + } + for i := 0; i < 5; i++ { + key := "second_big_key_" + uuid.New().String() + value := uuid.New().String() + value += value + value += value + value += value + value += value + c2.Set(context.Background(), key, value, 0) + } + for i := 0; i < 5; i++ { + key := "third_big_key_" + uuid.New().String() + value := uuid.New().String() + value += value + value += value + value += value + c1.Set(context.Background(), key, value, 0) + } + for i := 0; i < 5; i++ { + key := "fourth_big_key_" + uuid.New().String() + value := uuid.New().String() + value += value + c1.Set(context.Background(), key, value, 0) + } + for i := 0; i < 10000; i++ { + key := "fifth_big_key_" + uuid.New().String() + value := uuid.New().String() + c1.Set(context.Background(), key, value, 0) + } + for i := 0; i < 10000; i++ { + key := "fifth_big_key_" + uuid.New().String() + value := uuid.New().String() + c2.Set(context.Background(), key, value, 0) + } + for i := 0; i < 10000; i++ { + key := "fifth_big_key_" + uuid.New().String() + value := uuid.New().String() + c1.Set(context.Background(), key, value, 0) + } + totalKeyNumber := PikaInstance.GetTotalKeyNumber() + assert.Equal(t, 30020, totalKeyNumber) +} + +func TestListBigKey(t *testing.T) { + err := Init("./cli/config.yaml") + assert.Nil(t, err) + totalKeyNumber := PikaInstance.GetTotalKeyNumber() + start := time.Now() + bigKeys, err := PikaInstance.ListBigKeysByScan(context.Background()) + assert.Nil(t, err) + for keyType, minHeap := range bigKeys { + data := minHeap.GetTopN(Head) + fmt.Printf("Type: %s, Head: %d\n", keyType, Head) + if len(data) == 0 { + fmt.Println("No big key found") + } + for _, v := range data { + prefixKey := v.Key + if len(prefixKey) > 20 { + prefixKey = prefixKey[:20] + } + fmt.Printf("Key Prefix: %s, Size: %d, From: %s\n", prefixKey, v.UsedSize, v.Client) + } + } + end := time.Now() + t.Log("Total Key Number:", totalKeyNumber) + t.Log("Cost Time:", end.Sub(start)) +} + +func TestCompressKeyAndDeCompress(t *testing.T) { + err := Init("./cli/config.yaml") + assert.Nil(t, err) + key := "test_compress_key" + value := uuid.New().String() + value += value + value += value + value += value + value += value + PikaInstance.clients[0].Set(context.Background(), key, value, 0) + // compress + err = PikaInstance.CompressKey(context.Background(), key) + assert.Nil(t, err) + // get value after compress + compressValue := PikaInstance.clients[0].Get(context.Background(), key).Val() + // decompress when not save to pika + decompressVal, err := PikaInstance.DecompressKey(context.Background(), key, false) + assert.Nil(t, err) + assert.Equal(t, value, decompressVal) + notSaveVal := PikaInstance.clients[0].Get(context.Background(), key).Val() + assert.Equal(t, compressValue, notSaveVal) + // decompress when save to pika + decompressVal, err = PikaInstance.DecompressKey(context.Background(), key, true) + assert.Nil(t, err) + saveVal := PikaInstance.clients[0].Get(context.Background(), key).Val() + assert.Equal(t, value, decompressVal) + assert.Equal(t, value, saveVal) + // recover + err = PikaInstance.RecoverKey(context.Background(), key, "recover_key") + assert.Nil(t, err) + recoverVal := PikaInstance.clients[0].Get(context.Background(), "recover_key").Val() + assert.Equal(t, value, recoverVal) +} + +func TestAfter(t *testing.T) { + err := Init("./cli/config.yaml") + assert.Nil(t, err) + for _, client := range PikaInstance.clients { + result, err := client.FlushAll(context.Background()).Result() + assert.Nil(t, err) + t.Log(result) + } +}