diff --git a/pkg/processor/batchprocessor.go b/pkg/processor/batchprocessor.go index bb54fe1..bce2fed 100644 --- a/pkg/processor/batchprocessor.go +++ b/pkg/processor/batchprocessor.go @@ -13,7 +13,6 @@ import ( "github.com/aws/aws-xray-daemon/pkg/conn" "github.com/aws/aws-xray-daemon/pkg/telemetry" "github.com/aws/aws-xray-daemon/pkg/util/timer" - "math" "math/rand" "regexp" "time" @@ -22,12 +21,6 @@ import ( log "github.com/cihub/seelog" ) -const ( - backoffCapSeconds = 30 - backoffMinAttempts = 10 - backoffBaseSeconds = 1 -) - var /* const */ segIdRegexp = regexp.MustCompile(`\"id\":\"(.*?)\"`) var /* const */ traceIdRegexp = regexp.MustCompile(`\"trace_id\":\"(.*?)\"`) @@ -68,7 +61,6 @@ func (s *segmentsBatch) send(batch []*string) { } func (s *segmentsBatch) poll() { - failedAttempt := 0 for { batch, ok := <-s.batches if ok { @@ -80,16 +72,9 @@ func (s *segmentsBatch) poll() { r, err := s.xRay.PutTraceSegments(params) if err != nil { telemetry.EvaluateConnectionError(err) - failedAttempt++ - backOffSeconds := s.backOff(failedAttempt) log.Errorf("Sending segment batch failed with: %v", err) - log.Warnf("Delaying sending of additional batches by %v seconds", backOffSeconds) - if backOffSeconds > 0 { - <-s.timer.After(time.Second * time.Duration(backOffSeconds)) - } continue } else { - failedAttempt = 0 telemetry.T.SegmentSent(int64(len(batch))) } elapsed := time.Since(start) @@ -136,35 +121,3 @@ func (s *segmentsBatch) poll() { func (s *segmentsBatch) close() { close(s.batches) } - -func min(x, y int32) int32 { - if x < y { - return x - } - return y -} - -// Returns int32 number for Full Jitter Base -// If the computation result in value greater than Max Int31 it returns MAX Int31 value -func getValidJitterBase(backoffBase, attempt int) int32 { - base := float64(backoffBase) * math.Pow(2, float64(attempt)) - var baseInt int32 - if base > float64(math.MaxInt32/2) { - baseInt = math.MaxInt32 / 2 - } else { - baseInt = int32(base) - } - return baseInt -} - -func (s *segmentsBatch) backOff(attempt int) int32 { - if attempt <= backoffMinAttempts { - return 0 - } - // Attempts to be considered for Jitter Backoff - backoffAttempts := attempt - backoffMinAttempts - // As per Full Jitter described in https://www.awsarchitectureblog.com/2015/03/backoff.html - base := getValidJitterBase(backoffBaseSeconds, backoffAttempts) - randomBackoff := s.randGen.Int31n(base) - return min(backoffCapSeconds, randomBackoff) -} diff --git a/pkg/processor/batchprocessor_test.go b/pkg/processor/batchprocessor_test.go index 8c6fcf5..1a75a4f 100644 --- a/pkg/processor/batchprocessor_test.go +++ b/pkg/processor/batchprocessor_test.go @@ -12,10 +12,8 @@ package processor import ( "errors" "fmt" - "math/rand" "strings" "testing" - "time" "github.com/aws/aws-sdk-go/service/xray" "github.com/aws/aws-xray-daemon/pkg/util/test" @@ -112,157 +110,6 @@ func TestPollSendSuccess(t *testing.T) { assert.True(t, strings.Contains(log.Logs[1], doneMsg)) } -func TestPollSendFailedOnceMoreThanMin(t *testing.T) { - seed := int64(122321) - randGen := rand.New(rand.NewSource(seed)) - timer := test.MockTimerClient{} - log := test.LogSetup() - xRay := new(MockXRayClient) - xRay.On("PutTraceSegments", nil).Return("Error") - s := segmentsBatch{ - batches: make(chan []*string, 1), - xRay: xRay, - done: make(chan bool), - randGen: rand.New(rand.NewSource(seed)), - timer: &timer, - } - testMessage := "Test Message" - batch := []*string{&testMessage} - // First failure - backoff := randGen.Int31n(backoffBaseSeconds * 2) - - go s.poll() - for i := 0; i < backoffMinAttempts; i++ { - s.send(batch) - timer.Advance(time.Second) - time.Sleep(time.Millisecond) - } - s.send(batch) - close(s.batches) - - time.Sleep(time.Millisecond) - timer.Advance(time.Second * time.Duration(backoff)) - - assert.EqualValues(t, xRay.CallNoToPutTraceSegments, backoffMinAttempts+1) - // Backed off only once after min failed attempts are exhausted - assert.EqualValues(t, 1, timer.AfterCalledTimes()) - - <-s.done - - assert.True(t, strings.Contains(log.Logs[len(log.Logs)-1], doneMsg)) -} - -func TestPollSendFailedTwiceMoreThanMin(t *testing.T) { - seed := int64(122321) - randGen := rand.New(rand.NewSource(seed)) - timer := test.MockTimerClient{} - log := test.LogSetup() - xRay := new(MockXRayClient) - xRay.On("PutTraceSegments", nil).Return("Error") - s := segmentsBatch{ - batches: make(chan []*string, 1), - xRay: xRay, - done: make(chan bool), - randGen: rand.New(rand.NewSource(seed)), - timer: &timer, - } - testMessage := "Test Message" - batch := []*string{&testMessage} - // First failure - backoff := randGen.Int31n(backoffBaseSeconds * 2) - - go s.poll() - for i := 0; i < backoffMinAttempts; i++ { - s.send(batch) - timer.Advance(time.Second) - time.Sleep(time.Millisecond) - } - s.send(batch) - - time.Sleep(time.Millisecond) - timer.Advance(time.Second * time.Duration(backoff)) - - assert.EqualValues(t, xRay.CallNoToPutTraceSegments, backoffMinAttempts+1) - assert.EqualValues(t, 1, timer.AfterCalledTimes()) - - backoff2 := randGen.Int31n(backoffBaseSeconds * 4) - - s.send(batch) - - time.Sleep(time.Millisecond) - timer.Advance(time.Second * time.Duration(backoff2)) - - assert.EqualValues(t, xRay.CallNoToPutTraceSegments, backoffMinAttempts+2) - assert.EqualValues(t, 2, timer.AfterCalledTimes()) - - close(s.batches) - <-s.done - assert.True(t, strings.Contains(log.Logs[len(log.Logs)-1], doneMsg)) -} - -func TestPollSendFailedTwiceAndSucceedThird(t *testing.T) { - seed := int64(122321) - randGen := rand.New(rand.NewSource(seed)) - timer := test.MockTimerClient{} - log := test.LogSetup() - xRay := new(MockXRayClient) - xRay.On("PutTraceSegments", nil).Return("Error").Times(backoffMinAttempts + 2) - xRay.On("PutTraceSegments", nil).Return("").Once() - - s := segmentsBatch{ - batches: make(chan []*string, 1), - xRay: xRay, - done: make(chan bool), - randGen: rand.New(rand.NewSource(seed)), - timer: &timer, - } - testMessage := "Test Message" - batch := []*string{&testMessage} - - // First failure. - backoff := randGen.Int31n(backoffBaseSeconds * 2) - - go s.poll() - for i := 0; i < backoffMinAttempts; i++ { - s.send(batch) - timer.Advance(time.Second) - time.Sleep(time.Millisecond) - } - s.send(batch) - - time.Sleep(time.Millisecond) - timer.Advance(time.Second * time.Duration(backoff)) - - assert.EqualValues(t, xRay.CallNoToPutTraceSegments, backoffMinAttempts+1) - assert.EqualValues(t, 1, timer.AfterCalledTimes()) - - // Second failure. - backoff2 := randGen.Int31n(backoffBaseSeconds * 4) - - s.send(batch) - - time.Sleep(time.Millisecond) - timer.Advance(time.Second * time.Duration(backoff2)) - - assert.EqualValues(t, xRay.CallNoToPutTraceSegments, backoffMinAttempts+2) - assert.EqualValues(t, 2, timer.AfterCalledTimes()) - - // Third success. - s.send(batch) - - time.Sleep(time.Millisecond) - timer.Advance(time.Second) - - assert.EqualValues(t, xRay.CallNoToPutTraceSegments, backoffMinAttempts+3) - assert.EqualValues(t, 2, timer.AfterCalledTimes()) // no backoff logic triggered. - - close(s.batches) - <-s.done - - assert.True(t, strings.Contains(log.Logs[len(log.Logs)-2], fmt.Sprintf("Successfully sent batch of %v", 1))) - assert.True(t, strings.Contains(log.Logs[len(log.Logs)-1], doneMsg)) -} - func TestPutTraceSegmentsParameters(t *testing.T) { log := test.LogSetup() xRay := new(MockXRayClient) @@ -336,74 +183,3 @@ func TestPollSendReturnUnprocessedInvalid(t *testing.T) { assert.True(t, strings.Contains(log.Logs[0], fmt.Sprintf("Sent batch of %v segments but had %v Unprocessed segments", 1, 1))) assert.True(t, strings.Contains(log.Logs[1], "Received nil unprocessed segment id from X-Ray service")) } - -type minTestCase struct { - x int32 - y int32 - result int32 -} - -func TestMin(t *testing.T) { - testCases := []minTestCase{ - {x: 23, y: 54, result: 23}, - {x: 1121, y: 21, result: 21}, - {x: -12123, y: -4343, result: -12123}, - {x: 77, y: 77, result: 77}, - {x: 0, y: 0, result: 0}, - {x: 0, y: -54, result: -54}, - {x: -6543, y: 0, result: -6543}, - } - for _, c := range testCases { - r := min(c.x, c.y) - - assert.EqualValues(t, c.result, r, fmt.Sprintf("Min Test: X: %v, Y: %v, Expected: %v", c.x, c.y, c.result)) - } -} - -func TestGetValidJitterBase(t *testing.T) { - testCases := []struct { - backoffBase int - attempt int - expectedValue int32 - }{ - {backoffBase: 1, attempt: 1, expectedValue: 2}, - {backoffBase: 2, attempt: 2, expectedValue: 8}, - {backoffBase: 1, attempt: 25, expectedValue: 33554432}, - {backoffBase: 5, attempt: 30, expectedValue: 1073741823}, - {backoffBase: 1, attempt: 100, expectedValue: 1073741823}, - } - for _, tc := range testCases { - backoffBase := tc.backoffBase - attempt := tc.attempt - - base := getValidJitterBase(backoffBase, attempt) - - assert.EqualValues(t, tc.expectedValue, base) - } -} - -func TestBackoff(t *testing.T) { - failedAttempts := []int{1, 2, 5, 7, 10, 23, 100, 1000, 343212} - seedRandom := rand.New(rand.NewSource(time.Now().Unix())) - for _, fa := range failedAttempts { - seed := int64(seedRandom.Int63()) - randGen := rand.New(rand.NewSource(seed)) - s := segmentsBatch{ - randGen: rand.New(rand.NewSource(seed)), - } - - backoffSec := s.backOff(fa) - - var backoffExpected int32 - - if fa > backoffMinAttempts { - randomBackoff := randGen.Int31n(getValidJitterBase(backoffBaseSeconds, fa-backoffMinAttempts)) - backoffExpected = randomBackoff - } - - if backoffCapSeconds < backoffExpected { - backoffExpected = backoffCapSeconds - } - assert.EqualValues(t, backoffExpected, backoffSec, fmt.Sprintf("Test Case: Failed Attempt: %v, Rand Seed: %v", fa, seed)) - } -}