Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache size measurements #438

Merged
merged 2 commits into from
May 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions pkg/cache/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package index
import (
"sort"
"sync"
"sync/atomic"
"time"

"github.com/tricksterproxy/trickster/pkg/cache"
Expand Down Expand Up @@ -191,10 +192,10 @@ func (idx *Index) UpdateObject(obj *Object) {
obj.LastWrite = obj.LastAccess

if o, ok := idx.Objects[key]; ok {
idx.CacheSize += o.Size - idx.Objects[key].Size
atomic.AddInt64(&idx.CacheSize, obj.Size-o.Size)
} else {
idx.CacheSize += obj.Size
idx.ObjectCount++
atomic.AddInt64(&idx.CacheSize, obj.Size)
atomic.AddInt64(&idx.ObjectCount, 1)
}

metrics.ObserveCacheSizeChange(idx.name, idx.cacheType, idx.CacheSize, idx.ObjectCount)
Expand All @@ -208,8 +209,8 @@ func (idx *Index) RemoveObject(key string) {
indexLock.Lock()
idx.lastWrite = time.Now()
if o, ok := idx.Objects[key]; ok {
idx.CacheSize -= o.Size
idx.ObjectCount--
atomic.AddInt64(&idx.CacheSize, -o.Size)
atomic.AddInt64(&idx.ObjectCount, -1)

metrics.ObserveCacheOperation(idx.name, idx.cacheType, "del", "none", float64(o.Size))

Expand All @@ -226,8 +227,8 @@ func (idx *Index) RemoveObjects(keys []string, noLock bool) {
}
for _, key := range keys {
if o, ok := idx.Objects[key]; ok {
idx.CacheSize -= o.Size
idx.ObjectCount--
atomic.AddInt64(&idx.CacheSize, -o.Size)
atomic.AddInt64(&idx.ObjectCount, -1)
metrics.ObserveCacheOperation(idx.name, idx.cacheType, "del", "none", float64(o.Size))
delete(idx.Objects, key)
metrics.ObserveCacheSizeChange(idx.name, idx.cacheType, idx.CacheSize, idx.ObjectCount)
Expand Down
55 changes: 21 additions & 34 deletions pkg/proxy/origins/clickhouse/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package clickhouse
import (
"sort"
"sync"
"sync/atomic"
"time"

"github.com/tricksterproxy/trickster/pkg/sort/times"
Expand Down Expand Up @@ -462,52 +463,38 @@ func (re *ResultsEnvelope) ValueCount() int {

// Size returns the approximate memory utilization in bytes of the timeseries
func (re *ResultsEnvelope) Size() int {

var size int
wg := sync.WaitGroup{}

var a int
ma := sync.Mutex{}
c := uint64(24 + // .stepDuration
(25 * len(re.timestamps)) + // time.Time (24) + bool(1)
(24 * len(re.tslist)) + // time.Time (24)
(len(re.ExtentList) * 72) + // time.Time (24) * 3
2, // .isSorted + .isCounted
)
for i := range re.Meta {
wg.Add(1)
go func(j int) {
ma.Lock()
a += len(re.Meta[j].Name) + len(re.Meta[j].Type)
ma.Unlock()
atomic.AddUint64(&c, uint64(len(re.Meta[j].Name)+len(re.Meta[j].Type)))
wg.Done()
}(i)
}

var b int
mb := sync.Mutex{}
for k, v := range re.Data {
b += len(k)
wg.Add(1)
go func(d *DataSet) {
mb.Lock()
b += len(d.Points) * 16
mb.Unlock()
wg.Done()
}(v)
}

var c int
mc := sync.Mutex{}
for _, s := range re.SeriesOrder {
wg.Add(1)
go func(t string) {
mc.Lock()
c += len(t)
mc.Unlock()
atomic.AddUint64(&c, uint64(len(t)))
wg.Done()
}(s)
}

// ExtentList + StepDuration + Timestamps + Times + isCounted + isSorted
d := (len(re.ExtentList) * 24) + 8 + (len(re.timestamps) * 9) + (len(re.tslist) * 8) + 2

for k, v := range re.Data {
atomic.AddUint64(&c, uint64(len(k)))
wg.Add(1)
go func(d *DataSet) {
atomic.AddUint64(&c, uint64(len(d.Points)*32))
for mk := range d.Metric {
atomic.AddUint64(&c, uint64(len(mk)+8)) // + approx len of value (interface)
}
wg.Done()
}(v)
}
wg.Wait()
size = a + b + c + d
return size

return int(c)
}
2 changes: 1 addition & 1 deletion pkg/proxy/origins/clickhouse/series_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1505,7 +1505,7 @@ func TestSize(t *testing.T) {
StepDuration: time.Duration(5) * time.Second,
}
i := r.Size()
const expected = 146
const expected = 370
if i != expected {
t.Errorf("expected %d got %d", expected, i)
}
Expand Down
28 changes: 16 additions & 12 deletions pkg/proxy/origins/influxdb/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/tricksterproxy/trickster/pkg/sort/times"
Expand Down Expand Up @@ -459,28 +460,31 @@ func (se *SeriesEnvelope) Sort() {

// Size returns the approximate memory utilization in bytes of the timeseries
func (se *SeriesEnvelope) Size() int {
c := 8 + len(se.Err)
c := uint64(24 + // .stepDuration
len(se.Err) +
se.ExtentList.Size() + // time.Time (24) * 3
(25 * len(se.timestamps)) + // time.Time (24) + bool(1)
(24 * len(se.tslist)) + // time.Time (24)
2, // .isSorted + .isCounted
)
wg := sync.WaitGroup{}
mtx := sync.Mutex{}
for i := range se.Results {
for j := range se.Results[i].Series {

for i, res := range se.Results {
atomic.AddUint64(&c, uint64(8+len(res.Err))) // .StatementID
for j := range res.Series {
wg.Add(1)
go func(r models.Row) {
mtx.Lock()
c += len(r.Name)
atomic.AddUint64(&c, uint64(len(r.Name)+1)) // .Partial
for k, v := range r.Tags {
c += len(k) + len(v)
atomic.AddUint64(&c, uint64(len(k)+len(v)))
}
for _, v := range r.Columns {
c += len(v)
atomic.AddUint64(&c, uint64(len(v)))
}
c += 16 // approximate size of timestamp + value
mtx.Unlock()
atomic.AddUint64(&c, 32) // size of timestamp (24) + approximate value size (8)
wg.Done()
}(se.Results[i].Series[j])
}
}
wg.Wait()
return c
return int(c)
}
2 changes: 1 addition & 1 deletion pkg/proxy/origins/influxdb/series_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1573,7 +1573,7 @@ func TestSize(t *testing.T) {
}

i := s.Size()
expected := 94
expected := 226

if i != expected {
t.Errorf("expected %d got %d", expected, i)
Expand Down
7 changes: 3 additions & 4 deletions pkg/proxy/origins/irondb/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,8 @@ func (c Client) UnmarshalInstantaneous(

// Size returns the approximate memory utilization in bytes of the timeseries.
func (se *SeriesEnvelope) Size() int {
	// Per data point: time.Time (24) + Step (4) + Value (8) = 36 bytes.
	c := len(se.Data)*36 +
		(len(se.ExtentList) * 72) + // each Extent ~= time.Time (24) * 3
		24 // .StepDuration
	return c
}
27 changes: 17 additions & 10 deletions pkg/proxy/origins/irondb/model_df4.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package irondb

import (
"sync"
"sync/atomic"
"time"

"github.com/tricksterproxy/trickster/pkg/timeseries"
Expand Down Expand Up @@ -325,22 +326,28 @@ func (se *DF4SeriesEnvelope) Sort() {

// Size returns the approximate memory utilization in bytes of the timeseries
func (se *DF4SeriesEnvelope) Size() int {

// TODO this implementation is a rough approximation to ensure we conform to the
// interface specification, it requires refinement in order to be in the ballpark

c := 24 + len(se.Ver) // accounts for head + ver
wg := sync.WaitGroup{}
mtx := sync.Mutex{}
c := uint64(len(se.Ver) +
24 + // .Head
24 + // .StepDuration
se.ExtentList.Size(),
)
for i := range se.Meta {
wg.Add(1)
go func(j int) {
for k := range se.Meta[i] {
atomic.AddUint64(&c, uint64(len(k)+8)) // + approximate Meta Value size (8)
}
wg.Done()
}(i)
}
for i := range se.Data {
wg.Add(1)
go func(s []interface{}) {
mtx.Lock()
c += (len(s) * 16)
mtx.Unlock()
atomic.AddUint64(&c, uint64(len(s)*16)) // + approximate data value size
wg.Done()
}(se.Data[i])
}
wg.Wait()
return c
return int(c)
}
10 changes: 5 additions & 5 deletions pkg/proxy/origins/irondb/model_df4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,22 +366,22 @@ func TestUnmarshalDF4Timeseries(t *testing.T) {

se := ts.(*DF4SeriesEnvelope)
if len(se.Data) != 2 {
t.Errorf(`Expected length: 2. got %d`, len(se.Data))
t.Errorf(`expected length: 2. got %d`, len(se.Data))
return
}

if se.Data[1][1] != 2.0 {
t.Errorf(`Expected value: 2.0. got %f`, se.Data[1][1])
t.Errorf(`expected value: 2.0. got %f`, se.Data[1][1])
return
}

if se.Head.Start != 300 {
t.Errorf(`Expected time start: 300. got %d`, se.Head.Start)
t.Errorf(`expected time start: 300. got %d`, se.Head.Start)
return
}

if se.Head.Period != 300 {
t.Errorf(`Expected time period: 300. got %d`, se.Head.Period)
t.Errorf(`expected time period: 300. got %d`, se.Head.Period)
return
}
}
Expand All @@ -390,7 +390,7 @@ func TestSize(t *testing.T) {

client := &Client{}
s, _ := client.UnmarshalTimeseries([]byte(testDF4Response))
expected := 75
expected := 136

if s.Size() != expected {
t.Errorf("expected %d got %d", expected, s.Size())
Expand Down
4 changes: 2 additions & 2 deletions pkg/proxy/origins/irondb/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,11 +497,11 @@ func TestTSSize(t *testing.T) {

s, _ := client.UnmarshalTimeseries(bytes)

expected := 48
expected := 96
size := s.Size()

if size != expected {
t.Errorf("got %d expected %d", size, expected)
t.Errorf("expected %d got %d", expected, size)
}

}
Expand Down
17 changes: 10 additions & 7 deletions pkg/proxy/origins/prometheus/matrix.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package prometheus
import (
"sort"
"sync"
"sync/atomic"
"time"

"github.com/tricksterproxy/trickster/pkg/sort/times"
Expand Down Expand Up @@ -401,19 +402,21 @@ func (me *MatrixEnvelope) ValueCount() int {

// Size returns the approximate memory utilization in bytes of the timeseries
func (me *MatrixEnvelope) Size() int {

c := 0
wg := sync.WaitGroup{}
mtx := sync.Mutex{}
c := uint64(len(me.Status) +
me.ExtentList.Size() +
24 + // me.StepDuration
(25 * len(me.timestamps)) +
(24 * len(me.tslist)) +
2 + // isSorted + isCounted
len(me.Data.ResultType))
for i := range me.Data.Result {
wg.Add(1)
go func(s *model.SampleStream) {
mtx.Lock()
c += (len(s.Values) * 16) + len(s.Metric.String())
mtx.Unlock()
atomic.AddUint64(&c, uint64((len(s.Values)*32)+len(s.Metric.String())))
wg.Done()
}(me.Data.Result[i])
}
wg.Wait()
return c
return int(c)
}
2 changes: 1 addition & 1 deletion pkg/proxy/origins/prometheus/matrix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1701,7 +1701,7 @@ func TestSize(t *testing.T) {
StepDuration: time.Duration(5) * time.Second,
}
i := m.Size()
expected := 17
expected := 144

if i != expected {
t.Errorf("expected %d got %d", expected, i)
Expand Down
5 changes: 5 additions & 0 deletions pkg/timeseries/extent_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ func (el ExtentList) Clone() ExtentList {
return c
}

// Size returns the approximate memory utilization in bytes of the ExtentList.
// The estimate is 72 bytes per Extent — presumably three time.Time values at
// ~24 bytes each (callers' "* 72" sites carry the same "time.Time (24) * 3"
// note) — TODO confirm against the Extent struct definition.
func (el ExtentList) Size() int {
	return len(el) * 72
}

// ExtentListLRU is a type of []Extent used for sorting the slice by LRU
type ExtentListLRU []Extent

Expand Down