Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.0 backfill tolerance update #550

Merged
merged 9 commits into from
Mar 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions pkg/backends/irondb/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ import (
// SeriesEnvelope values represent a time series data response from the
// IRONdb API.
type SeriesEnvelope struct {
Data DataPoints `json:"data"`
ExtentList timeseries.ExtentList `json:"extents,omitempty"`
StepDuration time.Duration `json:"step,omitempty"`
timeRangeQuery *timeseries.TimeRangeQuery
Data DataPoints `json:"data"`
ExtentList timeseries.ExtentList `json:"extents,omitempty"`
StepDuration time.Duration `json:"step,omitempty"`
timeRangeQuery *timeseries.TimeRangeQuery
VolatileExtentList timeseries.ExtentList `json:"-"`
}

// MarshalJSON encodes a series envelope value into a JSON byte slice.
Expand Down Expand Up @@ -102,6 +103,14 @@ func (se *SeriesEnvelope) UnmarshalJSON(b []byte) error {
return err
}

func (se *SeriesEnvelope) VolatileExtents() timeseries.ExtentList {
return se.VolatileExtentList
}

func (se *SeriesEnvelope) SetVolatileExtents(e timeseries.ExtentList) {
se.VolatileExtentList = e
}

// DataPoint values represent a single data element of a time series data
// response from the IRONdb API.
type DataPoint struct {
Expand Down
11 changes: 11 additions & 0 deletions pkg/backends/irondb/model/model_df4.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type DF4SeriesEnvelope struct {
StepDuration time.Duration `json:"step,omitempty"`
ExtentList timeseries.ExtentList `json:"extents,omitempty"`
timeRangeQuery *timeseries.TimeRangeQuery
// VolatileExtentList is the list of extents in the dataset that should be refreshed
// on the next request to the Origin
VolatileExtentList timeseries.ExtentList `json:"-"`
}

// DF4Info values contain information about the timestamps of the data elements
Expand Down Expand Up @@ -364,3 +367,11 @@ func (se *DF4SeriesEnvelope) Size() int64 {
wg.Wait()
return c
}

func (se *DF4SeriesEnvelope) VolatileExtents() timeseries.ExtentList {
return se.VolatileExtentList
}

func (se *DF4SeriesEnvelope) SetVolatileExtents(e timeseries.ExtentList) {
se.VolatileExtentList = e
}
2 changes: 2 additions & 0 deletions pkg/backends/options/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ const (
DefaultTracingConfigName = "default"
// DefaultBackfillToleranceMS is the default Backfill Tolerance setting for Backends
DefaultBackfillToleranceMS = 0
// DefaultBackfillTolerancePoints is the default Backfill Tolerance setting for Backends
DefaultBackfillTolerancePoints = 0
// DefaultKeepAliveTimeoutMS is the default Keep Alive Timeout for Backends' upstream client pools
DefaultKeepAliveTimeoutMS = 300000
// DefaultMaxIdleConns is the default number of Idle Connections in Backends' upstream client pools
Expand Down
18 changes: 14 additions & 4 deletions pkg/backends/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,14 @@ type Options struct {
// TimeseriesEvictionMethodName specifies which methodology ("oldest", "lru") is used to identify
//timeseries to evict from a full cache object
TimeseriesEvictionMethodName string `yaml:"timeseries_eviction_method,omitempty"`
// BackfillToleranceMS prevents values with timestamps newer than the provided
// number of seconds from being cached this allows propagation of upstream backfill operations
// that modify recently-served data
// BackfillToleranceMS prevents values with timestamps newer than the provided number of
// milliseconds from being cached. This allows propagation of upstream backfill operations
// that modify recently-cached data
BackfillToleranceMS int64 `yaml:"backfill_tolerance_ms,omitempty"`
// BackfillTolerancePoints is similar to the MS version, except that its final value is dependent
// on the query step value to determine the relative duration of backfill tolerance per-query.
// When both are set, the higher of the two values is used
BackfillTolerancePoints int `yaml:"backfill_tolerance_points,omitempty"`
// PathList is a list of Path Options that control the behavior of the given paths when requested
Paths map[string]*po.Options `yaml:"paths,omitempty"`
// NegativeCacheName provides the name of the Negative Cache Config to be used by this Backend
Expand Down Expand Up @@ -188,8 +192,9 @@ type Options struct {
// New will return a pointer to a Backend Options with the default configuration settings
func New() *Options {
return &Options{
BackfillTolerance: DefaultBackfillToleranceMS,
BackfillTolerance: time.Duration(DefaultBackfillToleranceMS) * time.Millisecond,
BackfillToleranceMS: DefaultBackfillToleranceMS,
BackfillTolerancePoints: DefaultBackfillTolerancePoints,
CacheKeyPrefix: "",
CacheName: DefaultBackendCacheName,
CompressableTypeList: DefaultCompressableTypes(),
Expand Down Expand Up @@ -226,6 +231,7 @@ func (o *Options) Clone() *Options {
no.DearticulateUpstreamRanges = o.DearticulateUpstreamRanges
no.BackfillTolerance = o.BackfillTolerance
no.BackfillToleranceMS = o.BackfillToleranceMS
no.BackfillTolerancePoints = o.BackfillTolerancePoints
no.CacheName = o.CacheName
no.CacheKeyPrefix = o.CacheKeyPrefix
no.FastForwardDisable = o.FastForwardDisable
Expand Down Expand Up @@ -552,6 +558,10 @@ func SetDefaults(
no.BackfillToleranceMS = o.BackfillToleranceMS
}

if metadata.IsDefined("backends", name, "backfill_tolerance_points") {
no.BackfillTolerancePoints = o.BackfillTolerancePoints
}

if metadata.IsDefined("backends", name, "paths") {
err := po.SetDefaults(name, metadata, o.Paths, crw)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions pkg/backends/options/options_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ backends:
timeseries_eviction_method: lru
fast_forward_disable: true
backfill_tolerance_ms: 301000
backfill_tolerance_points: 2
timeout_ms: 37000
health_check_endpoint: /test_health
health_check_upstream_path: /test/upstream/endpoint
Expand Down Expand Up @@ -84,6 +85,12 @@ backends:
collapsed_forwarding: basic
response_headers:
X-Header-Test: test-value
prometheus:
labels:
testlabel: trickster
alb:
methodology: rr
pool: [ test ]
tls:
full_chain_cert_path: file.that.should.not.exist.ever.pem
private_key_path: file.that.should.not.exist.ever.pem
Expand Down
88 changes: 56 additions & 32 deletions pkg/proxy/engines/deltaproxycache.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,11 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request, modeler *tim
pr := newProxyRequest(r, w)
rlo.FastForwardDisable = o.FastForwardDisable || rlo.FastForwardDisable
trq.NormalizeExtent()

// this is used to ensure the head of the cache respects the BackFill Tolerance
bf := timeseries.Extent{Start: time.Unix(0, 0), End: trq.Extent.End}
bt := trq.GetBackfillTolerance(o.BackfillTolerance)

if !trq.IsOffset && bt > 0 { // TODO: research if we need this clause: && !time.Now().Add(-bt).After(bf.End) {
bf.End = bf.End.Add(-bt)
}

now := time.Now()

bt := trq.GetBackfillTolerance(o.BackfillTolerance, o.BackfillTolerancePoints)
bfs := now.Add(-bt).Truncate(trq.Step) // start of the backfill tolerance window

OldestRetainedTimestamp := time.Time{}
if o.TimeseriesEvictionMethod == evictionmethods.EvictionMethodOldest {
OldestRetainedTimestamp = now.Truncate(trq.Step).Add(-(trq.Step * o.TimeseriesRetention))
Expand All @@ -108,13 +102,6 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request, modeler *tim
DoProxy(w, r, true)
return
}
if trq.Extent.Start.After(bf.End) {
tl.Debug(pr.Logger, "timerange is too new to cache due to backfill tolerance",
tl.Pairs{"backFillToleranceSecs": bt,
"newestRetainedTimestamp": bf.End, "queryStart": trq.Extent.Start})
DoProxy(w, r, true)
return
}
}

client.SetExtent(pr.upstreamRequest, trq, &trq.Extent)
Expand Down Expand Up @@ -198,18 +185,6 @@ checkCache:
DoProxy(w, r, true)
return
}
if trq.Extent.Start.After(el[len(el)-1].End) {
pr.cacheLock.RRelease()
tl.Debug(pr.Logger, "timerange not cached due to backfill tolerance",
tl.Pairs{
"backFillToleranceSecs": bt,
"newestRetainedTimestamp": bf.End,
"queryStart": trq.Extent.Start,
},
)
DoProxy(w, r, true)
return
}
}
}
cacheStatus = status.LookupStatusPartialHit
Expand All @@ -218,9 +193,21 @@ checkCache:
}

// Find the ranges that we want, but which are not currently cached
var missRanges timeseries.ExtentList
var missRanges, cvr timeseries.ExtentList
vr := cts.VolatileExtents()
if cacheStatus == status.LookupStatusPartialHit {
missRanges = cts.Extents().CalculateDeltas(trq.Extent, trq.Step)
// this is the backfill part of backfill tolerance. if there are any volatile
// ranges in the timeseries, this determines if any fall within the client's
// requested range and ensures they are re-requested. this only happens if
// the request is already a phit
if bt > 0 && len(missRanges) > 0 && len(vr) > 0 {
// this checks the timeseries's volatile ranges for any overlap with
// the request extent, and adds those to the missRanges to refresh
if cvr = vr.Crop(trq.Extent); len(cvr) > 0 {
missRanges = append(missRanges, cvr...).Compress(trq.Step)
}
}
}

if len(missRanges) == 0 && cacheStatus == status.LookupStatusPartialHit {
Expand All @@ -244,7 +231,7 @@ checkCache:
// in this case, it's not a cache hit, so something is _likely_ going to be cached now.
// we write lock here, so as to prevent other concurrent client requests for the same url,
// which will have the same cacheStatus, from causing the same or similar HTTP requests
// to be made against the origin, since just one should doc.
// to be made against the origin, since just one should do.

// acquire a write lock via the Upgrade method, which will swap the read lock for a
// write lock, and return true if this client was the only one, or otherwise the first
Expand Down Expand Up @@ -383,6 +370,43 @@ checkCache:
cts.Merge(true, mts...)
}

// this handles the tolerance part of backfill tolerance, by adding new tolerable ranges to
// the timeseries's volatile list, and removing those that no longer tolerate backfill
if bt > 0 && cacheStatus != status.LookupStatusHit {

var shouldCompress bool
ve := cts.VolatileExtents()

// first, remove those that are now too old to tolerate backfill.
if len(cvr) > 0 {
// this updates the timeseries's volatile list to remove anything just fetched that is
// older than the current backfill tolerance timestamp; so it is now immutable in cache
ve = ve.Remove(cvr, trq.Step)
shouldCompress = true
}

// now add in any new time ranges that should tolerate backfill
var adds timeseries.Extent
if trq.Extent.End.After(bfs) {
adds.End = trq.Extent.End
if trq.Extent.Start.Before(bfs) {
adds.Start = bfs
} else {
adds.Start = trq.Extent.Start
}
}
if !adds.End.IsZero() {
ve = append(ve, adds)
shouldCompress = true
}

// if any changes happened to the volatile list, set it in the cached timeseries
if shouldCompress {
cts.SetVolatileExtents(ve.Compress(trq.Step))
}

}

// cts is the cacheable time series, rts is the user's response timeseries
var rts timeseries.Timeseries
if cacheStatus != status.LookupStatusKeyMiss {
Expand All @@ -399,9 +423,9 @@ checkCache:
// Backfill Tolerance before storing to cache
switch o.TimeseriesEvictionMethod {
case evictionmethods.EvictionMethodLRU:
cts.CropToSize(o.TimeseriesRetentionFactor, bf.End, trq.Extent)
cts.CropToSize(o.TimeseriesRetentionFactor, now, trq.Extent)
default:
cts.CropToRange(timeseries.Extent{End: bf.End, Start: OldestRetainedTimestamp})
cts.CropToRange(timeseries.Extent{End: now, Start: OldestRetainedTimestamp})
}
// Don't cache datasets with empty extents
// (everything was cropped so there is nothing to cache)
Expand Down
66 changes: 1 addition & 65 deletions pkg/proxy/engines/deltaproxycache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,64 +154,6 @@ func TestDeltaProxyCacheRequestMissThenHit(t *testing.T) {
}
}

func TestDeltaProxyCacheRequestAllItemsTooNew(t *testing.T) {

ts, w, r, rsc, err := setupTestHarnessDPC()
if err != nil {
t.Error(err)
}
defer ts.Close()

client := rsc.BackendClient.(*TestClient)
o := rsc.BackendOptions
rsc.CacheConfig.Provider = "test"

o.FastForwardDisable = true
o.BackfillToleranceMS = 600000
o.BackfillTolerance = time.Millisecond * time.Duration(o.BackfillToleranceMS)

step := time.Duration(300) * time.Second
end := time.Now()

extr := timeseries.Extent{Start: end.Add(-time.Duration(5) * time.Minute), End: end}

expected, _, _ := mockprom.GetTimeSeriesData(queryReturnsOKNoLatency, extr.Start, extr.End, step)

u := r.URL
u.Path = "/prometheus/api/v1/query_range"
u.RawQuery = fmt.Sprintf("step=%d&start=%d&end=%d&query=%s",
int(step.Seconds()), extr.Start.Unix(), extr.End.Unix(), queryReturnsOKNoLatency)

client.QueryRangeHandler(w, r)
resp := w.Result()

bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
t.Error(err)
}

err = testStringMatch(string(bodyBytes), expected)
if err != nil {
t.Error(err)
}

err = testStatusCodeMatch(resp.StatusCode, http.StatusOK)
if err != nil {
t.Error(err)
}

if resp.Header.Get("status") != "" {
t.Errorf("status header should not be present. Found with value %s", resp.Header.Get("stattus"))
}

// ensure the request was sent through the proxy instead of the DeltaProxyCache
err = testResultHeaderPartMatch(resp.Header, map[string]string{"engine": "HTTPProxy"})
if err != nil {
t.Error(err)
}

}

func TestDeltaProxyCacheRequestRemoveStale(t *testing.T) {

ts, w, r, rsc, err := setupTestHarnessDPC()
Expand Down Expand Up @@ -1465,7 +1407,6 @@ func TestDeltaProxyCacheRequest_BackfillTolerance(t *testing.T) {
xn := timeseries.Extent{Start: now.Add(-time.Duration(6) * time.Hour).Truncate(step), End: now.Truncate(step)}

// We can predict what slice will need to be fetched and ensure that is only what is requested upstream
expectedFetched := fmt.Sprintf("[%s]", timeseries.Extent{Start: xn.End, End: xn.End})
expected, _, _ := mockprom.GetTimeSeriesData(query, xn.Start, xn.End, step)

u := r.URL
Expand Down Expand Up @@ -1519,12 +1460,7 @@ func TestDeltaProxyCacheRequest_BackfillTolerance(t *testing.T) {
t.Error(err)
}

err = testResultHeaderPartMatch(resp.Header, map[string]string{"status": "phit"})
if err != nil {
t.Error(err)
}

err = testResultHeaderPartMatch(resp.Header, map[string]string{"fetched": expectedFetched})
err = testResultHeaderPartMatch(resp.Header, map[string]string{"status": "hit"})
if err != nil {
t.Error(err)
}
Expand Down
Loading