Skip to content

Commit

Permalink
Remove custom backoff logic for sending segments (#186)
Browse files Browse the repository at this point in the history
* remove custom backoff logic

* remove commented out code
  • Loading branch information
srprash authored Jan 27, 2023
1 parent aa8ffe7 commit dc5f98d
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 271 deletions.
47 changes: 0 additions & 47 deletions pkg/processor/batchprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/aws/aws-xray-daemon/pkg/conn"
"github.com/aws/aws-xray-daemon/pkg/telemetry"
"github.com/aws/aws-xray-daemon/pkg/util/timer"
"math"
"math/rand"
"regexp"
"time"
Expand All @@ -22,12 +21,6 @@ import (
log "github.com/cihub/seelog"
)

const (
backoffCapSeconds = 30
backoffMinAttempts = 10
backoffBaseSeconds = 1
)

var /* const */ segIdRegexp = regexp.MustCompile(`\"id\":\"(.*?)\"`)
var /* const */ traceIdRegexp = regexp.MustCompile(`\"trace_id\":\"(.*?)\"`)

Expand Down Expand Up @@ -68,7 +61,6 @@ func (s *segmentsBatch) send(batch []*string) {
}

func (s *segmentsBatch) poll() {
failedAttempt := 0
for {
batch, ok := <-s.batches
if ok {
Expand All @@ -80,16 +72,9 @@ func (s *segmentsBatch) poll() {
r, err := s.xRay.PutTraceSegments(params)
if err != nil {
telemetry.EvaluateConnectionError(err)
failedAttempt++
backOffSeconds := s.backOff(failedAttempt)
log.Errorf("Sending segment batch failed with: %v", err)
log.Warnf("Delaying sending of additional batches by %v seconds", backOffSeconds)
if backOffSeconds > 0 {
<-s.timer.After(time.Second * time.Duration(backOffSeconds))
}
continue
} else {
failedAttempt = 0
telemetry.T.SegmentSent(int64(len(batch)))
}
elapsed := time.Since(start)
Expand Down Expand Up @@ -136,35 +121,3 @@ func (s *segmentsBatch) poll() {
func (s *segmentsBatch) close() {
close(s.batches)
}

func min(x, y int32) int32 {
if x < y {
return x
}
return y
}

// Returns int32 number for Full Jitter Base
// If the computation result in value greater than Max Int31 it returns MAX Int31 value
func getValidJitterBase(backoffBase, attempt int) int32 {
base := float64(backoffBase) * math.Pow(2, float64(attempt))
var baseInt int32
if base > float64(math.MaxInt32/2) {
baseInt = math.MaxInt32 / 2
} else {
baseInt = int32(base)
}
return baseInt
}

func (s *segmentsBatch) backOff(attempt int) int32 {
if attempt <= backoffMinAttempts {
return 0
}
// Attempts to be considered for Jitter Backoff
backoffAttempts := attempt - backoffMinAttempts
// As per Full Jitter described in https://www.awsarchitectureblog.com/2015/03/backoff.html
base := getValidJitterBase(backoffBaseSeconds, backoffAttempts)
randomBackoff := s.randGen.Int31n(base)
return min(backoffCapSeconds, randomBackoff)
}
224 changes: 0 additions & 224 deletions pkg/processor/batchprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@ package processor
import (
"errors"
"fmt"
"math/rand"
"strings"
"testing"
"time"

"github.com/aws/aws-sdk-go/service/xray"
"github.com/aws/aws-xray-daemon/pkg/util/test"
Expand Down Expand Up @@ -112,157 +110,6 @@ func TestPollSendSuccess(t *testing.T) {
assert.True(t, strings.Contains(log.Logs[1], doneMsg))
}

func TestPollSendFailedOnceMoreThanMin(t *testing.T) {
seed := int64(122321)
randGen := rand.New(rand.NewSource(seed))
timer := test.MockTimerClient{}
log := test.LogSetup()
xRay := new(MockXRayClient)
xRay.On("PutTraceSegments", nil).Return("Error")
s := segmentsBatch{
batches: make(chan []*string, 1),
xRay: xRay,
done: make(chan bool),
randGen: rand.New(rand.NewSource(seed)),
timer: &timer,
}
testMessage := "Test Message"
batch := []*string{&testMessage}
// First failure
backoff := randGen.Int31n(backoffBaseSeconds * 2)

go s.poll()
for i := 0; i < backoffMinAttempts; i++ {
s.send(batch)
timer.Advance(time.Second)
time.Sleep(time.Millisecond)
}
s.send(batch)
close(s.batches)

time.Sleep(time.Millisecond)
timer.Advance(time.Second * time.Duration(backoff))

assert.EqualValues(t, xRay.CallNoToPutTraceSegments, backoffMinAttempts+1)
// Backed off only once after min failed attempts are exhausted
assert.EqualValues(t, 1, timer.AfterCalledTimes())

<-s.done

assert.True(t, strings.Contains(log.Logs[len(log.Logs)-1], doneMsg))
}

func TestPollSendFailedTwiceMoreThanMin(t *testing.T) {
seed := int64(122321)
randGen := rand.New(rand.NewSource(seed))
timer := test.MockTimerClient{}
log := test.LogSetup()
xRay := new(MockXRayClient)
xRay.On("PutTraceSegments", nil).Return("Error")
s := segmentsBatch{
batches: make(chan []*string, 1),
xRay: xRay,
done: make(chan bool),
randGen: rand.New(rand.NewSource(seed)),
timer: &timer,
}
testMessage := "Test Message"
batch := []*string{&testMessage}
// First failure
backoff := randGen.Int31n(backoffBaseSeconds * 2)

go s.poll()
for i := 0; i < backoffMinAttempts; i++ {
s.send(batch)
timer.Advance(time.Second)
time.Sleep(time.Millisecond)
}
s.send(batch)

time.Sleep(time.Millisecond)
timer.Advance(time.Second * time.Duration(backoff))

assert.EqualValues(t, xRay.CallNoToPutTraceSegments, backoffMinAttempts+1)
assert.EqualValues(t, 1, timer.AfterCalledTimes())

backoff2 := randGen.Int31n(backoffBaseSeconds * 4)

s.send(batch)

time.Sleep(time.Millisecond)
timer.Advance(time.Second * time.Duration(backoff2))

assert.EqualValues(t, xRay.CallNoToPutTraceSegments, backoffMinAttempts+2)
assert.EqualValues(t, 2, timer.AfterCalledTimes())

close(s.batches)
<-s.done
assert.True(t, strings.Contains(log.Logs[len(log.Logs)-1], doneMsg))
}

func TestPollSendFailedTwiceAndSucceedThird(t *testing.T) {
seed := int64(122321)
randGen := rand.New(rand.NewSource(seed))
timer := test.MockTimerClient{}
log := test.LogSetup()
xRay := new(MockXRayClient)
xRay.On("PutTraceSegments", nil).Return("Error").Times(backoffMinAttempts + 2)
xRay.On("PutTraceSegments", nil).Return("").Once()

s := segmentsBatch{
batches: make(chan []*string, 1),
xRay: xRay,
done: make(chan bool),
randGen: rand.New(rand.NewSource(seed)),
timer: &timer,
}
testMessage := "Test Message"
batch := []*string{&testMessage}

// First failure.
backoff := randGen.Int31n(backoffBaseSeconds * 2)

go s.poll()
for i := 0; i < backoffMinAttempts; i++ {
s.send(batch)
timer.Advance(time.Second)
time.Sleep(time.Millisecond)
}
s.send(batch)

time.Sleep(time.Millisecond)
timer.Advance(time.Second * time.Duration(backoff))

assert.EqualValues(t, xRay.CallNoToPutTraceSegments, backoffMinAttempts+1)
assert.EqualValues(t, 1, timer.AfterCalledTimes())

// Second failure.
backoff2 := randGen.Int31n(backoffBaseSeconds * 4)

s.send(batch)

time.Sleep(time.Millisecond)
timer.Advance(time.Second * time.Duration(backoff2))

assert.EqualValues(t, xRay.CallNoToPutTraceSegments, backoffMinAttempts+2)
assert.EqualValues(t, 2, timer.AfterCalledTimes())

// Third success.
s.send(batch)

time.Sleep(time.Millisecond)
timer.Advance(time.Second)

assert.EqualValues(t, xRay.CallNoToPutTraceSegments, backoffMinAttempts+3)
assert.EqualValues(t, 2, timer.AfterCalledTimes()) // no backoff logic triggered.

close(s.batches)
<-s.done

assert.True(t, strings.Contains(log.Logs[len(log.Logs)-2], fmt.Sprintf("Successfully sent batch of %v", 1)))
assert.True(t, strings.Contains(log.Logs[len(log.Logs)-1], doneMsg))
}

func TestPutTraceSegmentsParameters(t *testing.T) {
log := test.LogSetup()
xRay := new(MockXRayClient)
Expand Down Expand Up @@ -336,74 +183,3 @@ func TestPollSendReturnUnprocessedInvalid(t *testing.T) {
assert.True(t, strings.Contains(log.Logs[0], fmt.Sprintf("Sent batch of %v segments but had %v Unprocessed segments", 1, 1)))
assert.True(t, strings.Contains(log.Logs[1], "Received nil unprocessed segment id from X-Ray service"))
}

type minTestCase struct {
x int32
y int32
result int32
}

func TestMin(t *testing.T) {
testCases := []minTestCase{
{x: 23, y: 54, result: 23},
{x: 1121, y: 21, result: 21},
{x: -12123, y: -4343, result: -12123},
{x: 77, y: 77, result: 77},
{x: 0, y: 0, result: 0},
{x: 0, y: -54, result: -54},
{x: -6543, y: 0, result: -6543},
}
for _, c := range testCases {
r := min(c.x, c.y)

assert.EqualValues(t, c.result, r, fmt.Sprintf("Min Test: X: %v, Y: %v, Expected: %v", c.x, c.y, c.result))
}
}

func TestGetValidJitterBase(t *testing.T) {
testCases := []struct {
backoffBase int
attempt int
expectedValue int32
}{
{backoffBase: 1, attempt: 1, expectedValue: 2},
{backoffBase: 2, attempt: 2, expectedValue: 8},
{backoffBase: 1, attempt: 25, expectedValue: 33554432},
{backoffBase: 5, attempt: 30, expectedValue: 1073741823},
{backoffBase: 1, attempt: 100, expectedValue: 1073741823},
}
for _, tc := range testCases {
backoffBase := tc.backoffBase
attempt := tc.attempt

base := getValidJitterBase(backoffBase, attempt)

assert.EqualValues(t, tc.expectedValue, base)
}
}

func TestBackoff(t *testing.T) {
failedAttempts := []int{1, 2, 5, 7, 10, 23, 100, 1000, 343212}
seedRandom := rand.New(rand.NewSource(time.Now().Unix()))
for _, fa := range failedAttempts {
seed := int64(seedRandom.Int63())
randGen := rand.New(rand.NewSource(seed))
s := segmentsBatch{
randGen: rand.New(rand.NewSource(seed)),
}

backoffSec := s.backOff(fa)

var backoffExpected int32

if fa > backoffMinAttempts {
randomBackoff := randGen.Int31n(getValidJitterBase(backoffBaseSeconds, fa-backoffMinAttempts))
backoffExpected = randomBackoff
}

if backoffCapSeconds < backoffExpected {
backoffExpected = backoffCapSeconds
}
assert.EqualValues(t, backoffExpected, backoffSec, fmt.Sprintf("Test Case: Failed Attempt: %v, Rand Seed: %v", fa, seed))
}
}

0 comments on commit dc5f98d

Please sign in to comment.