Skip to content

Commit

Permalink
Merge pull request #11 from xyctruth/develop
Browse files Browse the repository at this point in the history
optimize  remove clear func, add withTTL , add unit test
  • Loading branch information
xyctruth authored Dec 22, 2021
2 parents 5d2d355 + f053832 commit 2db55ca
Show file tree
Hide file tree
Showing 8 changed files with 323 additions and 307 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
IMG ?= profiler:latest
TEST_PATH ?=./pkg/...

.PHONY: docker-build
docker-build:
Expand All @@ -18,7 +19,7 @@ docker-push:

.PHONY: test
test:
go test -race -v -coverprofile=cover.out ./pkg/...
go test -race -v -coverprofile=cover.out ${TEST_PATH}

.PHONY: cover-ui
cover-ui: test
Expand Down
43 changes: 3 additions & 40 deletions pkg/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,9 @@ func (collector *Collector) run() {

collector.log.Info("collector run")

collector.mangerWg.Add(2)
collector.mangerWg.Add(1)

go collector.scrapeLoop(collector.Interval)
go collector.clearLoop()

}

func (collector *Collector) scrapeLoop(interval time.Duration) {
Expand All @@ -76,27 +74,6 @@ func (collector *Collector) scrapeLoop(interval time.Duration) {
}
}

func (collector *Collector) clearLoop() {
defer collector.mangerWg.Done()
collector.clear()

for {
// 每天24点执行
now := time.Now()
next := now.Add(time.Hour * 24)
next = time.Date(next.Year(), next.Month(), next.Day(), 0, 0, 0, 0, next.Location())
t := time.NewTimer(next.Sub(now))

select {
case <-collector.exitChan:
collector.log.Info("clear loop exit")
return
case <-t.C:
collector.clear()
}
}
}

func (collector *Collector) reload(target TargetConfig) {
collector.mu.Lock()
defer collector.mu.Unlock()
Expand All @@ -119,20 +96,6 @@ func (collector *Collector) exit() {
close(collector.exitChan)
}

func (collector *Collector) clear() {
collector.mu.RLock()
defer collector.mu.RUnlock()

if collector.Expiration <= 0 {
return
}
collector.log.Info("collector clear start")
err := collector.store.Clear(collector.TargetName, collector.Expiration)
if err != nil {
collector.log.WithError(err).Error("collector clear error")
}
}

func (collector *Collector) scrape() {
collector.mu.RLock()
defer collector.mu.RUnlock()
Expand Down Expand Up @@ -203,7 +166,7 @@ func (collector *Collector) analysis(profileType string, profileBytes []byte) er
return err
}

profileID, err := collector.store.SaveProfile(b.Bytes())
profileID, err := collector.store.SaveProfile(b.Bytes(), time.Duration(collector.Expiration)*time.Hour*24)
if err != nil {
collector.log.WithError(err).Error("save profile error")
return err
Expand All @@ -230,7 +193,7 @@ func (collector *Collector) analysis(profileType string, profileBytes []byte) er
metas = append(metas, meta)
}

err = collector.store.SaveProfileMeta(metas)
err = collector.store.SaveProfileMeta(metas, time.Duration(collector.Expiration)*time.Hour*24)
if err != nil {
collector.log.WithError(err).Error("save profile meta error")
return err
Expand Down
49 changes: 45 additions & 4 deletions pkg/storage/badger/func.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package badger

import (
"bytes"
"strconv"
"strings"
"time"

"github.com/dgraph-io/badger/v3"
"github.com/sirupsen/logrus"
"github.com/xyctruth/profiler/pkg/storage"
)

Expand Down Expand Up @@ -43,16 +46,54 @@ func buildSampleTypeKey(sampleType string) []byte {
return buf.Bytes()
}

func deleteSampleTypeKey(sampleType []byte) string {
return strings.Replace(string(sampleType), string(PrefixSampleType), "", 1)
}

func buildTargetKey(target string) []byte {
buf := bytes.NewBuffer(PrefixTarget)
buf.Write([]byte(target))
return buf.Bytes()
}

func newProfileEntry(id uint64, val []byte, ttl time.Duration) *badger.Entry {
entry := badger.NewEntry(buildProfileKey(strconv.FormatUint(id, 10)), val)

if ttl > 0 {
entry = entry.WithTTL(ttl)
}
return entry
}

func newProfileMetaEntry(meta *storage.ProfileMeta, ttl time.Duration) *badger.Entry {
metaBytes, err := meta.Encode()
if err != nil {
logrus.WithError(err).Error("ProfileMeta encode error")
return nil
}
entry := badger.NewEntry(buildProfileMetaKey(meta.SampleType, meta.TargetName, time.Now()), metaBytes)
if ttl > 0 {
entry = entry.WithTTL(ttl)
}
return entry
}

func newSampleTypeEntry(sampleType string, profileType string, ttl time.Duration) *badger.Entry {
entry := badger.NewEntry(buildSampleTypeKey(sampleType), []byte(profileType))
if ttl > 0 {
entry = entry.WithTTL(ttl)
}
return entry
}

func newTargetKeyEntry(target string, ttl time.Duration) *badger.Entry {
entry := badger.NewEntry(buildTargetKey(target), []byte{})
if ttl > 0 {
entry = entry.WithTTL(ttl)
}
return entry
}

func deleteSampleTypeKey(sampleType []byte) string {
return strings.Replace(string(sampleType), string(PrefixSampleType), "", 1)
}

func deleteTargetKey(target []byte) string {
return strings.Replace(string(target), string(PrefixTarget), "", 1)
}
85 changes: 6 additions & 79 deletions pkg/storage/badger/store.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package badger

import (
"strconv"
"time"

"github.com/dgraph-io/badger/v3"
Expand Down Expand Up @@ -69,15 +68,14 @@ func (s *store) GetProfile(id string) ([]byte, error) {
return date, err
}

func (s *store) SaveProfile(profileData []byte) (uint64, error) {
func (s *store) SaveProfile(profileData []byte, ttl time.Duration) (uint64, error) {
id, err := s.seq.Next()
if err != nil {
return 0, err
}

err = s.db.Update(func(txn *badger.Txn) error {
return txn.Set(buildProfileKey(strconv.FormatUint(id, 10)), profileData)

return txn.SetEntry(newProfileEntry(id, profileData, ttl))
})

if err != nil {
Expand All @@ -86,25 +84,20 @@ func (s *store) SaveProfile(profileData []byte) (uint64, error) {
return id, nil
}

func (s *store) SaveProfileMeta(metas []*storage.ProfileMeta) error {
func (s *store) SaveProfileMeta(metas []*storage.ProfileMeta, ttl time.Duration) error {
return s.db.Update(func(txn *badger.Txn) error {
for _, meta := range metas {
err := txn.Set(buildSampleTypeKey(meta.SampleType), []byte(meta.ProfileType))
err := txn.SetEntry(newSampleTypeEntry(meta.SampleType, meta.ProfileType, ttl))
if err != nil {
return err
}

err = txn.Set(buildTargetKey(meta.TargetName), []byte(meta.TargetName))
err = txn.SetEntry(newTargetKeyEntry(meta.TargetName, ttl))
if err != nil {
return err
}

metaBytes, err := meta.Encode()
if err != nil {
return err
}

err = txn.Set(buildProfileMetaKey(meta.SampleType, meta.TargetName, time.Now()), metaBytes)
err = txn.SetEntry(newProfileMetaEntry(meta, ttl))
if err != nil {
return err
}
Expand Down Expand Up @@ -243,72 +236,6 @@ func (s *store) ListTarget() ([]string, error) {
return targets, nil
}

func (s *store) Clear(targetName string, agoDays int64) error {
sampleTypes, err := s.ListSampleType()
if err != nil {
return err
}

now := time.Now()
today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
ago := today.Add(-time.Hour * 24 * time.Duration(agoDays))

err = s.db.Update(func(txn *badger.Txn) error {
for _, sampleType := range sampleTypes {
max := buildProfileMetaKey(sampleType, targetName, ago)
opts := badger.DefaultIteratorOptions
opts.PrefetchSize = 100
opts.Prefix = buildBaseProfileMetaKey(sampleType, targetName)
it := txn.NewIterator(opts)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
k := item.Key()
var profileID uint64
if !storage.CompareKey(k, max) {
break
}
err = item.Value(func(v []byte) error {
sample := &storage.ProfileMeta{}
err = sample.Decode(v)
if err != nil {
return err
}
profileID = sample.ProfileID
return nil
})
if err != nil {
return err
}

err = s.delete(k, buildProfileKey(strconv.FormatUint(profileID, 10)))
if err != nil {
return err
}
}
}
return nil
})

if err != nil {
return err
}

return nil
}

func (s *store) delete(keys ...[]byte) error {
return s.db.Update(func(txn *badger.Txn) error {
for _, key := range keys {
err := txn.Delete(key)
if err != nil {
return err
}
}
return nil
})
}

func (s *store) Release() {
err := s.seq.Release()
if err != nil {
Expand Down
Loading

0 comments on commit 2db55ca

Please sign in to comment.