Skip to content

Commit

Permalink
add more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed May 13, 2016
1 parent 7e9261e commit 23adff7
Show file tree
Hide file tree
Showing 5 changed files with 361 additions and 36 deletions.
3 changes: 0 additions & 3 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1565,9 +1565,6 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error {
if min, max, got := 3, 4, len(expr.Args); got > max || got < min {
return fmt.Errorf("invalid number of arguments for %s, expected at least %d but no more than %d, got %d", expr.Name, min, max, got)
}
if _, ok := expr.Args[0].(*VarRef); !ok {
return fmt.Errorf("expected field argument as first arg in %s()", expr.Name)
}
for i, arg := range expr.Args[1:3] {
if _, ok := arg.(*IntegerLiteral); !ok {
return fmt.Errorf("expected integer argument as %dth arg in %s()", i+1, expr.Name)
Expand Down
73 changes: 55 additions & 18 deletions influxql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,13 +338,12 @@ type FloatHoltWintersReducer struct {

// Interval between points
interval int64
// Time of last point
last int64

// Whether to include all data or only future values
includeAllData bool

y []float64
y []float64
points []FloatPoint
}

const (
Expand Down Expand Up @@ -374,12 +373,14 @@ func NewFloatHoltWintersReducer(h, m int, includeAllData bool) *FloatHoltWinters

// AggregateFloat aggregates a point into the reducer and updates the current window.
func (r *FloatHoltWintersReducer) AggregateFloat(p *FloatPoint) {
//TODO: handle missing values based on expected interval between points
if r.last != 0 && r.interval == 0 {
r.interval = p.Time - r.last
// Keep track of the smallest interval
if l := len(r.points); l > 1 {
interval := p.Time - r.points[l-1].Time
if r.interval == 0 || interval < r.interval {
r.interval = interval
}
}
r.last = p.Time
r.y = append(r.y, p.Value)
r.points = append(r.points, *p)
}

// AggregateInteger aggregates a point into the reducer and updates the current window.
Expand All @@ -388,9 +389,26 @@ func (r *FloatHoltWintersReducer) AggregateInteger(p *IntegerPoint) {
}

func (r *FloatHoltWintersReducer) Emit() []FloatPoint {
if l := len(r.y); l < 2 || r.seasonal && l < r.m {
if l := len(r.points); l < 2 || r.seasonal && l < r.m {
return nil
}
// First fill in r.y with values and NaNs for missing values
start, stop := r.points[0].Time, r.points[len(r.points)-1].Time
count := (stop - start) / r.interval
r.y = make([]float64, 1, count)
r.y[0] = r.points[0].Value
t := r.points[0].Time
for _, p := range r.points[1:] {
t += r.interval
// Add any missing values before the next point
for p.Time != t {
// Add in a NaN so we can skip it later.
r.y = append(r.y, math.NaN())
t += r.interval
}
r.y = append(r.y, p.Value)
}

// Smoothing parameters
alpha, beta, gamma := r.alpha, r.beta, r.gamma

Expand All @@ -401,10 +419,16 @@ func (r *FloatHoltWintersReducer) Emit() []FloatPoint {
phi := r.phi

// Starting guesses
// NOTE: Since these values are guesses
// in the cases where we were missing data can just skip
// the value and call it good.

l_0 := 0.0
if r.seasonal {
for i := 0; i < m; i++ {
l_0 += (1 / float64(m)) * r.y[i]
if !math.IsNaN(r.y[i]) {
l_0 += (1 / float64(m)) * r.y[i]
}
}
} else {
l_0 += alpha * r.y[0]
Expand All @@ -413,17 +437,25 @@ func (r *FloatHoltWintersReducer) Emit() []FloatPoint {
b_0 := 0.0
if r.seasonal {
for i := 0; i < m && m+i < len(r.y); i++ {
b_0 += 1 / float64(m*m) * (r.y[m+i] - r.y[i])
if !math.IsNaN(r.y[i]) && !math.IsNaN(r.y[m+i]) {
b_0 += 1 / float64(m*m) * (r.y[m+i] - r.y[i])
}
}
} else {
b_0 = beta * (r.y[1] - r.y[0])
if !math.IsNaN(r.y[1]) {
b_0 = beta * (r.y[1] - r.y[0])
}
}

var s []float64
if r.seasonal {
s = make([]float64, m)
for i := 0; i < m; i++ {
s[i] = r.y[i] / l_0
if !math.IsNaN(r.y[i]) {
s[i] = r.y[i] / l_0
} else {
s[i] = 0
}
}
}

Expand All @@ -448,19 +480,21 @@ func (r *FloatHoltWintersReducer) Emit() []FloatPoint {
forecasted := r.forecast(r.h, params)
var points []FloatPoint
if r.includeAllData {
start := r.points[0].Time
points = make([]FloatPoint, len(forecasted))
for i, v := range forecasted {
t := r.last + r.interval*(int64(i)+1) - r.interval*int64(len(r.y))
t := start + r.interval*(int64(i))
points[i] = FloatPoint{
Value: v,
Time: t,
}
}
} else {
last := r.points[len(r.points)-1].Time
points = make([]FloatPoint, r.h)
forecasted := r.forecast(r.h, params)
for i, v := range forecasted[len(r.y):] {
t := r.last + r.interval*(int64(i)+1)
t := last + r.interval*(int64(i)+1)
points[i] = FloatPoint{
Value: v,
Time: t,
Expand Down Expand Up @@ -543,9 +577,12 @@ func (r *FloatHoltWintersReducer) sse(params []float64) float64 {
sse := 0.0
forecasted := r.forecast(0, params)
for i := range forecasted {
// Compute error
diff := forecasted[i] - r.y[i]
sse += diff * diff
// Skip missing values since we cannot use them to compute an error.
if !math.IsNaN(r.y[i]) {
// Compute error
diff := forecasted[i] - r.y[i]
sse += diff * diff
}
}
return sse
}
Expand Down
158 changes: 156 additions & 2 deletions influxql/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (
"github.com/influxdata/influxdb/influxql"
)

func almostEqual(got, exp float64) bool {
return math.Abs(got-exp) < 1e-5 && !math.IsNaN(got)
}

func TestHoltWinters_AusTourists(t *testing.T) {
hw := influxql.NewFloatHoltWintersReducer(10, 4, false)
// Dataset from http://www.inside-r.org/packages/cran/fpp/docs/austourists
Expand Down Expand Up @@ -87,7 +91,87 @@ func TestHoltWinters_AusTourists(t *testing.T) {
if exp, got := forecasted[i].Time, points[i].Time; got != exp {
t.Errorf("unexpected time on points[%d] got %v exp %v", i, got, exp)
}
if exp, got := forecasted[i].Value, points[i].Value; math.Abs(got-exp) > 1e-5 {
if exp, got := forecasted[i].Value, points[i].Value; !almostEqual(got, exp) {
t.Errorf("unexpected value on points[%d] got %v exp %v", i, got, exp)
}
}
}

func TestHoltWinters_AusTourists_Missing(t *testing.T) {
hw := influxql.NewFloatHoltWintersReducer(10, 4, false)
// Dataset from http://www.inside-r.org/packages/cran/fpp/docs/austourists
austourists := []influxql.FloatPoint{
{Time: 1, Value: 30.052513},
{Time: 3, Value: 25.317692},
{Time: 4, Value: 27.591437},
{Time: 5, Value: 32.076456},
{Time: 6, Value: 23.487961},
{Time: 7, Value: 28.47594},
{Time: 9, Value: 36.838485},
{Time: 10, Value: 25.007017},
{Time: 11, Value: 30.72223},
{Time: 12, Value: 28.693759},
{Time: 13, Value: 36.640986},
{Time: 14, Value: 23.824609},
{Time: 15, Value: 29.311683},
{Time: 16, Value: 31.770309},
{Time: 17, Value: 35.177877},
{Time: 19, Value: 29.60175},
{Time: 20, Value: 34.538842},
{Time: 21, Value: 41.273599},
{Time: 22, Value: 26.655862},
{Time: 23, Value: 28.279859},
{Time: 24, Value: 35.191153},
{Time: 25, Value: 41.727458},
{Time: 26, Value: 24.04185},
{Time: 27, Value: 32.328103},
{Time: 28, Value: 37.328708},
{Time: 30, Value: 29.346326},
{Time: 31, Value: 36.48291},
{Time: 32, Value: 42.977719},
{Time: 34, Value: 31.180221},
{Time: 35, Value: 37.717881},
{Time: 36, Value: 40.420211},
{Time: 37, Value: 51.206863},
{Time: 38, Value: 31.887228},
{Time: 41, Value: 55.558567},
{Time: 42, Value: 33.850915},
{Time: 43, Value: 42.076383},
{Time: 44, Value: 45.642292},
{Time: 45, Value: 59.76678},
{Time: 46, Value: 35.191877},
{Time: 47, Value: 44.319737},
{Time: 48, Value: 47.913736},
}

for _, p := range austourists {
hw.AggregateFloat(&p)
}
points := hw.Emit()
t.Log(points)

forecasted := []influxql.FloatPoint{
{Time: 49, Value: 54.39825435294697},
{Time: 50, Value: 41.93726334513928},
{Time: 51, Value: 54.909838091213345},
{Time: 52, Value: 57.1641355392107},
{Time: 53, Value: 57.164128921488114},
{Time: 54, Value: 44.06955989656805},
{Time: 55, Value: 57.701724090970124},
{Time: 56, Value: 60.07064109901599},
{Time: 57, Value: 60.0706341448159},
{Time: 58, Value: 46.310272882941966},
}

if exp, got := len(forecasted), len(points); exp != got {
t.Fatalf("unexpected number of points emitted: got %d exp %d", got, exp)
}

for i := range forecasted {
if exp, got := forecasted[i].Time, points[i].Time; got != exp {
t.Errorf("unexpected time on points[%d] got %v exp %v", i, got, exp)
}
if exp, got := forecasted[i].Value, points[i].Value; !almostEqual(got, exp) {
t.Errorf("unexpected value on points[%d] got %v exp %v", i, got, exp)
}
}
Expand Down Expand Up @@ -160,7 +244,77 @@ func TestHoltWinters_USPopulation(t *testing.T) {
if exp, got := forecasted[i].Time, points[i].Time; got != exp {
t.Errorf("unexpected time on points[%d] got %v exp %v", i, got, exp)
}
if exp, got := forecasted[i].Value, points[i].Value; math.Abs(got-exp) > 1e-5 {
if exp, got := forecasted[i].Value, points[i].Value; !almostEqual(got, exp) {
t.Errorf("unexpected value on points[%d] got %v exp %v", i, got, exp)
}
}
}

func TestHoltWinters_USPopulation_Missing(t *testing.T) {
series := []influxql.FloatPoint{
{Time: 1, Value: 3.93},
{Time: 2, Value: 5.31},
{Time: 3, Value: 7.24},
{Time: 4, Value: 9.64},
{Time: 5, Value: 12.90},
{Time: 6, Value: 17.10},
{Time: 7, Value: 23.20},
{Time: 8, Value: 31.40},
{Time: 10, Value: 50.20},
{Time: 11, Value: 62.90},
{Time: 12, Value: 76.00},
{Time: 13, Value: 92.00},
{Time: 15, Value: 122.80},
{Time: 16, Value: 131.70},
{Time: 17, Value: 151.30},
{Time: 19, Value: 203.20},
}
hw := influxql.NewFloatHoltWintersReducer(10, 0, true)
for _, p := range series {
hw.AggregateFloat(&p)
}
points := hw.Emit()

forecasted := []influxql.FloatPoint{
{Time: 1, Value: 3.93},
{Time: 2, Value: -0.48020223172549237},
{Time: 3, Value: 4.802781783666482},
{Time: 4, Value: 9.877595464844614},
{Time: 5, Value: 15.247582981982251},
{Time: 6, Value: 21.11408248318643},
{Time: 7, Value: 27.613033499005994},
{Time: 8, Value: 34.85894117561601},
{Time: 9, Value: 42.96187408308528},
{Time: 10, Value: 52.03593886054132},
{Time: 11, Value: 62.20424563556273},
{Time: 12, Value: 73.60229860599725},
{Time: 13, Value: 86.38069796377347},
{Time: 14, Value: 100.70760028319269},
{Time: 15, Value: 116.77117933639244},
{Time: 16, Value: 134.78222852768778},
{Time: 17, Value: 154.97699617972333},
{Time: 18, Value: 177.6203209232622},
{Time: 19, Value: 203.00912417351864},
{Time: 20, Value: 231.47631387044993},
{Time: 21, Value: 263.39515509958034},
{Time: 22, Value: 299.18416724280326},
{Time: 23, Value: 339.31261310834543},
{Time: 24, Value: 384.3066526673811},
{Time: 25, Value: 434.756242430451},
{Time: 26, Value: 491.3228711104303},
{Time: 27, Value: 554.748233097815},
{Time: 28, Value: 625.8639535249647},
{Time: 29, Value: 705.6024924601211},
}

if exp, got := len(forecasted), len(points); exp != got {
t.Fatalf("unexpected number of points emitted: got %d exp %d", got, exp)
}
for i := range forecasted {
if exp, got := forecasted[i].Time, points[i].Time; got != exp {
t.Errorf("unexpected time on points[%d] got %v exp %v", i, got, exp)
}
if exp, got := forecasted[i].Value, points[i].Value; !almostEqual(got, exp) {
t.Errorf("unexpected value on points[%d] got %v exp %v", i, got, exp)
}
}
Expand Down
Loading

0 comments on commit 23adff7

Please sign in to comment.