diff --git a/pkg/cache/index/index.go b/pkg/cache/index/index.go index f60877e10..c3ebb9e15 100644 --- a/pkg/cache/index/index.go +++ b/pkg/cache/index/index.go @@ -20,6 +20,7 @@ package index import ( "sort" "sync" + "sync/atomic" "time" "github.com/tricksterproxy/trickster/pkg/cache" @@ -191,10 +192,10 @@ func (idx *Index) UpdateObject(obj *Object) { obj.LastWrite = obj.LastAccess if o, ok := idx.Objects[key]; ok { - idx.CacheSize += o.Size - idx.Objects[key].Size + atomic.AddInt64(&idx.CacheSize, obj.Size-o.Size) } else { - idx.CacheSize += obj.Size - idx.ObjectCount++ + atomic.AddInt64(&idx.CacheSize, obj.Size) + atomic.AddInt64(&idx.ObjectCount, 1) } metrics.ObserveCacheSizeChange(idx.name, idx.cacheType, idx.CacheSize, idx.ObjectCount) @@ -208,8 +209,8 @@ func (idx *Index) RemoveObject(key string) { indexLock.Lock() idx.lastWrite = time.Now() if o, ok := idx.Objects[key]; ok { - idx.CacheSize -= o.Size - idx.ObjectCount-- + atomic.AddInt64(&idx.CacheSize, -o.Size) + atomic.AddInt64(&idx.ObjectCount, -1) metrics.ObserveCacheOperation(idx.name, idx.cacheType, "del", "none", float64(o.Size)) @@ -226,8 +227,8 @@ func (idx *Index) RemoveObjects(keys []string, noLock bool) { } for _, key := range keys { if o, ok := idx.Objects[key]; ok { - idx.CacheSize -= o.Size - idx.ObjectCount-- + atomic.AddInt64(&idx.CacheSize, -o.Size) + atomic.AddInt64(&idx.ObjectCount, -1) metrics.ObserveCacheOperation(idx.name, idx.cacheType, "del", "none", float64(o.Size)) delete(idx.Objects, key) metrics.ObserveCacheSizeChange(idx.name, idx.cacheType, idx.CacheSize, idx.ObjectCount) diff --git a/pkg/proxy/origins/clickhouse/series.go b/pkg/proxy/origins/clickhouse/series.go index 8b89fdc38..c17a98d6b 100644 --- a/pkg/proxy/origins/clickhouse/series.go +++ b/pkg/proxy/origins/clickhouse/series.go @@ -19,6 +19,7 @@ package clickhouse import ( "sort" "sync" + "sync/atomic" "time" "github.com/tricksterproxy/trickster/pkg/sort/times" @@ -462,52 +463,38 @@ func (re *ResultsEnvelope) 
ValueCount() int { // Size returns the approximate memory utilization in bytes of the timeseries func (re *ResultsEnvelope) Size() int { - - var size int wg := sync.WaitGroup{} - - var a int - ma := sync.Mutex{} + c := uint64(24 + // .stepDuration + (25 * len(re.timestamps)) + // time.Time (24) + bool(1) + (24 * len(re.tslist)) + // time.Time (24) + (len(re.ExtentList) * 72) + // time.Time (24) * 3 + 2, // .isSorted + .isCounted + ) for i := range re.Meta { wg.Add(1) go func(j int) { - ma.Lock() - a += len(re.Meta[j].Name) + len(re.Meta[j].Type) - ma.Unlock() + atomic.AddUint64(&c, uint64(len(re.Meta[j].Name)+len(re.Meta[j].Type))) wg.Done() }(i) } - - var b int - mb := sync.Mutex{} - for k, v := range re.Data { - b += len(k) - wg.Add(1) - go func(d *DataSet) { - mb.Lock() - b += len(d.Points) * 16 - mb.Unlock() - wg.Done() - }(v) - } - - var c int - mc := sync.Mutex{} for _, s := range re.SeriesOrder { wg.Add(1) go func(t string) { - mc.Lock() - c += len(t) - mc.Unlock() + atomic.AddUint64(&c, uint64(len(t))) wg.Done() }(s) } - - // ExtentList + StepDuration + Timestamps + Times + isCounted + isSorted - d := (len(re.ExtentList) * 24) + 8 + (len(re.timestamps) * 9) + (len(re.tslist) * 8) + 2 - + for k, v := range re.Data { + atomic.AddUint64(&c, uint64(len(k))) + wg.Add(1) + go func(d *DataSet) { + atomic.AddUint64(&c, uint64(len(d.Points)*32)) + for mk := range d.Metric { + atomic.AddUint64(&c, uint64(len(mk)+8)) // + approx len of value (interface) + } + wg.Done() + }(v) + } wg.Wait() - size = a + b + c + d - return size - + return int(c) } diff --git a/pkg/proxy/origins/clickhouse/series_test.go b/pkg/proxy/origins/clickhouse/series_test.go index afeb70c1a..fc02d1968 100644 --- a/pkg/proxy/origins/clickhouse/series_test.go +++ b/pkg/proxy/origins/clickhouse/series_test.go @@ -1505,7 +1505,7 @@ func TestSize(t *testing.T) { StepDuration: time.Duration(5) * time.Second, } i := r.Size() - const expected = 146 + const expected = 370 if i != expected { 
t.Errorf("expected %d got %d", expected, i) } diff --git a/pkg/proxy/origins/influxdb/series.go b/pkg/proxy/origins/influxdb/series.go index 51e35d8b0..4fd185823 100644 --- a/pkg/proxy/origins/influxdb/series.go +++ b/pkg/proxy/origins/influxdb/series.go @@ -21,6 +21,7 @@ import ( "sort" "strings" "sync" + "sync/atomic" "time" "github.com/tricksterproxy/trickster/pkg/sort/times" @@ -459,28 +460,31 @@ func (se *SeriesEnvelope) Sort() { // Size returns the approximate memory utilization in bytes of the timeseries func (se *SeriesEnvelope) Size() int { - c := 8 + len(se.Err) + c := uint64(24 + // .stepDuration + len(se.Err) + + se.ExtentList.Size() + // time.Time (24) * 3 + (25 * len(se.timestamps)) + // time.Time (24) + bool(1) + (24 * len(se.tslist)) + // time.Time (24) + 2, // .isSorted + .isCounted + ) wg := sync.WaitGroup{} - mtx := sync.Mutex{} - for i := range se.Results { - for j := range se.Results[i].Series { - + for i, res := range se.Results { + atomic.AddUint64(&c, uint64(8+len(res.Err))) // .StatementID + for j := range res.Series { wg.Add(1) go func(r models.Row) { - mtx.Lock() - c += len(r.Name) + atomic.AddUint64(&c, uint64(len(r.Name)+1)) // .Partial for k, v := range r.Tags { - c += len(k) + len(v) + atomic.AddUint64(&c, uint64(len(k)+len(v))) } for _, v := range r.Columns { - c += len(v) + atomic.AddUint64(&c, uint64(len(v))) } - c += 16 // approximate size of timestamp + value - mtx.Unlock() + atomic.AddUint64(&c, 32) // size of timestamp (24) + approximate value size (8) wg.Done() }(se.Results[i].Series[j]) } } wg.Wait() - return c + return int(c) } diff --git a/pkg/proxy/origins/influxdb/series_test.go b/pkg/proxy/origins/influxdb/series_test.go index eb174499e..13ba97039 100644 --- a/pkg/proxy/origins/influxdb/series_test.go +++ b/pkg/proxy/origins/influxdb/series_test.go @@ -1573,7 +1573,7 @@ func TestSize(t *testing.T) { } i := s.Size() - expected := 94 + expected := 226 if i != expected { t.Errorf("expected %d got %d", expected, i) diff 
--git a/pkg/proxy/origins/irondb/model.go b/pkg/proxy/origins/irondb/model.go index 1533f4407..8ad2fd1ca 100644 --- a/pkg/proxy/origins/irondb/model.go +++ b/pkg/proxy/origins/irondb/model.go @@ -359,9 +359,8 @@ func (c Client) UnmarshalInstantaneous( // Size returns the approximate memory utilization in bytes of the timeseries func (se *SeriesEnvelope) Size() int { - - // TODO this implementation is a rough approximation to ensure we conform to the - // interface specification, it requires refinement in order to be in the ballpark - c := len(se.Data) * 24 + c := len(se.Data)*36 + // time.Time (24) + Step (4) + Value (8) + (len(se.ExtentList) * 72) + // time.Time (24) * 3 + 24 // .StepDuration return c } diff --git a/pkg/proxy/origins/irondb/model_df4.go b/pkg/proxy/origins/irondb/model_df4.go index 5bcca921a..19920b606 100644 --- a/pkg/proxy/origins/irondb/model_df4.go +++ b/pkg/proxy/origins/irondb/model_df4.go @@ -18,6 +18,7 @@ package irondb import ( "sync" + "sync/atomic" "time" "github.com/tricksterproxy/trickster/pkg/timeseries" @@ -325,22 +326,28 @@ func (se *DF4SeriesEnvelope) Sort() { // Size returns the approximate memory utilization in bytes of the timeseries func (se *DF4SeriesEnvelope) Size() int { - - // TODO this implementation is a rough approximation to ensure we conform to the - // interface specification, it requires refinement in order to be in the ballpark - - c := 24 + len(se.Ver) // accounts for head + ver wg := sync.WaitGroup{} - mtx := sync.Mutex{} + c := uint64(len(se.Ver) + + 24 + // .Head + 24 + // .StepDuration + se.ExtentList.Size(), + ) + for i := range se.Meta { + wg.Add(1) + go func(j int) { + for k := range se.Meta[j] { + atomic.AddUint64(&c, uint64(len(k)+8)) // + approximate Meta Value size (8) + } + wg.Done() + }(i) + } for i := range se.Data { wg.Add(1) go func(s []interface{}) { - mtx.Lock() - c += (len(s) * 16) - mtx.Unlock() + atomic.AddUint64(&c, uint64(len(s)*16)) // + approximate data value size wg.Done() }(se.Data[i]) } 
wg.Wait() - return c + return int(c) } diff --git a/pkg/proxy/origins/irondb/model_df4_test.go b/pkg/proxy/origins/irondb/model_df4_test.go index 68c85fa20..de413645f 100644 --- a/pkg/proxy/origins/irondb/model_df4_test.go +++ b/pkg/proxy/origins/irondb/model_df4_test.go @@ -366,22 +366,22 @@ func TestUnmarshalDF4Timeseries(t *testing.T) { se := ts.(*DF4SeriesEnvelope) if len(se.Data) != 2 { - t.Errorf(`Expected length: 2. got %d`, len(se.Data)) + t.Errorf(`expected length: 2. got %d`, len(se.Data)) return } if se.Data[1][1] != 2.0 { - t.Errorf(`Expected value: 2.0. got %f`, se.Data[1][1]) + t.Errorf(`expected value: 2.0. got %f`, se.Data[1][1]) return } if se.Head.Start != 300 { - t.Errorf(`Expected time start: 300. got %d`, se.Head.Start) + t.Errorf(`expected time start: 300. got %d`, se.Head.Start) return } if se.Head.Period != 300 { - t.Errorf(`Expected time period: 300. got %d`, se.Head.Period) + t.Errorf(`expected time period: 300. got %d`, se.Head.Period) return } } @@ -390,7 +390,7 @@ func TestSize(t *testing.T) { client := &Client{} s, _ := client.UnmarshalTimeseries([]byte(testDF4Response)) - expected := 75 + expected := 136 if s.Size() != expected { t.Errorf("expected %d got %d", expected, s.Size()) diff --git a/pkg/proxy/origins/irondb/model_test.go b/pkg/proxy/origins/irondb/model_test.go index a1d2dad6b..8a706c9bd 100644 --- a/pkg/proxy/origins/irondb/model_test.go +++ b/pkg/proxy/origins/irondb/model_test.go @@ -497,11 +497,11 @@ func TestTSSize(t *testing.T) { s, _ := client.UnmarshalTimeseries(bytes) - expected := 48 + expected := 96 size := s.Size() if size != expected { - t.Errorf("got %d expected %d", size, expected) + t.Errorf("expected %d got %d", expected, size) } } diff --git a/pkg/proxy/origins/prometheus/matrix.go b/pkg/proxy/origins/prometheus/matrix.go index 03ed4278a..2351dd1d8 100644 --- a/pkg/proxy/origins/prometheus/matrix.go +++ b/pkg/proxy/origins/prometheus/matrix.go @@ -19,6 +19,7 @@ package prometheus import ( "sort" "sync" + 
"sync/atomic" "time" "github.com/tricksterproxy/trickster/pkg/sort/times" @@ -401,19 +402,21 @@ func (me *MatrixEnvelope) ValueCount() int { // Size returns the approximate memory utilization in bytes of the timeseries func (me *MatrixEnvelope) Size() int { - - c := 0 wg := sync.WaitGroup{} - mtx := sync.Mutex{} + c := uint64(len(me.Status) + + me.ExtentList.Size() + + 24 + // me.StepDuration + (25 * len(me.timestamps)) + + (24 * len(me.tslist)) + + 2 + // isSorted + isCounted + len(me.Data.ResultType)) for i := range me.Data.Result { wg.Add(1) go func(s *model.SampleStream) { - mtx.Lock() - c += (len(s.Values) * 16) + len(s.Metric.String()) - mtx.Unlock() + atomic.AddUint64(&c, uint64((len(s.Values)*32)+len(s.Metric.String()))) wg.Done() }(me.Data.Result[i]) } wg.Wait() - return c + return int(c) } diff --git a/pkg/proxy/origins/prometheus/matrix_test.go b/pkg/proxy/origins/prometheus/matrix_test.go index 735ae4a9e..0ad41bb2e 100644 --- a/pkg/proxy/origins/prometheus/matrix_test.go +++ b/pkg/proxy/origins/prometheus/matrix_test.go @@ -1701,7 +1701,7 @@ func TestSize(t *testing.T) { StepDuration: time.Duration(5) * time.Second, } i := m.Size() - expected := 17 + expected := 144 if i != expected { t.Errorf("expected %d got %d", expected, i) diff --git a/pkg/timeseries/extent_list.go b/pkg/timeseries/extent_list.go index 6f58d7c40..203a2cade 100644 --- a/pkg/timeseries/extent_list.go +++ b/pkg/timeseries/extent_list.go @@ -164,6 +164,11 @@ func (el ExtentList) Clone() ExtentList { return c } +// Size returns the approximate memory utilization in bytes of the ExtentList +func (el ExtentList) Size() int { + return len(el) * 72 +} + // ExtentListLRU is a type of []Extent used for sorting the slice by LRU type ExtentListLRU []Extent