Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add big key tool #2195

Merged
merged 10 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions .github/workflows/pika.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ jobs:
- name: Run Go E2E Tests
working-directory: ${{ github.workspace }}/build
run: |
cd ../tests/integration/
cd ../tools/pika_keys_analysis/
go test -v ./...
cd ../../tests/integration/
chmod +x integrate_test.sh
sh integrate_test.sh

Expand Down Expand Up @@ -152,7 +154,9 @@ jobs:
- name: Run Go E2E Tests
working-directory: ${{ github.workspace }}/build
run: |
cd ../tests/integration/
cd ../tools/pika_keys_analysis/
go test -v ./...
cd ../../tests/integration/
chmod +x integrate_test.sh
sh integrate_test.sh

Expand Down Expand Up @@ -210,7 +214,9 @@ jobs:
- name: Run Go E2E Tests
working-directory: ${{ github.workspace }}/build
run: |
cd ../tests/integration/
cd ../tools/pika_keys_analysis/
go test -v ./...
cd ../../tests/integration/
chmod +x integrate_test.sh
sh integrate_test.sh

Expand Down
Empty file modified tests/integration/start_master_and_slave.sh
100644 → 100755
Empty file.
138 changes: 138 additions & 0 deletions tools/pika_keys_analysis/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package pika_keys_analysis

import (
"context"
"fmt"
"strings"
"time"

"github.com/desertbit/grumble"
"github.com/fatih/color"
)

var App = grumble.New(&grumble.Config{
Name: "pika_keys_analysis",
Description: "A tool for analyzing keys in Pika",
HistoryFile: "/tmp/.pika_keys_analysis_history",
Prompt: "pika_keys_analysis > ",
HistoryLimit: 100,
ErrorColor: color.New(color.FgRed, color.Bold, color.Faint),
HelpHeadlineColor: color.New(color.FgGreen),
HelpHeadlineUnderline: false,
HelpSubCommands: true,
PromptColor: color.New(color.FgBlue, color.Bold),
Flags: func(f *grumble.Flags) {},
})

func init() {
App.OnInit(func(a *grumble.App, fm grumble.FlagMap) error {
return nil
})
App.SetPrintASCIILogo(func(a *grumble.App) {
fmt.Println(strings.Join([]string{`
............. .... ..... ..... .....
################# #### ##### ##### #######
#### ##### #### ##### ##### #########
#### ##### #### ##### ##### #### #####
#### ##### #### ##### ##### #### #####
################ #### ##### ##### #### #####
#### #### ##### ##### #################
#### #### ##### ###### ##### #####
#### #### ##### ###### ##### #####
`}, "\r\n"))
})
register(App)
}

func register(app *grumble.App) {
app.AddCommand(&grumble.Command{
Name: "bigKey",
Help: "list the big keys",
LongHelp: "list the big keys",
Run: func(c *grumble.Context) error {
listBigKeys, err := PikaInstance.ListBigKeysByScan(context.Background())
if err != nil {
return err
}
start := time.Now()
for keyType, data := range listBigKeys {
fmt.Printf("Type: %s, Head: %d\n", keyType, Head)
if len(data.GetTopN(Head)) == 0 {
fmt.Println("No big key found")
}
for _, v := range data.GetTopN(Head) {
fmt.Printf("Key : %s, Size: %d, From: %s\n", v.Key, v.UsedSize, v.Client)
}
}
end := time.Now()
if PrintKeyNum {
fmt.Println("Total Key Number:", PikaInstance.GetTotalKeyNumber())
}
fmt.Println("Cost Time:", end.Sub(start))
return nil
},
})

app.AddCommand(&grumble.Command{
Name: "apply",
Help: "Apply the settings to Pika",
LongHelp: "Apply the settings to Pika",
Args: func(a *grumble.Args) {
a.String("filename", "The configuration file")
},
Run: func(c *grumble.Context) error {
filename := c.Args.String("filename")
return Init(filename)
},
})

app.AddCommand(&grumble.Command{
Name: "compress",
Help: "Compress the big keys",
LongHelp: "Compress the big keys and store them to pika",
Args: func(a *grumble.Args) {
a.String("key", "The key to compress")
},
Run: func(c *grumble.Context) error {
key := c.Args.String("key")
return PikaInstance.CompressKey(context.Background(), key)
},
})

app.AddCommand(&grumble.Command{
Name: "decompress",
Help: "Decompress the big keys",
LongHelp: "Decompress the big keys and store them to pika",
Args: func(a *grumble.Args) {
a.String("key", "The key to decompress")
},
Flags: func(f *grumble.Flags) {
f.Bool("s", "save", false, "Save the decompressed value to pika")
},
Run: func(c *grumble.Context) error {
key := c.Args.String("key")
save := c.Flags.Bool("save")
decompressKey, err := PikaInstance.DecompressKey(context.Background(), key, save)
if err != nil {
return err
}
fmt.Printf("Key: %s, Decompress: %s\n", key, decompressKey)
return nil
},
})

app.AddCommand(&grumble.Command{
Name: "recover",
Help: "Recover the big keys",
LongHelp: "Recover the big keys and store them to pika",
Args: func(a *grumble.Args) {
a.String("key", "The key to recover")
a.String("newKey", "The new key to store the recovered value")
},
Run: func(c *grumble.Context) error {
key := c.Args.String("key")
newKey := c.Args.String("newKey")
return PikaInstance.RecoverKey(context.Background(), key, newKey)
},
})
}
39 changes: 39 additions & 0 deletions tools/pika_keys_analysis/cli/Readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# What is this?
This is a tool to analyze the keys of a pika cluster.
# How to use?
## 1. Install
```shell
go build -o pika_keys_analysis main.go
```
## 2. Start
```shell
./pika_keys_analysis config.yaml
```
## 3. List big keys
```shell
bigKey
```
## 4. Apply Config
```shell
apply config.yaml
```
## 5. Compress Key
```shell
compress <key>
```
## 6. Decompress Key
- not save to pika
```shell
decompress <key>
```
- save to pika
```shell
decompress -s <key>
```
## 7. Recover Key
```shell
recover <from> <to>
```
# Notice

When using compression and decompression functions, errors in operation may cause duplicate compression or decompression, and the files used for recovery may be overwritten. If they are overwritten, the decompress command can be used to reach a state where decompression cannot continue, and then continue to compress to use the recover command normally
21 changes: 21 additions & 0 deletions tools/pika_keys_analysis/cli/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
pika:
- addr: 127.0.0.1:9221
db: 0
password: ""

- addr: 127.0.0.1:9221
db: 1
password: ""

scan-size: 1000 # scan size per time
concurrency: 1000 # goroutine num
head: 30 # show top head keys
type:
- string
- hash
- list
- set
- zset
memory: 2000 # Memory limit, unit: MB
print: true # Print key number or not, will use keys command
save: ./save/ # Save dir path, will save key value pairs when compress
24 changes: 24 additions & 0 deletions tools/pika_keys_analysis/cli/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package main

import (
"fmt"
"os"
"pika/tools/pika_keys_analysis"

"github.com/desertbit/grumble"
)

func main() {
if len(os.Args) != 2 {
fmt.Println("Usage: pika_keys_analysis <config file>")
os.Exit(1)
}
err := pika_keys_analysis.Init(os.Args[1])
if err != nil {
fmt.Println(err)
os.Exit(1)
}

os.Args = os.Args[0:1]
grumble.Main(pika_keys_analysis.App)
}
72 changes: 72 additions & 0 deletions tools/pika_keys_analysis/compress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package pika_keys_analysis

import (
"bytes"
"compress/gzip"
"io/ioutil"
"os"
"path/filepath"
)

func compress(data []byte) ([]byte, error) {
var compressedData bytes.Buffer
writer := gzip.NewWriter(&compressedData)

_, err := writer.Write(data)
if err != nil {
return nil, err
}

err = writer.Close()
if err != nil {
return nil, err
}

return compressedData.Bytes(), nil
}

func decompress(compressedData []byte) ([]byte, error) {
reader, err := gzip.NewReader(bytes.NewReader(compressedData))
if err != nil {
return nil, err
}

decompressedData, err := ioutil.ReadAll(reader)
if err != nil {
return nil, err
}

err = reader.Close()
if err != nil {
return nil, err
}

return decompressedData, nil
}

func isCompressed(data []byte) bool {
return len(data) > 2 && data[0] == 0x1f && data[1] == 0x8b
}

// saveLocal saves the key-value pair to local file system.
func saveLocal(key []byte, value []byte) error {
_, err := os.ReadDir(Save)
if err != nil {
if os.IsNotExist(err) {
err = os.MkdirAll(Save, 0755)
if err != nil {
return err
}
} else {
return err
}
}
filename := filepath.Join(Save, string(key))
file, err := os.Create(filename)
if err != nil {
return err
}
defer file.Close()
_, err = file.Write(value)
return err
}
56 changes: 56 additions & 0 deletions tools/pika_keys_analysis/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package pika_keys_analysis

import (
"os"

"gopkg.in/yaml.v3"
)

var (
PikaInstance *Pika
ScanSize = 1000
GoroutineNum = 100
Head = 10
Type = []string{"string", "hash", "list", "set", "zset"}
MemoryLimit = 1024 * 1024 * 200
PrintKeyNum = false
Save = "./save/"
)

type Config struct {
PikaConfig []PikaConfig `yaml:"pika"`
Concurrency int `yaml:"concurrency"`
ScanSize int `yaml:"scan-size"`
Head int `yaml:"head"`
MemoryLimit int `yaml:"memory"`
Type []string `yaml:"type"`
PrintKeyNum bool `yaml:"print"`
Save string `yaml:"save"`
}

type PikaConfig struct {
Addr string `yaml:"addr"`
Password string `yaml:"password"`
DB int `yaml:"db"`
}

func Init(filename string) error {
bytes, err := os.ReadFile(filename)
if err != nil {
return err
}
config := Config{}
err = yaml.Unmarshal(bytes, &config)
if err != nil {
return err
}
PikaInstance = NewPika(config.PikaConfig)
ScanSize = config.ScanSize
GoroutineNum = config.Concurrency
Head = config.Head
Type = config.Type
MemoryLimit = config.MemoryLimit * 1024 * 1024
PrintKeyNum = config.PrintKeyNum
Save = config.Save
return nil
}
Loading