diff --git a/pkg/backends/irondb/model/model.go b/pkg/backends/irondb/model/model.go
index 234d3add8..0e326f0df 100644
--- a/pkg/backends/irondb/model/model.go
+++ b/pkg/backends/irondb/model/model.go
@@ -35,10 +35,11 @@ import (
 // SeriesEnvelope values represent a time series data response from the
 // IRONdb API.
 type SeriesEnvelope struct {
-	Data           DataPoints            `json:"data"`
-	ExtentList     timeseries.ExtentList `json:"extents,omitempty"`
-	StepDuration   time.Duration         `json:"step,omitempty"`
-	timeRangeQuery *timeseries.TimeRangeQuery
+	Data               DataPoints            `json:"data"`
+	ExtentList         timeseries.ExtentList `json:"extents,omitempty"`
+	StepDuration       time.Duration         `json:"step,omitempty"`
+	timeRangeQuery     *timeseries.TimeRangeQuery
+	VolatileExtentList timeseries.ExtentList `json:"-"`
 }
 
 // MarshalJSON encodes a series envelope value into a JSON byte slice.
@@ -102,6 +103,16 @@ func (se *SeriesEnvelope) UnmarshalJSON(b []byte) error {
 	return err
 }
 
+// VolatileExtents returns the list of time Extents in the dataset that should be re-fetched
+func (se *SeriesEnvelope) VolatileExtents() timeseries.ExtentList {
	return se.VolatileExtentList
+}
+
+// SetVolatileExtents sets the list of time Extents in the dataset that should be re-fetched
+func (se *SeriesEnvelope) SetVolatileExtents(e timeseries.ExtentList) {
+	se.VolatileExtentList = e
+}
+
 // DataPoint values represent a single data element of a time series data
 // response from the IRONdb API.
 type DataPoint struct {
diff --git a/pkg/backends/irondb/model/model_df4.go b/pkg/backends/irondb/model/model_df4.go
index b26999672..9a0124b84 100644
--- a/pkg/backends/irondb/model/model_df4.go
+++ b/pkg/backends/irondb/model/model_df4.go
@@ -34,6 +34,9 @@ type DF4SeriesEnvelope struct {
 	StepDuration time.Duration         `json:"step,omitempty"`
 	ExtentList   timeseries.ExtentList `json:"extents,omitempty"`
 	timeRangeQuery *timeseries.TimeRangeQuery
+	// VolatileExtentList is the list of extents in the dataset that should be
+	// refreshed on the next request to the Origin
+	VolatileExtentList timeseries.ExtentList `json:"-"`
 }
 
 // DF4Info values contain information about the timestamps of the data elements
@@ -364,3 +367,13 @@ func (se *DF4SeriesEnvelope) Size() int64 {
 	wg.Wait()
 	return c
 }
+
+// VolatileExtents returns the list of time Extents in the dataset that should be re-fetched
+func (se *DF4SeriesEnvelope) VolatileExtents() timeseries.ExtentList {
+	return se.VolatileExtentList
+}
+
+// SetVolatileExtents sets the list of time Extents in the dataset that should be re-fetched
+func (se *DF4SeriesEnvelope) SetVolatileExtents(e timeseries.ExtentList) {
+	se.VolatileExtentList = e
+}
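Note: any type satisfying timeseries.Timeseries must now carry the same state. A minimal sketch of the pattern, not part of this changeset; the type name is hypothetical and the import path is assumed from the repository this diff targets:

package model

import "github.com/tricksterproxy/trickster/pkg/timeseries"

// MySeries is a hypothetical Timeseries implementation mirroring the
// accessor pair added above; volatile ranges are excluded from client
// responses via `json:"-"` but tracked for the delta proxy's refresh logic.
type MySeries struct {
	VolatileExtentList timeseries.ExtentList `json:"-"`
}

// VolatileExtents returns the ranges that should be re-fetched on the next request
func (s *MySeries) VolatileExtents() timeseries.ExtentList { return s.VolatileExtentList }

// SetVolatileExtents replaces the ranges that should be re-fetched on the next request
func (s *MySeries) SetVolatileExtents(e timeseries.ExtentList) { s.VolatileExtentList = e }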
diff --git a/pkg/backends/options/defaults.go b/pkg/backends/options/defaults.go
index deacbb7b4..2e244dafb 100644
--- a/pkg/backends/options/defaults.go
+++ b/pkg/backends/options/defaults.go
@@ -47,6 +47,8 @@ const (
 	DefaultTracingConfigName = "default"
 	// DefaultBackfillToleranceMS is the default Backfill Tolerance setting for Backends
 	DefaultBackfillToleranceMS = 0
+	// DefaultBackfillTolerancePoints is the default Backfill Tolerance points setting for Backends
+	DefaultBackfillTolerancePoints = 0
 	// DefaultKeepAliveTimeoutMS is the default Keep Alive Timeout for Backends' upstream client pools
 	DefaultKeepAliveTimeoutMS = 300000
 	// DefaultMaxIdleConns is the default number of Idle Connections in Backends' upstream client pools
diff --git a/pkg/backends/options/options.go b/pkg/backends/options/options.go
index 4d790b1cb..dd1f35d5f 100644
--- a/pkg/backends/options/options.go
+++ b/pkg/backends/options/options.go
@@ -77,10 +77,14 @@ type Options struct {
 	// TimeseriesEvictionMethodName specifies which methodology ("oldest", "lru") is used to identify
 	//timeseries to evict from a full cache object
 	TimeseriesEvictionMethodName string `yaml:"timeseries_eviction_method,omitempty"`
-	// BackfillToleranceMS prevents values with timestamps newer than the provided
-	// number of seconds from being cached this allows propagation of upstream backfill operations
-	// that modify recently-served data
+	// BackfillToleranceMS prevents values with timestamps newer than the provided number of
+	// milliseconds from being cached. This allows propagation of upstream backfill operations
+	// that modify recently-cached data
 	BackfillToleranceMS int64 `yaml:"backfill_tolerance_ms,omitempty"`
+	// BackfillTolerancePoints is similar to the MS version, except that its final value depends
+	// on the query step value, which determines the relative duration of backfill tolerance per query.
+	// When both are set, the higher of the two values is used
+	BackfillTolerancePoints int `yaml:"backfill_tolerance_points,omitempty"`
 	// PathList is a list of Path Options that control the behavior of the given paths when requested
 	Paths map[string]*po.Options `yaml:"paths,omitempty"`
 	// NegativeCacheName provides the name of the Negative Cache Config to be used by this Backend
@@ -188,8 +192,9 @@ type Options struct {
 // New will return a pointer to a Backend Options with the default configuration settings
 func New() *Options {
 	return &Options{
-		BackfillTolerance:   DefaultBackfillToleranceMS,
+		BackfillTolerance:       time.Duration(DefaultBackfillToleranceMS) * time.Millisecond,
 		BackfillToleranceMS:     DefaultBackfillToleranceMS,
+		BackfillTolerancePoints: DefaultBackfillTolerancePoints,
 		CacheKeyPrefix:          "",
 		CacheName:               DefaultBackendCacheName,
 		CompressableTypeList:    DefaultCompressableTypes(),
@@ -226,6 +231,7 @@ func (o *Options) Clone() *Options {
 	no.DearticulateUpstreamRanges = o.DearticulateUpstreamRanges
 	no.BackfillTolerance = o.BackfillTolerance
 	no.BackfillToleranceMS = o.BackfillToleranceMS
+	no.BackfillTolerancePoints = o.BackfillTolerancePoints
 	no.CacheName = o.CacheName
 	no.CacheKeyPrefix = o.CacheKeyPrefix
 	no.FastForwardDisable = o.FastForwardDisable
@@ -552,6 +558,10 @@ func SetDefaults(
 		no.BackfillToleranceMS = o.BackfillToleranceMS
 	}
 
+	if metadata.IsDefined("backends", name, "backfill_tolerance_points") {
+		no.BackfillTolerancePoints = o.BackfillTolerancePoints
+	}
+
 	if metadata.IsDefined("backends", name, "paths") {
 		err := po.SetDefaults(name, metadata, o.Paths, crw)
 		if err != nil {
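Note: the precedence between the two settings is applied per query in TimeRangeQuery.GetBackfillTolerance (changed later in this diff). A rough, hypothetical equivalent of the documented "higher of the two wins" rule, for illustration only:

package main

import (
	"fmt"
	"time"
)

// effectiveTolerance illustrates how the two options combine: points are
// scaled by the query step, and the larger of the two durations wins.
func effectiveTolerance(toleranceMS int64, tolerancePoints int, step time.Duration) time.Duration {
	d := time.Duration(toleranceMS) * time.Millisecond
	if p := time.Duration(tolerancePoints) * step; p > d {
		return p
	}
	return d
}

func main() {
	// e.g., ms=0, points=2, step=5m => 10m of backfill tolerance
	fmt.Println(effectiveTolerance(0, 2, 5*time.Minute)) // 10m0s
}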
diff --git a/pkg/backends/options/options_data_test.go b/pkg/backends/options/options_data_test.go
index ba9a13b96..858a32974 100644
--- a/pkg/backends/options/options_data_test.go
+++ b/pkg/backends/options/options_data_test.go
@@ -52,6 +52,7 @@ backends:
       timeseries_eviction_method: lru
       fast_forward_disable: true
       backfill_tolerance_ms: 301000
+      backfill_tolerance_points: 2
       timeout_ms: 37000
       health_check_endpoint: /test_health
       health_check_upstream_path: /test/upstream/endpoint
@@ -84,6 +85,12 @@ backends:
         collapsed_forwarding: basic
         response_headers:
           X-Header-Test: test-value
+      prometheus:
+        labels:
+          testlabel: trickster
+      alb:
+        methodology: rr
+        pool: [ test ]
     tls:
       full_chain_cert_path: file.that.should.not.exist.ever.pem
       private_key_path: file.that.should.not.exist.ever.pem
diff --git a/pkg/proxy/engines/deltaproxycache.go b/pkg/proxy/engines/deltaproxycache.go
index 7b3623db5..6067fafa0 100644
--- a/pkg/proxy/engines/deltaproxycache.go
+++ b/pkg/proxy/engines/deltaproxycache.go
@@ -87,17 +87,11 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request, modeler *tim
 	pr := newProxyRequest(r, w)
 	rlo.FastForwardDisable = o.FastForwardDisable || rlo.FastForwardDisable
 	trq.NormalizeExtent()
-
-	// this is used to ensure the head of the cache respects the BackFill Tolerance
-	bf := timeseries.Extent{Start: time.Unix(0, 0), End: trq.Extent.End}
-	bt := trq.GetBackfillTolerance(o.BackfillTolerance)
-
-	if !trq.IsOffset && bt > 0 { // TODO: research if we need this clause: && !time.Now().Add(-bt).After(bf.End) {
-		bf.End = bf.End.Add(-bt)
-	}
-
 	now := time.Now()
 
+	bt := trq.GetBackfillTolerance(o.BackfillTolerance, o.BackfillTolerancePoints)
+	bfs := now.Add(-bt).Truncate(trq.Step) // start of the backfill tolerance window
+
 	OldestRetainedTimestamp := time.Time{}
 	if o.TimeseriesEvictionMethod == evictionmethods.EvictionMethodOldest {
 		OldestRetainedTimestamp = now.Truncate(trq.Step).Add(-(trq.Step * o.TimeseriesRetention))
@@ -108,13 +102,6 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request, modeler *tim
 			DoProxy(w, r, true)
 			return
 		}
-		if trq.Extent.Start.After(bf.End) {
-			tl.Debug(pr.Logger, "timerange is too new to cache due to backfill tolerance",
-				tl.Pairs{"backFillToleranceSecs": bt,
-					"newestRetainedTimestamp": bf.End, "queryStart": trq.Extent.Start})
-			DoProxy(w, r, true)
-			return
-		}
 	}
 
 	client.SetExtent(pr.upstreamRequest, trq, &trq.Extent)
@@ -198,18 +185,6 @@ checkCache:
 			DoProxy(w, r, true)
 			return
 		}
-		if trq.Extent.Start.After(el[len(el)-1].End) {
-			pr.cacheLock.RRelease()
-			tl.Debug(pr.Logger, "timerange not cached due to backfill tolerance",
-				tl.Pairs{
-					"backFillToleranceSecs":   bt,
-					"newestRetainedTimestamp": bf.End,
-					"queryStart":              trq.Extent.Start,
-				},
-			)
-			DoProxy(w, r, true)
-			return
-		}
 		}
 	}
 	cacheStatus = status.LookupStatusPartialHit
@@ -218,9 +193,21 @@ checkCache:
 	}
 
 	// Find the ranges that we want, but which are not currently cached
-	var missRanges timeseries.ExtentList
+	var missRanges, cvr timeseries.ExtentList
+	vr := cts.VolatileExtents()
 	if cacheStatus == status.LookupStatusPartialHit {
 		missRanges = cts.Extents().CalculateDeltas(trq.Extent, trq.Step)
+		// this is the backfill part of backfill tolerance. if there are any volatile
+		// ranges in the timeseries, this determines if any fall within the client's
+		// requested range and ensures they are re-requested. this only happens if
+		// the request is already a phit
+		if bt > 0 && len(missRanges) > 0 && len(vr) > 0 {
+			// this checks the timeseries's volatile ranges for any overlap with
+			// the request extent, and adds those to the missRanges to refresh
+			if cvr = vr.Crop(trq.Extent); len(cvr) > 0 {
+				missRanges = append(missRanges, cvr...).Compress(trq.Step)
+			}
+		}
 	}
 
 	if len(missRanges) == 0 && cacheStatus == status.LookupStatusPartialHit {
@@ -244,7 +231,7 @@ checkCache:
 	// in this case, it's not a cache hit, so something is _likely_ going to be cached now.
 	// we write lock here, so as to prevent other concurrent client requests for the same url,
 	// which will have the same cacheStatus, from causing the same or similar HTTP requests
-	// to be made against the origin, since just one should doc.
+	// to be made against the origin, since just one should do.
 
 	// acquire a write lock via the Upgrade method, which will swap the read lock for a
 	// write lock, and return true if this client was the only one, or otherwise the first
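Note: a runnable sketch of the refresh computation above, using the ExtentList methods from this package (module path assumed from the repository this diff targets; the merged result should be a single range, assuming Compress joins extents that are contiguous at step granularity):

package main

import (
	"fmt"
	"time"

	"github.com/tricksterproxy/trickster/pkg/timeseries"
)

func main() {
	step := 5 * time.Minute
	ts := func(min int) time.Time { return time.Unix(int64(min)*60, 0) }

	request := timeseries.Extent{Start: ts(540), End: ts(600)}        // 09:00-10:00
	cached := timeseries.ExtentList{{Start: ts(0), End: ts(595)}}     // data through 09:55
	volatile := timeseries.ExtentList{{Start: ts(580), End: ts(595)}} // still-mutable head

	// the uncached head of the request: [10:00-10:00]
	miss := cached.CalculateDeltas(request, step)
	// the volatile overlap with the request: [09:40-09:55]
	miss = append(miss, volatile.Crop(request)...)
	// compressed, the upstream fetch covers 09:40-10:00 in one range
	fmt.Println(miss.Compress(step))
}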
@@ -383,6 +370,43 @@ checkCache:
 		cts.Merge(true, mts...)
 	}
 
+	// this handles the tolerance part of backfill tolerance, by adding new tolerable ranges to
+	// the timeseries's volatile list, and removing those that no longer tolerate backfill
+	if bt > 0 && cacheStatus != status.LookupStatusHit {
+
+		var shouldCompress bool
+		ve := cts.VolatileExtents()
+
+		// first, remove those that are now too old to tolerate backfill.
+		if len(cvr) > 0 {
+			// this updates the timeseries's volatile list to remove anything just fetched that is
+			// older than the current backfill tolerance timestamp, since it is now immutable in cache
+			ve = ve.Remove(cvr, trq.Step)
+			shouldCompress = true
+		}
+
+		// now add in any new time ranges that should tolerate backfill
+		var adds timeseries.Extent
+		if trq.Extent.End.After(bfs) {
+			adds.End = trq.Extent.End
+			if trq.Extent.Start.Before(bfs) {
+				adds.Start = bfs
+			} else {
+				adds.Start = trq.Extent.Start
+			}
+		}
+		if !adds.End.IsZero() {
+			ve = append(ve, adds)
+			shouldCompress = true
+		}
+
+		// if any changes happened to the volatile list, set it in the cached timeseries
+		if shouldCompress {
+			cts.SetVolatileExtents(ve.Compress(trq.Step))
+		}
+
+	}
+
 	// cts is the cacheable time series, rts is the user's response timeseries
 	var rts timeseries.Timeseries
 	if cacheStatus != status.LookupStatusKeyMiss {
@@ -399,9 +423,9 @@ checkCache:
 	// Backfill Tolerance before storing to cache
 	switch o.TimeseriesEvictionMethod {
 	case evictionmethods.EvictionMethodLRU:
-		cts.CropToSize(o.TimeseriesRetentionFactor, bf.End, trq.Extent)
+		cts.CropToSize(o.TimeseriesRetentionFactor, now, trq.Extent)
 	default:
-		cts.CropToRange(timeseries.Extent{End: bf.End, Start: OldestRetainedTimestamp})
+		cts.CropToRange(timeseries.Extent{End: now, Start: OldestRetainedTimestamp})
 	}
 	// Don't cache datasets with empty extents
 	// (everything was cropped so there is nothing to cache)
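Note: a small standalone sketch of the window bookkeeping above, with illustrative values (stdlib only):

package main

import (
	"fmt"
	"time"
)

func main() {
	step := time.Minute
	bt := 10 * time.Minute // backfill tolerance
	now := time.Date(2021, 1, 1, 10, 0, 0, 0, time.UTC)
	reqStart, reqEnd := now.Add(-time.Hour), now

	// start of the backfill tolerance window, as computed in the handler: 09:50
	bfs := now.Add(-bt).Truncate(step)

	// the request's overlap with [bfs, reqEnd] becomes the new volatile range
	var volStart, volEnd time.Time
	if reqEnd.After(bfs) {
		volEnd = reqEnd
		volStart = reqStart
		if reqStart.Before(bfs) {
			volStart = bfs
		}
	}
	fmt.Println(volStart.Format("15:04"), "-", volEnd.Format("15:04")) // 09:50 - 10:00
}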
diff --git a/pkg/proxy/engines/deltaproxycache_test.go b/pkg/proxy/engines/deltaproxycache_test.go
index 34e3a6022..a65b54759 100644
--- a/pkg/proxy/engines/deltaproxycache_test.go
+++ b/pkg/proxy/engines/deltaproxycache_test.go
@@ -154,64 +154,6 @@ func TestDeltaProxyCacheRequestMissThenHit(t *testing.T) {
 	}
 }
 
-func TestDeltaProxyCacheRequestAllItemsTooNew(t *testing.T) {
-
-	ts, w, r, rsc, err := setupTestHarnessDPC()
-	if err != nil {
-		t.Error(err)
-	}
-	defer ts.Close()
-
-	client := rsc.BackendClient.(*TestClient)
-	o := rsc.BackendOptions
-	rsc.CacheConfig.Provider = "test"
-
-	o.FastForwardDisable = true
-	o.BackfillToleranceMS = 600000
-	o.BackfillTolerance = time.Millisecond * time.Duration(o.BackfillToleranceMS)
-
-	step := time.Duration(300) * time.Second
-	end := time.Now()
-
-	extr := timeseries.Extent{Start: end.Add(-time.Duration(5) * time.Minute), End: end}
-
-	expected, _, _ := mockprom.GetTimeSeriesData(queryReturnsOKNoLatency, extr.Start, extr.End, step)
-
-	u := r.URL
-	u.Path = "/prometheus/api/v1/query_range"
-	u.RawQuery = fmt.Sprintf("step=%d&start=%d&end=%d&query=%s",
-		int(step.Seconds()), extr.Start.Unix(), extr.End.Unix(), queryReturnsOKNoLatency)
-
-	client.QueryRangeHandler(w, r)
-	resp := w.Result()
-
-	bodyBytes, err := io.ReadAll(resp.Body)
-	if err != nil {
-		t.Error(err)
-	}
-
-	err = testStringMatch(string(bodyBytes), expected)
-	if err != nil {
-		t.Error(err)
-	}
-
-	err = testStatusCodeMatch(resp.StatusCode, http.StatusOK)
-	if err != nil {
-		t.Error(err)
-	}
-
-	if resp.Header.Get("status") != "" {
-		t.Errorf("status header should not be present. Found with value %s", resp.Header.Get("stattus"))
-	}
-
-	// ensure the request was sent through the proxy instead of the DeltaProxyCache
-	err = testResultHeaderPartMatch(resp.Header, map[string]string{"engine": "HTTPProxy"})
-	if err != nil {
-		t.Error(err)
-	}
-
-}
-
 func TestDeltaProxyCacheRequestRemoveStale(t *testing.T) {
 
 	ts, w, r, rsc, err := setupTestHarnessDPC()
@@ -1465,7 +1407,6 @@ func TestDeltaProxyCacheRequest_BackfillTolerance(t *testing.T) {
 	xn := timeseries.Extent{Start: now.Add(-time.Duration(6) * time.Hour).Truncate(step), End: now.Truncate(step)}
 
 	// We can predict what slice will need to be fetched and ensure that is only what is requested upstream
-	expectedFetched := fmt.Sprintf("[%s]", timeseries.Extent{Start: xn.End, End: xn.End})
 	expected, _, _ := mockprom.GetTimeSeriesData(query, xn.Start, xn.End, step)
 
 	u := r.URL
@@ -1519,12 +1460,7 @@ func TestDeltaProxyCacheRequest_BackfillTolerance(t *testing.T) {
 		t.Error(err)
 	}
 
-	err = testResultHeaderPartMatch(resp.Header, map[string]string{"status": "phit"})
-	if err != nil {
-		t.Error(err)
-	}
-
-	err = testResultHeaderPartMatch(resp.Header, map[string]string{"fetched": expectedFetched})
+	err = testResultHeaderPartMatch(resp.Header, map[string]string{"status": "hit"})
 	if err != nil {
 		t.Error(err)
 	}
diff --git a/pkg/timeseries/dataset/dataset.go b/pkg/timeseries/dataset/dataset.go
index a45b0a923..de5d1643c 100644
--- a/pkg/timeseries/dataset/dataset.go
+++ b/pkg/timeseries/dataset/dataset.go
@@ -53,6 +53,9 @@ type DataSet struct {
 	Warnings []string `msg:"warnings"`
 	// TimeRangeQuery is the trq associated with the Timeseries
 	TimeRangeQuery *timeseries.TimeRangeQuery `msg:"trq"`
+	// VolatileExtentList is the list of extents in the dataset that should be
+	// refreshed on the next request to the Origin
+	VolatileExtentList timeseries.ExtentList `msg:"volatile_extents"`
 	// Sorter is the DataSet's Sort function, which defaults to DefaultSort
 	Sorter func() `msg:"-"`
 	// Merger is the DataSet's Merge function, which defaults to DefaultMerge
@@ -129,6 +132,7 @@ func (ds *DataSet) CroppedClone(e timeseries.Extent) timeseries.Timeseries {
 		return clone
 	}
 	clone.ExtentList = ds.ExtentList.Clone().Crop(e)
+	clone.VolatileExtentList = ds.VolatileExtentList.Clone().Crop(e)
 
 	startNS := epoch.Epoch(e.Start.UnixNano())
 	endNS := epoch.Epoch(e.End.UnixNano())
@@ -210,6 +214,10 @@ func (ds *DataSet) Clone() timeseries.Timeseries {
 		clone.ExtentList = ds.ExtentList.Clone()
 	}
 
+	if ds.VolatileExtentList != nil {
+		clone.VolatileExtentList = ds.VolatileExtentList.Clone()
+	}
+
 	for i := range ds.Results {
 		clone.Results[i] = ds.Results[i].Clone()
 	}
@@ -398,6 +406,7 @@ func (ds *DataSet) DefaultRangeCropper(e timeseries.Extent) {
 	}
 
 	ds.ExtentList = ds.ExtentList.Crop(e)
+	ds.VolatileExtentList = ds.VolatileExtentList.Clone().Crop(e)
 
 	// if the series extent is entirely inside the extent of the crop range,
 	// simply adjust down its ExtentList
@@ -553,3 +562,13 @@ func MarshalDataSet(ts timeseries.Timeseries, rlo *timeseries.RequestOptions, st
 	}
 	return ds.MarshalMsg(nil)
 }
+
+// VolatileExtents returns the list of time Extents in the dataset that should be re-fetched
+func (ds *DataSet) VolatileExtents() timeseries.ExtentList {
+	return ds.VolatileExtentList
+}
+
+// SetVolatileExtents sets the list of time Extents in the dataset that should be re-fetched
+func (ds *DataSet) SetVolatileExtents(e timeseries.ExtentList) {
+	ds.VolatileExtentList = e
+}
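Note: cropping now keeps the two extent lists consistent, so a trimmed cache entry cannot demand a refresh outside its own range. A quick illustration of the Clone().Crop() idiom used above (module path assumed):

package main

import (
	"fmt"
	"time"

	"github.com/tricksterproxy/trickster/pkg/timeseries"
)

func main() {
	// volatile head covers 09:50-10:00
	vol := timeseries.ExtentList{{Start: time.Unix(35400, 0), End: time.Unix(36000, 0)}}
	// the dataset is cropped back to 00:00-09:45
	crop := timeseries.Extent{Start: time.Unix(0, 0), End: time.Unix(35100, 0)}
	// the volatile list is cropped with the same extent and empties out
	fmt.Println(len(vol.Clone().Crop(crop))) // 0
}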
diff --git a/pkg/timeseries/extent.go b/pkg/timeseries/extent.go
index f79bd76f4..1b7bc9e72 100644
--- a/pkg/timeseries/extent.go
+++ b/pkg/timeseries/extent.go
@@ -40,11 +40,31 @@ func (e *Extent) StartsAt(t time.Time) bool {
 	return t.Equal(e.Start)
 }
 
+// StartsAtOrBefore returns true if the Extent's start time is equal to or before t
+func (e *Extent) StartsAtOrBefore(t time.Time) bool {
+	return t.Equal(e.Start) || e.Start.Before(t)
+}
+
+// StartsAtOrAfter returns true if the Extent's start time is equal to or after t
+func (e *Extent) StartsAtOrAfter(t time.Time) bool {
+	return t.Equal(e.Start) || e.Start.After(t)
+}
+
 // EndsAt returns true if t is equal to the Extent's end time
 func (e *Extent) EndsAt(t time.Time) bool {
 	return t.Equal(e.End)
 }
 
+// EndsAtOrBefore returns true if the Extent's end time is equal to or before t
+func (e *Extent) EndsAtOrBefore(t time.Time) bool {
+	return t.Equal(e.End) || e.End.Before(t)
+}
+
+// EndsAtOrAfter returns true if the Extent's end time is equal to or after t
+func (e *Extent) EndsAtOrAfter(t time.Time) bool {
+	return t.Equal(e.End) || e.End.After(t)
+}
+
 // After returns true if the range of the Extent is completely after the provided time
 func (e *Extent) After(t time.Time) bool {
 	return t.Before(e.Start)
diff --git a/pkg/timeseries/extent_list.go b/pkg/timeseries/extent_list.go
index 95a254024..29021b1ca 100644
--- a/pkg/timeseries/extent_list.go
+++ b/pkg/timeseries/extent_list.go
@@ -169,7 +169,7 @@ func (el ExtentList) Clone() ExtentList {
 // CloneRange returns a perfect copy of the ExtentList, cloning only the
 // Extents in the provided index range (upper-bound exclusive)
 func (el ExtentList) CloneRange(start, end int) ExtentList {
-	if end < start {
+	if end < start || start < 0 || end < 0 {
 		return nil
 	}
 	size := end - start
@@ -187,6 +187,92 @@ func (el ExtentList) CloneRange(start, end int) ExtentList {
 	return c
 }
 
+// Equal returns true if the provided extent list is identical to the subject list
+func (el ExtentList) Equal(el2 ExtentList) bool {
+	if el2 == nil {
+		return false
+	}
+
+	l := len(el)
+	l2 := len(el2)
+	if l != l2 {
+		return false
+	}
+
+	for i := range el {
+		if el2[i] != el[i] {
+			return false
+		}
+	}
+	return true
+}
+
+// Remove removes the provided extent list ranges from the subject extent list
+func (el ExtentList) Remove(r ExtentList, step time.Duration) ExtentList {
+	if len(r) == 0 {
+		return el
+	}
+	if len(el) == 0 {
+		return r
+	}
+
+	splices := make(map[int]interface{})
+	spliceIns := make(map[int]Extent)
+	c := el.Clone()
+	for _, rem := range r {
+		for i, ex := range c {
+
+			if rem.End.Before(ex.Start) || rem.Start.After(ex.End) {
+				// removal range is not relevant to this extent
+				continue
+			}
+
+			if rem.StartsAtOrBefore(ex.Start) && rem.EndsAtOrAfter(ex.End) {
+				// removal range end is >= extent.End, and start is <= extent.Start,
+				// so the entire extent is spliced out of the list
+				splices[i] = nil
+				continue
+			}
+
+			// the removal is fully inside of the extent, so it must be split into two
+			if rem.Start.After(ex.Start) && rem.End.Before(ex.End) {
+				// the first piece will be inserted back in later
+				spliceIns[i] = Extent{Start: ex.Start, End: rem.Start.Add(-step)}
+				// the existing piece will be adjusted in place
+				c[i].Start = rem.End.Add(step)
+				continue
+			}
+
+			// the removal is attached to only one side of the extent, so the
+			// boundaries can be adjusted
+			if rem.Start.After(ex.Start) {
+				c[i].End = rem.Start.Add(-step)
+			} else if rem.End.Before(ex.End) {
+				c[i].Start = rem.End.Add(step)
+			}
+
+		}
+	}
+
+	// if the clone is final, return it now
+	if len(splices) == 0 && len(spliceIns) == 0 {
+		return c
+	}
+
+	// otherwise, make a version of the list that excludes the spliced-out
+	// indexes and includes any spliced-in extents
+	r = make(ExtentList, 0, len(c)+len(spliceIns))
+	for i, ex := range c {
+		if ex2, ok := spliceIns[i]; ok {
+			r = append(r, ex2)
+		}
+		if _, ok := splices[i]; !ok {
+			r = append(r, ex)
+		}
+	}
+	return r
+}
+
 // TimestampCount returns the calculated number of timestamps based on the extents
 // in the list and the provided duration
 func (el ExtentList) TimestampCount(d time.Duration) int64 {
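Note: a runnable sketch of Remove's splitting behavior (module path assumed); when the removal falls inside an extent, the extent is split in two, with the new boundaries stepped off the removed range:

package main

import (
	"fmt"
	"time"

	"github.com/tricksterproxy/trickster/pkg/timeseries"
)

func main() {
	step := time.Second
	e := func(start, end int64) timeseries.Extent {
		return timeseries.Extent{Start: time.Unix(start, 0), End: time.Unix(end, 0)}
	}

	el := timeseries.ExtentList{e(100, 900)}
	// [100-900] minus [300-500] => [100-299], [501-900]
	fmt.Println(el.Remove(timeseries.ExtentList{e(300, 500)}, step))
}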
diff --git a/pkg/timeseries/extent_list_test.go b/pkg/timeseries/extent_list_test.go
index 32486a3e5..217253581 100644
--- a/pkg/timeseries/extent_list_test.go
+++ b/pkg/timeseries/extent_list_test.go
@@ -167,6 +167,158 @@ func TestInsideOf(t *testing.T) {
 
 }
 
+func TestRemove(t *testing.T) {
+
+	step := time.Second * 1
+
+	tests := []struct {
+		el       ExtentList
+		removals ExtentList
+		expected ExtentList
+	}{
+		{ // case 0 (splice entire line)
+			ExtentList{
+				Extent{Start: t100, End: t200},
+				Extent{Start: t600, End: t900},
+				Extent{Start: t1100, End: t1300},
+			},
+			ExtentList{
+				Extent{Start: t100, End: t200},
+			},
+			ExtentList{
+				Extent{Start: t600, End: t900},
+				Extent{Start: t1100, End: t1300},
+			},
+		},
+		{ // case 1 (adjust start)
+			ExtentList{
+				Extent{Start: t100, End: t200},
+				Extent{Start: t600, End: t900},
+				Extent{Start: t1100, End: t1300},
+			},
+			ExtentList{
+				Extent{Start: t100, End: t100},
+			},
+			ExtentList{
+				Extent{Start: t101, End: t200},
+				Extent{Start: t600, End: t900},
+				Extent{Start: t1100, End: t1300},
+			},
+		},
+		{ // case 2 (adjust end)
+			ExtentList{
+				Extent{Start: t100, End: t201},
+				Extent{Start: t600, End: t900},
+				Extent{Start: t1100, End: t1300},
+			},
+			ExtentList{
+				Extent{Start: t201, End: t201},
+			},
+			ExtentList{
+				Extent{Start: t100, End: t200},
+				Extent{Start: t600, End: t900},
+				Extent{Start: t1100, End: t1300},
+			},
+		},
+		{ // case 3 (adjust start and end)
+			ExtentList{
+				Extent{Start: t100, End: t201},
+				Extent{Start: t600, End: t900},
+				Extent{Start: t1100, End: t1300},
+			},
+			ExtentList{
+				Extent{Start: t100, End: t100},
+				Extent{Start: t201, End: t201},
+			},
+			ExtentList{
+				Extent{Start: t101, End: t200},
+				Extent{Start: t600, End: t900},
+				Extent{Start: t1100, End: t1300},
+			},
+		},
+		{ // case 4 (overlap)
+			ExtentList{
+				Extent{Start: t100, End: t201},
+				Extent{Start: t600, End: t900},
+				Extent{Start: t1100, End: t1300},
+			},
+			ExtentList{
+				Extent{Start: t101, End: t200},
+			},
+			ExtentList{
+				Extent{Start: t100, End: t100},
+				Extent{Start: t201, End: t201},
+				Extent{Start: t600, End: t900},
+				Extent{Start: t1100, End: t1300},
+			},
+		},
+		{ // case 5 (splice entire line 2)
+			ExtentList{
+				Extent{Start: t101, End: t200},
+				Extent{Start: t600, End: t900},
+				Extent{Start: t1100, End: t1300},
+			},
+			ExtentList{
+				Extent{Start: t100, End: t200},
+			},
+			ExtentList{
+				Extent{Start: t600, End: t900},
+				Extent{Start: t1100, End: t1300},
+			},
+		},
+		{ // case 6 (splice entire line 3)
+			ExtentList{
+				Extent{Start: t101, End: t200},
+				Extent{Start: t600, End: t900},
+				Extent{Start: t1100, End: t1300},
+			},
+			ExtentList{
+				Extent{Start: t100, End: t201},
+			},
+			ExtentList{
+				Extent{Start: t600, End: t900},
+				Extent{Start: t1100, End: t1300},
+			},
+		},
+		{ // case 7 (empty removals)
+			ExtentList{
+				Extent{Start: t101, End: t200},
+				Extent{Start: t600, End: t900},
+				Extent{Start: t1100, End: t1300},
+			},
+			ExtentList{},
+			ExtentList{
+				Extent{Start: t101, End: t200},
+				Extent{Start: t600, End: t900},
+				Extent{Start: t1100, End: t1300},
+			},
+		},
+		{ // case 8 (empty subject list)
+			ExtentList{},
+			ExtentList{
+				Extent{Start: t101, End: t200},
+				Extent{Start: t600, End: t900},
+				Extent{Start: t1100, End: t1300},
+			},
+			ExtentList{
+				Extent{Start: t101, End: t200},
+				Extent{Start: t600, End: t900},
+				Extent{Start: t1100, End: t1300},
+			},
+		},
+	}
+
+	for i, test := range tests {
+		t.Run(strconv.Itoa(i), func(t *testing.T) {
+			v := test.el.Remove(test.removals, step)
+			if !v.Equal(test.expected) {
+				t.Errorf("expected %v got %v", test.expected, v)
+			}
+		})
+	}
+
+}
+
 func TestOutsideOf(t *testing.T) {
 
 	el := ExtentList{
@@ -240,6 +392,24 @@ func TestString(t *testing.T) {
 
 }
 
+func TestCloneRange(t *testing.T) {
+
+	el := ExtentList{
+		Extent{Start: t600, End: t900},
+	}
+
+	res := el.CloneRange(-1, -1)
+	if res != nil {
+		t.Error("expected nil result", res)
+	}
+
+	res = el.CloneRange(0, 200)
+	if res != nil {
+		t.Error("expected nil result", res)
+	}
+
+}
+
 func TestCrop(t *testing.T) {
 
 	el := ExtentList{
@@ -493,6 +663,36 @@ func TestCrop(t *testing.T) {
 		})
 	}
+}
+
+func TestEqual(t *testing.T) {
+
+	el := ExtentList{
+		Extent{Start: t100, End: t200},
+		Extent{Start: t600, End: t900},
+		Extent{Start: t1100, End: t1300},
+	}
+
+	b := el.Equal(nil)
+	if b {
+		t.Error("expected false")
+	}
+
+	b = el.Equal(ExtentList{})
+	if b {
+		t.Error("expected false")
+	}
+
+	el2 := ExtentList{
+		Extent{Start: t101, End: t200},
+		Extent{Start: t600, End: t900},
+		Extent{Start: t1100, End: t1300},
+	}
+
+	b = el.Equal(el2)
+	if b {
+		t.Error("expected false")
+	}
 }
@@ -679,3 +879,20 @@ func TestCalculateDeltas(t *testing.T) {
 		})
 	}
 }
+
+func TestTimestampCount(t *testing.T) {
+
+	el := ExtentList{
+		Extent{Start: t100, End: t200},
+		Extent{Start: t600, End: t900},
+		Extent{},
+		Extent{Start: t1100, End: t1300},
+	}
+
+	const expected int64 = 9
+
+	if v := el.TimestampCount(time.Second * 100); v != expected {
+		t.Errorf("expected %d got %d", expected, v)
+	}
+
+}
diff --git a/pkg/timeseries/extent_test.go b/pkg/timeseries/extent_test.go
new file mode 100644
index 000000000..576a70463
--- /dev/null
+++ b/pkg/timeseries/extent_test.go
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2018 Comcast Cable Communications Management, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package timeseries
+
+import "testing"
+
+func TestStartsAtOrAfter(t *testing.T) {
+
+	e := Extent{Start: t101, End: t200}
+	if !e.StartsAtOrAfter(t100) {
+		t.Error("expected true")
+	}
+
+	if e.StartsAtOrAfter(t200) {
+		t.Error("expected false")
+	}
+
+}
+
+func TestEndsAtOrBefore(t *testing.T) {
+
+	e := Extent{Start: t101, End: t200}
+	if e.EndsAtOrBefore(t100) {
+		t.Error("expected false")
+	}
+
+	if !e.EndsAtOrBefore(t201) {
+		t.Error("expected true")
+	}
+
+}
diff --git a/pkg/timeseries/field_definition_test.go b/pkg/timeseries/field_definition_test.go
index 3cf8e2ef1..189d60cc7 100644
--- a/pkg/timeseries/field_definition_test.go
+++ b/pkg/timeseries/field_definition_test.go
@@ -32,3 +32,20 @@ func TestFieldDefinitionClone(t *testing.T) {
 	}
 
 }
+
+func TestFieldDefinitionString(t *testing.T) {
+
+	fd := FieldDefinitions{
+		FieldDefinition{
+			Name:     "test",
+			DataType: FieldDataType(1),
+		},
+	}
+
+	const expected = `[{"name":"test","type":1,"pos":0,"stype":"","provider1":0}]`
+
+	if fd.String() != expected {
+		t.Errorf("expected `%s` got `%s`", expected, fd.String())
+	}
+
+}
diff --git a/pkg/timeseries/modeler_test.go b/pkg/timeseries/modeler_test.go
new file mode 100644
index 000000000..1cca18098
--- /dev/null
+++ b/pkg/timeseries/modeler_test.go
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2018 Comcast Cable Communications Management, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package timeseries
+
+import "testing"
+
+func TestNewModeler(t *testing.T) {
+
+	f := func([]byte, *TimeRangeQuery) (Timeseries, error) {
+		return nil, nil
+	}
+
+	m := NewModeler(f, nil, nil, nil, f, nil)
+
+	if m.WireUnmarshaler == nil {
+		t.Error("expected non-nil WireUnmarshaler")
+	}
+
+	if m.CacheUnmarshaler == nil {
+		t.Error("expected non-nil CacheUnmarshaler")
+	}
+
+}
diff --git a/pkg/timeseries/request_options_test.go b/pkg/timeseries/request_options_test.go
new file mode 100644
index 000000000..68b4ebc56
--- /dev/null
+++ b/pkg/timeseries/request_options_test.go
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2018 Comcast Cable Communications Management, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package timeseries
+
+import (
+	"testing"
+)
+
+func TestExtractFastForwardDisabled(t *testing.T) {
+	ro := &RequestOptions{}
+	ro.ExtractFastForwardDisabled("test query trickster-fast-forward:off ")
+	if !ro.FastForwardDisable {
+		t.Error("expected true")
+	}
+}
diff --git a/pkg/timeseries/timerangequery.go b/pkg/timeseries/timerangequery.go
index 79ddf180a..be1e0d573 100644
--- a/pkg/timeseries/timerangequery.go
+++ b/pkg/timeseries/timerangequery.go
@@ -116,14 +116,22 @@ func (trq *TimeRangeQuery) String() string {
 }
 
 // GetBackfillTolerance will return the backfill tolerance for the query based on the provided
-// default, and any query-specific tolerance directives included in the query comments
-func (trq *TimeRangeQuery) GetBackfillTolerance(def time.Duration) time.Duration {
+// defaults, and any query-specific tolerance directives included in the query comments
+func (trq *TimeRangeQuery) GetBackfillTolerance(def time.Duration, points int) time.Duration {
 	if trq.BackfillTolerance > 0 {
 		return trq.BackfillTolerance
 	}
 	if trq.BackfillTolerance < 0 {
 		return 0
 	}
+
+	if points > 0 {
+		sd := time.Duration(points) * trq.Step
+		if sd > def {
+			return sd
+		}
+	}
+
 	return def
 }
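Note: the new signature in use (module path assumed); with no per-query directive set, points scaled by the step win over a smaller duration default:

package main

import (
	"fmt"
	"time"

	"github.com/tricksterproxy/trickster/pkg/timeseries"
)

func main() {
	trq := &timeseries.TimeRangeQuery{Step: 5 * time.Minute}
	// 2 points * 5m step = 10m, which exceeds the 30s default, so 10m wins
	fmt.Println(trq.GetBackfillTolerance(30*time.Second, 2)) // 10m0s
}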
diff --git a/pkg/timeseries/timerangequery_test.go b/pkg/timeseries/timerangequery_test.go
index 9acbcfa85..ee195cebb 100644
--- a/pkg/timeseries/timerangequery_test.go
+++ b/pkg/timeseries/timerangequery_test.go
@@ -76,12 +76,41 @@ func TestClone(t *testing.T) {
 	u, _ := url.Parse("http://127.0.0.1/")
 	trq := &TimeRangeQuery{Statement: "1234", Extent: Extent{Start: time.Unix(5, 0),
 		End: time.Unix(10, 0)}, Step: time.Duration(5) * time.Second, TemplateURL: u}
+
+	trq.TagFieldDefintions = []FieldDefinition{{}}
+	trq.ValueFieldDefinitions = []FieldDefinition{{}}
+	trq.Labels = map[string]string{"test": "trickster"}
+
 	c := trq.Clone()
 	if !reflect.DeepEqual(trq, c) {
 		t.Errorf("expected %s got %s", trq.String(), c.String())
 	}
 }
 
+func TestSizeTRQ(t *testing.T) {
+
+	u, _ := url.Parse("http://127.0.0.1/")
+	trq := &TimeRangeQuery{Statement: "1234", Extent: Extent{Start: time.Unix(5, 0),
+		End: time.Unix(10, 0)}, Step: time.Duration(5) * time.Second, TemplateURL: u}
+
+	size := trq.Size()
+
+	if size != 119 {
+		t.Errorf("expected %d got %d", 119, size)
+	}
+}
+
+func TestExtractBackfillTolerance(t *testing.T) {
+
+	trq := &TimeRangeQuery{}
+
+	trq.ExtractBackfillTolerance("testing trickster-backfill-tolerance:30 ")
+
+	if trq.BackfillTolerance != time.Second*30 {
+		t.Error("expected 30 got", trq.BackfillTolerance)
+	}
+}
+
 func TestStringTRQ(t *testing.T) {
 	const expected = `{ "statement": "1234", "step": "5s", "extent": "5000-10000", "tsd": "{"name":"","type":0,"pos":0,"stype":"","provider1":0}", "td": [], "vd": [] }`
 	trq := &TimeRangeQuery{Statement: "1234", Extent: Extent{Start: time.Unix(5, 0),
@@ -98,15 +127,30 @@ func TestGetBackfillTolerance(t *testing.T) {
 	expected := time.Second * 5
 
 	trq := &TimeRangeQuery{Statement: "1234"}
-	i := trq.GetBackfillTolerance(expected)
+	i := trq.GetBackfillTolerance(expected, 0)
 	if i != expected {
 		t.Errorf("expected %s got %s", expected, i)
 	}
 
 	trq.BackfillTolerance = time.Second * 30
-	i = trq.GetBackfillTolerance(expected)
+	i = trq.GetBackfillTolerance(expected, 0)
 	if i == expected {
 		t.Errorf("expected %s got %s", time.Second*30, i)
 	}
+
+	trq.Step = 5 * time.Second
+	trq.BackfillTolerance = 0
+
+	expected = time.Second * 50
+	i = trq.GetBackfillTolerance(time.Second*5, 10)
+	if i != expected {
+		t.Errorf("expected %s got %s", expected, i)
+	}
+
+	trq.BackfillTolerance = -1
+	i = trq.GetBackfillTolerance(time.Second*5, 10)
+	if i != 0 {
+		t.Errorf("expected %d got %d", 0, i)
+	}
+
 }
diff --git a/pkg/timeseries/timeseries.go b/pkg/timeseries/timeseries.go
index 626c572ee..1b3c0aba1 100644
--- a/pkg/timeseries/timeseries.go
+++ b/pkg/timeseries/timeseries.go
@@ -46,6 +46,12 @@ type Timeseries interface {
 	SetExtents(ExtentList)
 	// Extents should return the list of time Extents having data present in the Timeseries
 	Extents() ExtentList
+	// VolatileExtents should return the list of time Extents in the cached Timeseries
+	// that should be re-fetched
+	VolatileExtents() ExtentList
+	// SetVolatileExtents sets the list of time Extents in the cached Timeseries
+	// that should be re-fetched
+	SetVolatileExtents(ExtentList)
 	// TimeStampCount should return the number of unique timestamps across the timeseries
 	TimestampCount() int64
 	// Step should return the Step Interval of the Timeseries