Skip to content

Commit

Permalink
2.0 backfill tolerance update (#550)
Browse files Browse the repository at this point in the history
* make backfill tolerance cacheable until next phit

* add a ledger of volatile ranges to dataset for managing freshness

* add points-based options for backfill tolerance
  • Loading branch information
James Ranson authored Mar 12, 2021
1 parent 0a786f9 commit ba6119f
Show file tree
Hide file tree
Showing 18 changed files with 647 additions and 110 deletions.
17 changes: 13 additions & 4 deletions pkg/backends/irondb/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ import (
// SeriesEnvelope values represent a time series data response from the
// IRONdb API.
type SeriesEnvelope struct {
Data DataPoints `json:"data"`
ExtentList timeseries.ExtentList `json:"extents,omitempty"`
StepDuration time.Duration `json:"step,omitempty"`
timeRangeQuery *timeseries.TimeRangeQuery
Data DataPoints `json:"data"`
ExtentList timeseries.ExtentList `json:"extents,omitempty"`
StepDuration time.Duration `json:"step,omitempty"`
timeRangeQuery *timeseries.TimeRangeQuery
VolatileExtentList timeseries.ExtentList `json:"-"`
}

// MarshalJSON encodes a series envelope value into a JSON byte slice.
Expand Down Expand Up @@ -102,6 +103,14 @@ func (se *SeriesEnvelope) UnmarshalJSON(b []byte) error {
return err
}

// VolatileExtents returns the SeriesEnvelope's list of time ranges that are
// subject to change and should be refreshed on the next request to the Origin.
func (se *SeriesEnvelope) VolatileExtents() timeseries.ExtentList {
return se.VolatileExtentList
}

// SetVolatileExtents replaces the SeriesEnvelope's volatile extent list with
// the provided list of extents.
func (se *SeriesEnvelope) SetVolatileExtents(e timeseries.ExtentList) {
se.VolatileExtentList = e
}

// DataPoint values represent a single data element of a time series data
// response from the IRONdb API.
type DataPoint struct {
Expand Down
11 changes: 11 additions & 0 deletions pkg/backends/irondb/model/model_df4.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type DF4SeriesEnvelope struct {
StepDuration time.Duration `json:"step,omitempty"`
ExtentList timeseries.ExtentList `json:"extents,omitempty"`
timeRangeQuery *timeseries.TimeRangeQuery
// VolatileExtentList is the list of extents in the dataset that should be refreshed
// on the next request to the Origin
VolatileExtentList timeseries.ExtentList `json:"-"`
}

// DF4Info values contain information about the timestamps of the data elements
Expand Down Expand Up @@ -364,3 +367,11 @@ func (se *DF4SeriesEnvelope) Size() int64 {
wg.Wait()
return c
}

// VolatileExtents returns the DF4SeriesEnvelope's list of time ranges that are
// subject to change and should be refreshed on the next request to the Origin.
func (se *DF4SeriesEnvelope) VolatileExtents() timeseries.ExtentList {
return se.VolatileExtentList
}

// SetVolatileExtents replaces the DF4SeriesEnvelope's volatile extent list
// with the provided list of extents.
func (se *DF4SeriesEnvelope) SetVolatileExtents(e timeseries.ExtentList) {
se.VolatileExtentList = e
}
2 changes: 2 additions & 0 deletions pkg/backends/options/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ const (
DefaultTracingConfigName = "default"
// DefaultBackfillToleranceMS is the default Backfill Tolerance setting for Backends
DefaultBackfillToleranceMS = 0
// DefaultBackfillTolerancePoints is the default Backfill Tolerance setting for Backends
DefaultBackfillTolerancePoints = 0
// DefaultKeepAliveTimeoutMS is the default Keep Alive Timeout for Backends' upstream client pools
DefaultKeepAliveTimeoutMS = 300000
// DefaultMaxIdleConns is the default number of Idle Connections in Backends' upstream client pools
Expand Down
18 changes: 14 additions & 4 deletions pkg/backends/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,14 @@ type Options struct {
// TimeseriesEvictionMethodName specifies which methodology ("oldest", "lru") is used to identify
//timeseries to evict from a full cache object
TimeseriesEvictionMethodName string `yaml:"timeseries_eviction_method,omitempty"`
// BackfillToleranceMS prevents values with timestamps newer than the provided
// number of seconds from being cached this allows propagation of upstream backfill operations
// that modify recently-served data
// BackfillToleranceMS prevents values with timestamps newer than the provided number of
// milliseconds from being cached. This allows propagation of upstream backfill operations
// that modify recently-cached data
BackfillToleranceMS int64 `yaml:"backfill_tolerance_ms,omitempty"`
// BackfillTolerancePoints is similar to the MS version, except that its final value is dependent
// on the query step value to determine the relative duration of backfill tolerance per-query
// When both are set, the higher of the two values is used
BackfillTolerancePoints int `yaml:"backfill_tolerance_points,omitempty"`
// PathList is a list of Path Options that control the behavior of the given paths when requested
Paths map[string]*po.Options `yaml:"paths,omitempty"`
// NegativeCacheName provides the name of the Negative Cache Config to be used by this Backend
Expand Down Expand Up @@ -188,8 +192,9 @@ type Options struct {
// New will return a pointer to a Backend Options with the default configuration settings
func New() *Options {
return &Options{
BackfillTolerance: DefaultBackfillToleranceMS,
BackfillTolerance: time.Duration(DefaultBackfillToleranceMS) * time.Millisecond,
BackfillToleranceMS: DefaultBackfillToleranceMS,
BackfillTolerancePoints: DefaultBackfillTolerancePoints,
CacheKeyPrefix: "",
CacheName: DefaultBackendCacheName,
CompressableTypeList: DefaultCompressableTypes(),
Expand Down Expand Up @@ -226,6 +231,7 @@ func (o *Options) Clone() *Options {
no.DearticulateUpstreamRanges = o.DearticulateUpstreamRanges
no.BackfillTolerance = o.BackfillTolerance
no.BackfillToleranceMS = o.BackfillToleranceMS
no.BackfillTolerancePoints = o.BackfillTolerancePoints
no.CacheName = o.CacheName
no.CacheKeyPrefix = o.CacheKeyPrefix
no.FastForwardDisable = o.FastForwardDisable
Expand Down Expand Up @@ -552,6 +558,10 @@ func SetDefaults(
no.BackfillToleranceMS = o.BackfillToleranceMS
}

if metadata.IsDefined("backends", name, "backfill_tolerance_points") {
no.BackfillTolerancePoints = o.BackfillTolerancePoints
}

if metadata.IsDefined("backends", name, "paths") {
err := po.SetDefaults(name, metadata, o.Paths, crw)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions pkg/backends/options/options_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ backends:
timeseries_eviction_method: lru
fast_forward_disable: true
backfill_tolerance_ms: 301000
backfill_tolerance_points: 2
timeout_ms: 37000
health_check_endpoint: /test_health
health_check_upstream_path: /test/upstream/endpoint
Expand Down Expand Up @@ -84,6 +85,12 @@ backends:
collapsed_forwarding: basic
response_headers:
X-Header-Test: test-value
prometheus:
labels:
testlabel: trickster
alb:
methodology: rr
pool: [ test ]
tls:
full_chain_cert_path: file.that.should.not.exist.ever.pem
private_key_path: file.that.should.not.exist.ever.pem
Expand Down
88 changes: 56 additions & 32 deletions pkg/proxy/engines/deltaproxycache.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,11 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request, modeler *tim
pr := newProxyRequest(r, w)
rlo.FastForwardDisable = o.FastForwardDisable || rlo.FastForwardDisable
trq.NormalizeExtent()

// this is used to ensure the head of the cache respects the BackFill Tolerance
bf := timeseries.Extent{Start: time.Unix(0, 0), End: trq.Extent.End}
bt := trq.GetBackfillTolerance(o.BackfillTolerance)

if !trq.IsOffset && bt > 0 { // TODO: research if we need this clause: && !time.Now().Add(-bt).After(bf.End) {
bf.End = bf.End.Add(-bt)
}

now := time.Now()

bt := trq.GetBackfillTolerance(o.BackfillTolerance, o.BackfillTolerancePoints)
bfs := now.Add(-bt).Truncate(trq.Step) // start of the backfill tolerance window

OldestRetainedTimestamp := time.Time{}
if o.TimeseriesEvictionMethod == evictionmethods.EvictionMethodOldest {
OldestRetainedTimestamp = now.Truncate(trq.Step).Add(-(trq.Step * o.TimeseriesRetention))
Expand All @@ -108,13 +102,6 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request, modeler *tim
DoProxy(w, r, true)
return
}
if trq.Extent.Start.After(bf.End) {
tl.Debug(pr.Logger, "timerange is too new to cache due to backfill tolerance",
tl.Pairs{"backFillToleranceSecs": bt,
"newestRetainedTimestamp": bf.End, "queryStart": trq.Extent.Start})
DoProxy(w, r, true)
return
}
}

client.SetExtent(pr.upstreamRequest, trq, &trq.Extent)
Expand Down Expand Up @@ -198,18 +185,6 @@ checkCache:
DoProxy(w, r, true)
return
}
if trq.Extent.Start.After(el[len(el)-1].End) {
pr.cacheLock.RRelease()
tl.Debug(pr.Logger, "timerange not cached due to backfill tolerance",
tl.Pairs{
"backFillToleranceSecs": bt,
"newestRetainedTimestamp": bf.End,
"queryStart": trq.Extent.Start,
},
)
DoProxy(w, r, true)
return
}
}
}
cacheStatus = status.LookupStatusPartialHit
Expand All @@ -218,9 +193,21 @@ checkCache:
}

// Find the ranges that we want, but which are not currently cached
var missRanges timeseries.ExtentList
var missRanges, cvr timeseries.ExtentList
vr := cts.VolatileExtents()
if cacheStatus == status.LookupStatusPartialHit {
missRanges = cts.Extents().CalculateDeltas(trq.Extent, trq.Step)
// this is the backfill part of backfill tolerance. if there are any volatile
// ranges in the timeseries, this determines if any fall within the client's
// requested range and ensures they are re-requested. this only happens if
// the request is already a phit
if bt > 0 && len(missRanges) > 0 && len(vr) > 0 {
// this checks the timeseries's volatile ranges for any overlap with
// the request extent, and adds those to the missRanges to refresh
if cvr = vr.Crop(trq.Extent); len(cvr) > 0 {
missRanges = append(missRanges, cvr...).Compress(trq.Step)
}
}
}

if len(missRanges) == 0 && cacheStatus == status.LookupStatusPartialHit {
Expand All @@ -244,7 +231,7 @@ checkCache:
// in this case, it's not a cache hit, so something is _likely_ going to be cached now.
// we write lock here, so as to prevent other concurrent client requests for the same url,
// which will have the same cacheStatus, from causing the same or similar HTTP requests
// to be made against the origin, since just one should doc.
// to be made against the origin, since just one should do.

// acquire a write lock via the Upgrade method, which will swap the read lock for a
// write lock, and return true if this client was the only one, or otherwise the first
Expand Down Expand Up @@ -383,6 +370,43 @@ checkCache:
cts.Merge(true, mts...)
}

// this handles the tolerance part of backfill tolerance, by adding new tolerable ranges to
// the timeseries's volatile list, and removing those that no longer tolerate backfill
if bt > 0 && cacheStatus != status.LookupStatusHit {

var shouldCompress bool
ve := cts.VolatileExtents()

// first, remove those that are now too old to tolerate backfill.
if len(cvr) > 0 {
// this updates the timeseries's volatile list to remove anything just fetched that is
// older than the current backfill tolerance timestamp; so it is now immutable in cache
ve = ve.Remove(cvr, trq.Step)
shouldCompress = true
}

// now add in any new time ranges that should tolerate backfill
var adds timeseries.Extent
if trq.Extent.End.After(bfs) {
adds.End = trq.Extent.End
if trq.Extent.Start.Before(bfs) {
adds.Start = bfs
} else {
adds.Start = trq.Extent.Start
}
}
if !adds.End.IsZero() {
ve = append(ve, adds)
shouldCompress = true
}

// if any changes happened to the volatile list, set it in the cached timeseries
if shouldCompress {
cts.SetVolatileExtents(ve.Compress(trq.Step))
}

}

// cts is the cacheable time series, rts is the user's response timeseries
var rts timeseries.Timeseries
if cacheStatus != status.LookupStatusKeyMiss {
Expand All @@ -399,9 +423,9 @@ checkCache:
// Backfill Tolerance before storing to cache
switch o.TimeseriesEvictionMethod {
case evictionmethods.EvictionMethodLRU:
cts.CropToSize(o.TimeseriesRetentionFactor, bf.End, trq.Extent)
cts.CropToSize(o.TimeseriesRetentionFactor, now, trq.Extent)
default:
cts.CropToRange(timeseries.Extent{End: bf.End, Start: OldestRetainedTimestamp})
cts.CropToRange(timeseries.Extent{End: now, Start: OldestRetainedTimestamp})
}
// Don't cache datasets with empty extents
// (everything was cropped so there is nothing to cache)
Expand Down
66 changes: 1 addition & 65 deletions pkg/proxy/engines/deltaproxycache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,64 +154,6 @@ func TestDeltaProxyCacheRequestMissThenHit(t *testing.T) {
}
}

// TestDeltaProxyCacheRequestAllItemsTooNew verifies that when the entire
// requested time range falls inside the backfill tolerance window (everything
// is "too new" to cache), the request bypasses the DeltaProxyCache and is
// served by the plain HTTP proxy engine instead.
func TestDeltaProxyCacheRequestAllItemsTooNew(t *testing.T) {

ts, w, r, rsc, err := setupTestHarnessDPC()
if err != nil {
t.Error(err)
}
defer ts.Close()

client := rsc.BackendClient.(*TestClient)
o := rsc.BackendOptions
rsc.CacheConfig.Provider = "test"

// configure a 10-minute backfill tolerance so the 5-minute query below
// lies entirely within the uncacheable window
o.FastForwardDisable = true
o.BackfillToleranceMS = 600000
o.BackfillTolerance = time.Millisecond * time.Duration(o.BackfillToleranceMS)

step := time.Duration(300) * time.Second
end := time.Now()

// request the most recent 5 minutes of data
extr := timeseries.Extent{Start: end.Add(-time.Duration(5) * time.Minute), End: end}

expected, _, _ := mockprom.GetTimeSeriesData(queryReturnsOKNoLatency, extr.Start, extr.End, step)

u := r.URL
u.Path = "/prometheus/api/v1/query_range"
u.RawQuery = fmt.Sprintf("step=%d&start=%d&end=%d&query=%s",
int(step.Seconds()), extr.Start.Unix(), extr.End.Unix(), queryReturnsOKNoLatency)

client.QueryRangeHandler(w, r)
resp := w.Result()

bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
t.Error(err)
}

err = testStringMatch(string(bodyBytes), expected)
if err != nil {
t.Error(err)
}

err = testStatusCodeMatch(resp.StatusCode, http.StatusOK)
if err != nil {
t.Error(err)
}

// a cache status header would indicate the DeltaProxyCache handled the request
if resp.Header.Get("status") != "" {
t.Errorf("status header should not be present. Found with value %s", resp.Header.Get("stattus"))
}

// ensure the request was sent through the proxy instead of the DeltaProxyCache
err = testResultHeaderPartMatch(resp.Header, map[string]string{"engine": "HTTPProxy"})
if err != nil {
t.Error(err)
}

}

func TestDeltaProxyCacheRequestRemoveStale(t *testing.T) {

ts, w, r, rsc, err := setupTestHarnessDPC()
Expand Down Expand Up @@ -1465,7 +1407,6 @@ func TestDeltaProxyCacheRequest_BackfillTolerance(t *testing.T) {
xn := timeseries.Extent{Start: now.Add(-time.Duration(6) * time.Hour).Truncate(step), End: now.Truncate(step)}

// We can predict what slice will need to be fetched and ensure that is only what is requested upstream
expectedFetched := fmt.Sprintf("[%s]", timeseries.Extent{Start: xn.End, End: xn.End})
expected, _, _ := mockprom.GetTimeSeriesData(query, xn.Start, xn.End, step)

u := r.URL
Expand Down Expand Up @@ -1519,12 +1460,7 @@ func TestDeltaProxyCacheRequest_BackfillTolerance(t *testing.T) {
t.Error(err)
}

err = testResultHeaderPartMatch(resp.Header, map[string]string{"status": "phit"})
if err != nil {
t.Error(err)
}

err = testResultHeaderPartMatch(resp.Header, map[string]string{"fetched": expectedFetched})
err = testResultHeaderPartMatch(resp.Header, map[string]string{"status": "hit"})
if err != nil {
t.Error(err)
}
Expand Down
Loading

0 comments on commit ba6119f

Please sign in to comment.