From 98f2b66b9b7a711c427d5f55de82bc55d135ddfc Mon Sep 17 00:00:00 2001 From: PJ Doerner Date: Wed, 24 May 2023 15:10:41 -0700 Subject: [PATCH] array moving average --- .../aggregate/bench_moving_window_avg_test.go | 14 ++- .../aggregate/moving_window_average_array.go | 92 +++++++++++++++++++ 2 files changed, 104 insertions(+), 2 deletions(-) diff --git a/common/aggregate/bench_moving_window_avg_test.go b/common/aggregate/bench_moving_window_avg_test.go index 706460c2c30..4ce34c9fac7 100644 --- a/common/aggregate/bench_moving_window_avg_test.go +++ b/common/aggregate/bench_moving_window_avg_test.go @@ -31,10 +31,12 @@ import ( ) // BenchmarkRingMovingWindowAvg -// BenchmarkRingMovingWindowAvg-10 12283236 94.76 ns/op +// BenchmarkRingMovingWindowAvg-10 12622564 92.76 ns/op +// BenchmarkArrayMovingWindowAvg +// BenchmarkArrayMovingWindowAvg-10 12022722 99.94 ns/op const ( - testWindowSize = 3 * time.Second + testWindowSize = 10 * time.Millisecond testBufferSize = 200 ) @@ -45,3 +47,11 @@ func BenchmarkRingMovingWindowAvg(b *testing.B) { avg.Average() } } + +func BenchmarkArrayMovingWindowAvg(b *testing.B) { + avg := NewMovingWindowAvgArrayImpl(testWindowSize, testBufferSize) + for i := 0; i < b.N; i++ { + avg.Record(rand.Int63()) + avg.Average() + } +} diff --git a/common/aggregate/moving_window_average_array.go b/common/aggregate/moving_window_average_array.go index 17fdfeda482..a6bfcd62962 100644 --- a/common/aggregate/moving_window_average_array.go +++ b/common/aggregate/moving_window_average_array.go @@ -1 +1,93 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + package aggregate + +import ( + "sync" + "time" +) + +type ( + MovingWindowAvgArrayImpl struct { + sync.Mutex + windowSize time.Duration + maxBufferSize int + buffer []timestampedData + headIdx int + tailIdx int + sum int64 + count int64 + } +) + +func NewMovingWindowAvgArrayImpl( + windowSize time.Duration, + maxBufferSize int, +) *MovingWindowAvgArrayImpl { + return &MovingWindowAvgArrayImpl{ + windowSize: windowSize, + maxBufferSize: maxBufferSize, + buffer: make([]timestampedData, maxBufferSize), + } +} + +func (a *MovingWindowAvgArrayImpl) Record(val int64) { + a.Lock() + defer a.Unlock() + + a.buffer[a.tailIdx] = timestampedData{timestamp: time.Now(), value: val} + a.tailIdx = (a.tailIdx + 1) % a.maxBufferSize + + a.sum += val + a.count++ + + if a.tailIdx == a.headIdx { + // buffer full, expire oldest element + a.sum -= a.buffer[a.headIdx].value + a.count-- + a.headIdx = (a.headIdx + 1) % a.maxBufferSize + } +} + +func (a *MovingWindowAvgArrayImpl) Average() float64 { + a.Lock() + defer a.Unlock() + + a.expireOldValuesLocked() + if a.count == 0 { + return 0 + } + return float64(a.sum) / float64(a.count) +} + +func (a *MovingWindowAvgArrayImpl) expireOldValuesLocked() { + for ; a.headIdx != a.tailIdx; a.headIdx = (a.headIdx + 1) % a.maxBufferSize { + if time.Since(a.buffer[a.headIdx].timestamp) < a.windowSize { + break + } + a.sum -= a.buffer[a.headIdx].value + a.count-- + } +}