Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove custom backoff logic for sending segments #186

Merged
merged 2 commits into from
Jan 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 0 additions & 47 deletions pkg/processor/batchprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/aws/aws-xray-daemon/pkg/conn"
"github.com/aws/aws-xray-daemon/pkg/telemetry"
"github.com/aws/aws-xray-daemon/pkg/util/timer"
"math"
"math/rand"
"regexp"
"time"
Expand All @@ -22,12 +21,6 @@ import (
log "github.com/cihub/seelog"
)

const (
backoffCapSeconds = 30
backoffMinAttempts = 10
backoffBaseSeconds = 1
)

var /* const */ segIdRegexp = regexp.MustCompile(`\"id\":\"(.*?)\"`)
var /* const */ traceIdRegexp = regexp.MustCompile(`\"trace_id\":\"(.*?)\"`)

Expand Down Expand Up @@ -68,7 +61,6 @@ func (s *segmentsBatch) send(batch []*string) {
}

func (s *segmentsBatch) poll() {
failedAttempt := 0
for {
batch, ok := <-s.batches
if ok {
Expand All @@ -80,16 +72,9 @@ func (s *segmentsBatch) poll() {
r, err := s.xRay.PutTraceSegments(params)
if err != nil {
telemetry.EvaluateConnectionError(err)
failedAttempt++
backOffSeconds := s.backOff(failedAttempt)
log.Errorf("Sending segment batch failed with: %v", err)
log.Warnf("Delaying sending of additional batches by %v seconds", backOffSeconds)
if backOffSeconds > 0 {
<-s.timer.After(time.Second * time.Duration(backOffSeconds))
}
continue
} else {
failedAttempt = 0
telemetry.T.SegmentSent(int64(len(batch)))
}
elapsed := time.Since(start)
Expand Down Expand Up @@ -136,35 +121,3 @@ func (s *segmentsBatch) poll() {
func (s *segmentsBatch) close() {
close(s.batches)
}

func min(x, y int32) int32 {
if x < y {
return x
}
return y
}

// Returns int32 number for Full Jitter Base
// If the computation result in value greater than Max Int31 it returns MAX Int31 value
func getValidJitterBase(backoffBase, attempt int) int32 {
base := float64(backoffBase) * math.Pow(2, float64(attempt))
var baseInt int32
if base > float64(math.MaxInt32/2) {
baseInt = math.MaxInt32 / 2
} else {
baseInt = int32(base)
}
return baseInt
}

func (s *segmentsBatch) backOff(attempt int) int32 {
if attempt <= backoffMinAttempts {
return 0
}
// Attempts to be considered for Jitter Backoff
backoffAttempts := attempt - backoffMinAttempts
// As per Full Jitter described in https://www.awsarchitectureblog.com/2015/03/backoff.html
base := getValidJitterBase(backoffBaseSeconds, backoffAttempts)
randomBackoff := s.randGen.Int31n(base)
return min(backoffCapSeconds, randomBackoff)
}
224 changes: 0 additions & 224 deletions pkg/processor/batchprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@ package processor
import (
"errors"
"fmt"
"math/rand"
"strings"
"testing"
"time"

"github.com/aws/aws-sdk-go/service/xray"
"github.com/aws/aws-xray-daemon/pkg/util/test"
Expand Down Expand Up @@ -112,157 +110,6 @@ func TestPollSendSuccess(t *testing.T) {
assert.True(t, strings.Contains(log.Logs[1], doneMsg))
}

func TestPollSendFailedOnceMoreThanMin(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We removed quite a few tests because of the new functionality.

Can we add some back in that test the new functionality?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the removal of custom backoff logic, we automatically fall to the AWS SDK retry behavior which is not a new functionality in our code base. Are you suggesting we add a unit test for the AWS SDK retry functionality? IMO it won't be appropriate because if their retry behavior changes, it will break our tests.

seed := int64(122321)
randGen := rand.New(rand.NewSource(seed))
timer := test.MockTimerClient{}
log := test.LogSetup()
xRay := new(MockXRayClient)
xRay.On("PutTraceSegments", nil).Return("Error")
s := segmentsBatch{
batches: make(chan []*string, 1),
xRay: xRay,
done: make(chan bool),
randGen: rand.New(rand.NewSource(seed)),
timer: &timer,
}
testMessage := "Test Message"
batch := []*string{&testMessage}
// First failure
backoff := randGen.Int31n(backoffBaseSeconds * 2)

go s.poll()
for i := 0; i < backoffMinAttempts; i++ {
s.send(batch)
timer.Advance(time.Second)
time.Sleep(time.Millisecond)
}
s.send(batch)
close(s.batches)

time.Sleep(time.Millisecond)
timer.Advance(time.Second * time.Duration(backoff))

assert.EqualValues(t, xRay.CallNoToPutTraceSegments, backoffMinAttempts+1)
// Backed off only once after min failed attempts are exhausted
assert.EqualValues(t, 1, timer.AfterCalledTimes())

<-s.done

assert.True(t, strings.Contains(log.Logs[len(log.Logs)-1], doneMsg))
}

func TestPollSendFailedTwiceMoreThanMin(t *testing.T) {
seed := int64(122321)
randGen := rand.New(rand.NewSource(seed))
timer := test.MockTimerClient{}
log := test.LogSetup()
xRay := new(MockXRayClient)
xRay.On("PutTraceSegments", nil).Return("Error")
s := segmentsBatch{
batches: make(chan []*string, 1),
xRay: xRay,
done: make(chan bool),
randGen: rand.New(rand.NewSource(seed)),
timer: &timer,
}
testMessage := "Test Message"
batch := []*string{&testMessage}
// First failure
backoff := randGen.Int31n(backoffBaseSeconds * 2)

go s.poll()
for i := 0; i < backoffMinAttempts; i++ {
s.send(batch)
timer.Advance(time.Second)
time.Sleep(time.Millisecond)
}
s.send(batch)

time.Sleep(time.Millisecond)
timer.Advance(time.Second * time.Duration(backoff))

assert.EqualValues(t, xRay.CallNoToPutTraceSegments, backoffMinAttempts+1)
assert.EqualValues(t, 1, timer.AfterCalledTimes())

backoff2 := randGen.Int31n(backoffBaseSeconds * 4)

s.send(batch)

time.Sleep(time.Millisecond)
timer.Advance(time.Second * time.Duration(backoff2))

assert.EqualValues(t, xRay.CallNoToPutTraceSegments, backoffMinAttempts+2)
assert.EqualValues(t, 2, timer.AfterCalledTimes())

close(s.batches)
<-s.done
assert.True(t, strings.Contains(log.Logs[len(log.Logs)-1], doneMsg))
}

func TestPollSendFailedTwiceAndSucceedThird(t *testing.T) {
seed := int64(122321)
randGen := rand.New(rand.NewSource(seed))
timer := test.MockTimerClient{}
log := test.LogSetup()
xRay := new(MockXRayClient)
xRay.On("PutTraceSegments", nil).Return("Error").Times(backoffMinAttempts + 2)
xRay.On("PutTraceSegments", nil).Return("").Once()

s := segmentsBatch{
batches: make(chan []*string, 1),
xRay: xRay,
done: make(chan bool),
randGen: rand.New(rand.NewSource(seed)),
timer: &timer,
}
testMessage := "Test Message"
batch := []*string{&testMessage}

// First failure.
backoff := randGen.Int31n(backoffBaseSeconds * 2)

go s.poll()
for i := 0; i < backoffMinAttempts; i++ {
s.send(batch)
timer.Advance(time.Second)
time.Sleep(time.Millisecond)
}
s.send(batch)

time.Sleep(time.Millisecond)
timer.Advance(time.Second * time.Duration(backoff))

assert.EqualValues(t, xRay.CallNoToPutTraceSegments, backoffMinAttempts+1)
assert.EqualValues(t, 1, timer.AfterCalledTimes())

// Second failure.
backoff2 := randGen.Int31n(backoffBaseSeconds * 4)

s.send(batch)

time.Sleep(time.Millisecond)
timer.Advance(time.Second * time.Duration(backoff2))

assert.EqualValues(t, xRay.CallNoToPutTraceSegments, backoffMinAttempts+2)
assert.EqualValues(t, 2, timer.AfterCalledTimes())

// Third success.
s.send(batch)

time.Sleep(time.Millisecond)
timer.Advance(time.Second)

assert.EqualValues(t, xRay.CallNoToPutTraceSegments, backoffMinAttempts+3)
assert.EqualValues(t, 2, timer.AfterCalledTimes()) // no backoff logic triggered.

close(s.batches)
<-s.done

assert.True(t, strings.Contains(log.Logs[len(log.Logs)-2], fmt.Sprintf("Successfully sent batch of %v", 1)))
assert.True(t, strings.Contains(log.Logs[len(log.Logs)-1], doneMsg))
}

func TestPutTraceSegmentsParameters(t *testing.T) {
log := test.LogSetup()
xRay := new(MockXRayClient)
Expand Down Expand Up @@ -336,74 +183,3 @@ func TestPollSendReturnUnprocessedInvalid(t *testing.T) {
assert.True(t, strings.Contains(log.Logs[0], fmt.Sprintf("Sent batch of %v segments but had %v Unprocessed segments", 1, 1)))
assert.True(t, strings.Contains(log.Logs[1], "Received nil unprocessed segment id from X-Ray service"))
}

type minTestCase struct {
x int32
y int32
result int32
}

func TestMin(t *testing.T) {
testCases := []minTestCase{
{x: 23, y: 54, result: 23},
{x: 1121, y: 21, result: 21},
{x: -12123, y: -4343, result: -12123},
{x: 77, y: 77, result: 77},
{x: 0, y: 0, result: 0},
{x: 0, y: -54, result: -54},
{x: -6543, y: 0, result: -6543},
}
for _, c := range testCases {
r := min(c.x, c.y)

assert.EqualValues(t, c.result, r, fmt.Sprintf("Min Test: X: %v, Y: %v, Expected: %v", c.x, c.y, c.result))
}
}

func TestGetValidJitterBase(t *testing.T) {
testCases := []struct {
backoffBase int
attempt int
expectedValue int32
}{
{backoffBase: 1, attempt: 1, expectedValue: 2},
{backoffBase: 2, attempt: 2, expectedValue: 8},
{backoffBase: 1, attempt: 25, expectedValue: 33554432},
{backoffBase: 5, attempt: 30, expectedValue: 1073741823},
{backoffBase: 1, attempt: 100, expectedValue: 1073741823},
}
for _, tc := range testCases {
backoffBase := tc.backoffBase
attempt := tc.attempt

base := getValidJitterBase(backoffBase, attempt)

assert.EqualValues(t, tc.expectedValue, base)
}
}

func TestBackoff(t *testing.T) {
failedAttempts := []int{1, 2, 5, 7, 10, 23, 100, 1000, 343212}
seedRandom := rand.New(rand.NewSource(time.Now().Unix()))
for _, fa := range failedAttempts {
seed := int64(seedRandom.Int63())
randGen := rand.New(rand.NewSource(seed))
s := segmentsBatch{
randGen: rand.New(rand.NewSource(seed)),
}

backoffSec := s.backOff(fa)

var backoffExpected int32

if fa > backoffMinAttempts {
randomBackoff := randGen.Int31n(getValidJitterBase(backoffBaseSeconds, fa-backoffMinAttempts))
backoffExpected = randomBackoff
}

if backoffCapSeconds < backoffExpected {
backoffExpected = backoffCapSeconds
}
assert.EqualValues(t, backoffExpected, backoffSec, fmt.Sprintf("Test Case: Failed Attempt: %v, Rand Seed: %v", fa, seed))
}
}