Skip to content

Commit

Permalink
Feature: allow checks to run less often
Browse files Browse the repository at this point in the history
This change allows a check to run less often by storing the produced
metrics and pushing the last set that was produced by the scraper once
every 2 minutes.

Signed-off-by: Marcelo E. Magallon <[email protected]>
  • Loading branch information
mem committed Feb 28, 2024
1 parent 58458db commit 8cdacf0
Show file tree
Hide file tree
Showing 3 changed files with 250 additions and 72 deletions.
194 changes: 133 additions & 61 deletions internal/scraper/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ const (
CheckInfoMetricName = "sm_check_info"
CheckInfoSource = "synthetic-monitoring-agent"
maxLabelValueLength = 2048 // this is the default value in Prometheus
maxPublishInterval = 2 * time.Minute
minPublishGap = 10 * time.Second
)

var (
Expand Down Expand Up @@ -220,74 +222,113 @@ func (s *Scraper) Run(ctx context.Context) {
// TODO(mem): keep count of the number of successive errors and
// collect logs if threshold is reached.

var sm checkStateMachine
var (
frequency = ms(s.check.Frequency)
offset = ms(s.check.Offset)
)

// need to keep the most recently published payload for clean up
var payload *probeData
if offset == 0 {
offset = randDuration(min(frequency, maxPublishInterval))
}

scrape := func(ctx context.Context, t time.Time) {
s.scrapeCounter.Inc()
scrapeHandler := scrapeHandler{scraper: s}

var err error
payload, err = s.collectData(ctx, t)
tickWithOffset(
ctx,
s.stop,
scrapeHandler.scrape, scrapeHandler.republish, scrapeHandler.cleanup,
frequency, offset,
maxPublishInterval, minPublishGap,
)

switch {
case errors.Is(err, errCheckFailed):
s.errorCounter.WithLabelValues("check").Inc()
sm.fail(func() {
s.logger.Info().Msg("check entered FAIL state")
})

case err != nil:
s.errorCounter.WithLabelValues("collector").Inc()
s.logger.Error().Err(err).Msg("error collecting data")
return
s.cancel()

default:
sm.pass(func() {
s.logger.Info().Msg("check entered PASS state")
})
}
s.logger.Info().Msg("scraper stopped")
}

if payload != nil {
s.publisher.Publish(payload)
}
// scrapeHandler bundles the state shared by the scrape, republish and
// cleanup callbacks that Run passes to tickWithOffset, so that the data
// collected by one callback is available to the others.
type scrapeHandler struct {
	scraper *Scraper          // owning scraper; provides counters, logger and publisher
	payload *probeData        // needed to keep the most recently published payload for republication and clean up
	sm      checkStateMachine // tracks PASS/FAIL state transitions across scrapes
}

func (h *scrapeHandler) scrape(ctx context.Context, t time.Time) {
h.scraper.scrapeCounter.Inc()

var err error
h.payload, err = h.scraper.collectData(ctx, t)

switch {
case errors.Is(err, errCheckFailed):
h.scraper.errorCounter.WithLabelValues("check").Inc()
h.sm.fail(func() {
h.scraper.logger.Info().Msg("check entered FAIL state")
})

case err != nil:
h.scraper.errorCounter.WithLabelValues("collector").Inc()
h.scraper.logger.Error().Err(err).Msg("error collecting data")
return

default:
h.sm.pass(func() {
h.scraper.logger.Info().Msg("check entered PASS state")
})
}

cleanup := func(ctx context.Context, t time.Time) {
if payload == nil {
return
}
if h.payload != nil {
h.scraper.publisher.Publish(h.payload)
}
}

staleSample := prompb.Sample{
Timestamp: t.UnixNano()/1e6 + 1, // ms
Value: staleMarker,
}
func (h *scrapeHandler) republish(ctx context.Context, t time.Time) {
if h.payload == nil {
return
}

for i := range payload.ts {
ts := &payload.ts[i]
for j := range ts.Samples {
ts.Samples[j] = staleSample
}
h.payload.streams = nil // do not republish logs

now := t.UnixMilli()
for i := range h.payload.ts {
ts := &h.payload.ts[i]
for j := range ts.Samples {
ts.Samples[j].Timestamp = now
}
}

payload.streams = nil
h.scraper.publisher.Publish(h.payload)
}

s.publisher.Publish(payload)
func (h *scrapeHandler) cleanup(ctx context.Context, t time.Time) {
if h.payload == nil {
return
}

payload = nil
staleSample := prompb.Sample{
Timestamp: t.UnixNano()/1e6 + 1, // ms
Value: staleMarker,
}

offset := s.check.Offset
if offset == 0 {
offset = rand.Int63n(s.check.Frequency)
for i := range h.payload.ts {
ts := &h.payload.ts[i]
for j := range ts.Samples {
ts.Samples[j] = staleSample
}
}

tickWithOffset(ctx, s.stop, scrape, cleanup, offset, s.check.Frequency)
h.payload.streams = nil

s.cancel()
h.scraper.publisher.Publish(h.payload)

s.logger.Info().Msg("scraper stopped")
h.payload = nil
}

func ms(n int64) time.Duration {
return time.Duration(n) * time.Millisecond
}

func randDuration(d time.Duration) time.Duration {
return time.Duration(rand.Int63n(int64(d)))
}

func (s *Scraper) Stop() {
Expand All @@ -307,40 +348,56 @@ func (s Scraper) LastModified() float64 {
return s.check.Modified
}

func tickWithOffset(ctx context.Context, stop <-chan struct{}, f func(context.Context, time.Time), cleanup func(context.Context, time.Time), offset, period int64) {
timer := time.NewTimer(time.Duration(offset) * time.Millisecond)
func tickWithOffset(
ctx context.Context,
stop <-chan struct{},
work, idle, cleanup func(context.Context, time.Time),
period, offset, maxIdle, minGap time.Duration) {
// wait for up to offset duration, paying attention to cancellation signals.
offsetTimer := time.NewTimer(offset)

var lastTick time.Time

select {
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
if !offsetTimer.Stop() {
<-offsetTimer.C
}
return

case <-stop:
if !timer.Stop() {
<-timer.C
if !offsetTimer.Stop() {
<-offsetTimer.C
}
// we haven't done anything yet, no clean up
return

case t := <-timer.C:
case t := <-offsetTimer.C:
lastTick = t
f(ctx, t)
work(ctx, t)
}

// create a ticker that won't fire, and replace it with a running
// ticker in case we need it.
inactivityTicker := &time.Ticker{}

if period > maxIdle {
inactivityTicker = time.NewTicker(maxIdle)
}

ticker := time.NewTicker(time.Duration(period) * time.Millisecond)
workTicker := time.NewTicker(period)

for {
select {
case <-ctx.Done():
ticker.Stop()
workTicker.Stop()
inactivityTicker.Stop()
// no clean up if the context is cancelled.
return

case <-stop:
ticker.Stop()
workTicker.Stop()
inactivityTicker.Stop()
// if we are here, we already pushed something
// at least once, lastTick cannot be zero, but
// just in case...
Expand All @@ -349,9 +406,24 @@ func tickWithOffset(ctx context.Context, stop <-chan struct{}, f func(context.Co
}
return

case t := <-ticker.C:
case t := <-inactivityTicker.C:
// if the amount of time since the last run is greater
// than minGap and the amount of time left until the
// next run is larger than minGap, then run the idle
// function.
//
// p----I----I----Ip---I----I----I-p
// ^ ^ ^
// | | +-- don't run here because it's too close to the next run
// | +-- run here
// +-- don't run here because it's too close to the last run
if t.Sub(lastTick) >= minGap && lastTick.Add(period).Sub(t) >= minGap {
idle(ctx, t)
}

case t := <-workTicker.C:
lastTick = t
f(ctx, t)
work(ctx, t)
}
}
}
Expand Down
98 changes: 98 additions & 0 deletions internal/scraper/scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1622,3 +1622,101 @@ func TestScraperRun(t *testing.T) {
require.True(t, found)
require.Equal(t, testProber.failureCount, checkErrCounter.(*testCounter).count.Load())
}

// TestTickWithOffset verifies that tickWithOffset invokes the work, idle
// and cleanup callbacks in the expected order for several combinations of
// period, offset, maxIdle and minGap, including immediate cancellation.
func TestTickWithOffset(t *testing.T) {
	// Markers recorded by the callbacks so that the call order can be
	// asserted after tickWithOffset returns.
	const (
		WORK    = 1
		IDLE    = 2
		CLEANUP = 3
	)

	testcases := map[string]struct {
		timeout  time.Duration // how long to let tickWithOffset run; 0 asks for immediate stop
		period   time.Duration // interval between regular work runs
		offset   time.Duration // initial delay before the first work run
		maxIdle  time.Duration // interval between idle (republish) runs
		minGap   time.Duration // minimum distance between an idle run and a work run
		expected []int         // sequence of callback markers we expect to observe
	}{
		"An idle worker running between regular runs": {
			timeout: 1050 * time.Millisecond,
			period:  500 * time.Millisecond,
			offset:  1,
			maxIdle: 100 * time.Millisecond,
			minGap:  50 * time.Millisecond,
			expected: []int{
				WORK, // 0
				IDLE, // 100
				IDLE, // 200
				IDLE, // 300
				IDLE, // 400
				WORK, // 500
				IDLE, // 600
				IDLE, // 700
				IDLE, // 800
				IDLE, // 900
				WORK, // 1000
				CLEANUP,
			},
		},
		"An idle worker trying to run within gap duration of regular runs.": {
			timeout: 1050 * time.Millisecond,
			period:  500 * time.Millisecond,
			offset:  1,
			maxIdle: 100 * time.Millisecond,
			minGap:  150 * time.Millisecond,
			expected: []int{
				WORK, // 0
				IDLE, // 200, there's no idle at 100 because it's too close to the previous run
				IDLE, // 300
				WORK, // 500, there's no idle at 400 because it's too close to the next run
				IDLE, // 700, there's no idle at 600 because it's too close to the previous run
				IDLE, // 800
				WORK, // 1000, there's no idle at 900 because it's too close to the next run
				CLEANUP,
			},
		},
		"A zero offset and a scraper that has already been cancelled.": {
			timeout:  0, // this asks for immediate cancellation
			period:   500 * time.Millisecond,
			offset:   0,
			maxIdle:  100 * time.Millisecond,
			minGap:   150 * time.Millisecond,
			expected: nil,
		},
	}

	for name, tc := range testcases {
		tc := tc // capture the range variable for the subtest closure

		t.Run(name, func(t *testing.T) {
			ctx, cancel := testhelper.Context(context.Background(), t)
			t.Cleanup(cancel)

			// Append a particular marker to the results slice
			// depending on which function was called by
			// tickWithOffset. This allows us to test that the
			// correct function is being called in the correct
			// order.
			var results []int

			work := func(context.Context, time.Time) { results = append(results, WORK) }
			idle := func(context.Context, time.Time) { results = append(results, IDLE) }
			cleanup := func(context.Context, time.Time) { results = append(results, CLEANUP) }

			stop := make(chan struct{})

			if tc.timeout > 0 {
				go func() {
					// Honor the per-testcase timeout instead of a
					// hard-coded sleep, so each case controls how
					// long tickWithOffset is allowed to run.
					time.Sleep(tc.timeout)
					close(stop)
				}()
			} else {
				// why not in the goroutine? Because we need to
				// make sure the channel is closed *before*
				// calling tickWithOffset
				close(stop)
			}

			tickWithOffset(ctx, stop, work, idle, cleanup, tc.period, tc.offset, tc.maxIdle, tc.minGap)

			require.Equal(t, tc.expected, results)
		})
	}
}
Loading

0 comments on commit 8cdacf0

Please sign in to comment.