From cbf6470593f8496db42b3fbb4931ffd00b2641ae Mon Sep 17 00:00:00 2001
From: Prashant Srivastava <srprash@amazon.com>
Date: Thu, 26 Jan 2023 07:44:12 -0800
Subject: [PATCH 1/2] remove custom backoff logic

---
 pkg/processor/batchprocessor.go      |  48 +-----
 pkg/processor/batchprocessor_test.go | 224 ---------------------------
 2 files changed, 1 insertion(+), 271 deletions(-)

diff --git a/pkg/processor/batchprocessor.go b/pkg/processor/batchprocessor.go
index bb54fe1..1a60163 100644
--- a/pkg/processor/batchprocessor.go
+++ b/pkg/processor/batchprocessor.go
@@ -13,7 +13,6 @@ import (
 	"github.com/aws/aws-xray-daemon/pkg/conn"
 	"github.com/aws/aws-xray-daemon/pkg/telemetry"
 	"github.com/aws/aws-xray-daemon/pkg/util/timer"
-	"math"
 	"math/rand"
 	"regexp"
 	"time"
@@ -22,12 +21,6 @@ import (
 	log "github.com/cihub/seelog"
 )
 
-const (
-	backoffCapSeconds  = 30
-	backoffMinAttempts = 10
-	backoffBaseSeconds = 1
-)
-
 var /* const */ segIdRegexp = regexp.MustCompile(`\"id\":\"(.*?)\"`)
 var /* const */ traceIdRegexp = regexp.MustCompile(`\"trace_id\":\"(.*?)\"`)
 
@@ -68,7 +61,7 @@ func (s *segmentsBatch) send(batch []*string) {
 }
 
 func (s *segmentsBatch) poll() {
-	failedAttempt := 0
+	//failedAttempt := 0
 	for {
 		batch, ok := <-s.batches
 		if ok {
@@ -80,16 +73,9 @@ func (s *segmentsBatch) poll() {
 			r, err := s.xRay.PutTraceSegments(params)
 			if err != nil {
 				telemetry.EvaluateConnectionError(err)
-				failedAttempt++
-				backOffSeconds := s.backOff(failedAttempt)
 				log.Errorf("Sending segment batch failed with: %v", err)
-				log.Warnf("Delaying sending of additional batches by %v seconds", backOffSeconds)
-				if backOffSeconds > 0 {
-					<-s.timer.After(time.Second * time.Duration(backOffSeconds))
-				}
 				continue
 			} else {
-				failedAttempt = 0
 				telemetry.T.SegmentSent(int64(len(batch)))
 			}
 			elapsed := time.Since(start)
@@ -136,35 +122,3 @@ func (s *segmentsBatch) poll() {
 func (s *segmentsBatch) close() {
 	close(s.batches)
 }
-
-func min(x, y int32) int32 {
-	if x < y {
-		return x
-	}
-	return y
-}
-
-// Returns int32 number for Full Jitter Base
-// If the computation result in value greater than Max Int31 it returns MAX Int31 value
-func getValidJitterBase(backoffBase, attempt int) int32 {
-	base := float64(backoffBase) * math.Pow(2, float64(attempt))
-	var baseInt int32
-	if base > float64(math.MaxInt32/2) {
-		baseInt = math.MaxInt32 / 2
-	} else {
-		baseInt = int32(base)
-	}
-	return baseInt
-}
-
-func (s *segmentsBatch) backOff(attempt int) int32 {
-	if attempt <= backoffMinAttempts {
-		return 0
-	}
-	// Attempts to be considered for Jitter Backoff
-	backoffAttempts := attempt - backoffMinAttempts
-	// As per Full Jitter described in https://www.awsarchitectureblog.com/2015/03/backoff.html
-	base := getValidJitterBase(backoffBaseSeconds, backoffAttempts)
-	randomBackoff := s.randGen.Int31n(base)
-	return min(backoffCapSeconds, randomBackoff)
-}
diff --git a/pkg/processor/batchprocessor_test.go b/pkg/processor/batchprocessor_test.go
index 8c6fcf5..1a75a4f 100644
--- a/pkg/processor/batchprocessor_test.go
+++ b/pkg/processor/batchprocessor_test.go
@@ -12,10 +12,8 @@ package processor
 import (
 	"errors"
 	"fmt"
-	"math/rand"
 	"strings"
 	"testing"
-	"time"
 
 	"github.com/aws/aws-sdk-go/service/xray"
 	"github.com/aws/aws-xray-daemon/pkg/util/test"
@@ -112,157 +110,6 @@ func TestPollSendSuccess(t *testing.T) {
 	assert.True(t, strings.Contains(log.Logs[1], doneMsg))
 }
 
-func TestPollSendFailedOnceMoreThanMin(t *testing.T) {
-	seed := int64(122321)
-	randGen := rand.New(rand.NewSource(seed))
-	timer := test.MockTimerClient{}
-	log := test.LogSetup()
-	xRay := new(MockXRayClient)
-	xRay.On("PutTraceSegments", nil).Return("Error")
-	s := segmentsBatch{
-		batches: make(chan []*string, 1),
-		xRay:    xRay,
-		done:    make(chan bool),
-		randGen: rand.New(rand.NewSource(seed)),
-		timer:   &timer,
-	}
-	testMessage := "Test Message"
-	batch := []*string{&testMessage}
-	// First failure
-	backoff := randGen.Int31n(backoffBaseSeconds * 2)
-
-	go s.poll()
-	for i := 0; i < backoffMinAttempts; i++ {
-		s.send(batch)
-		timer.Advance(time.Second)
-		time.Sleep(time.Millisecond)
-	}
-	s.send(batch)
-	close(s.batches)
-
-	time.Sleep(time.Millisecond)
-	timer.Advance(time.Second * time.Duration(backoff))
-
-	assert.EqualValues(t, xRay.CallNoToPutTraceSegments, backoffMinAttempts+1)
-	// Backed off only once after min failed attempts are exhausted
-	assert.EqualValues(t, 1, timer.AfterCalledTimes())
-
-	<-s.done
-
-	assert.True(t, strings.Contains(log.Logs[len(log.Logs)-1], doneMsg))
-}
-
-func TestPollSendFailedTwiceMoreThanMin(t *testing.T) {
-	seed := int64(122321)
-	randGen := rand.New(rand.NewSource(seed))
-	timer := test.MockTimerClient{}
-	log := test.LogSetup()
-	xRay := new(MockXRayClient)
-	xRay.On("PutTraceSegments", nil).Return("Error")
-	s := segmentsBatch{
-		batches: make(chan []*string, 1),
-		xRay:    xRay,
-		done:    make(chan bool),
-		randGen: rand.New(rand.NewSource(seed)),
-		timer:   &timer,
-	}
-	testMessage := "Test Message"
-	batch := []*string{&testMessage}
-	// First failure
-	backoff := randGen.Int31n(backoffBaseSeconds * 2)
-
-	go s.poll()
-	for i := 0; i < backoffMinAttempts; i++ {
-		s.send(batch)
-		timer.Advance(time.Second)
-		time.Sleep(time.Millisecond)
-	}
-	s.send(batch)
-
-	time.Sleep(time.Millisecond)
-	timer.Advance(time.Second * time.Duration(backoff))
-
-	assert.EqualValues(t, xRay.CallNoToPutTraceSegments, backoffMinAttempts+1)
-	assert.EqualValues(t, 1, timer.AfterCalledTimes())
-
-	backoff2 := randGen.Int31n(backoffBaseSeconds * 4)
-
-	s.send(batch)
-
-	time.Sleep(time.Millisecond)
-	timer.Advance(time.Second * time.Duration(backoff2))
-
-	assert.EqualValues(t, xRay.CallNoToPutTraceSegments, backoffMinAttempts+2)
-	assert.EqualValues(t, 2, timer.AfterCalledTimes())
-
-	close(s.batches)
-	<-s.done
-	assert.True(t, strings.Contains(log.Logs[len(log.Logs)-1], doneMsg))
-}
-
-func TestPollSendFailedTwiceAndSucceedThird(t *testing.T) {
-	seed := int64(122321)
-	randGen := rand.New(rand.NewSource(seed))
-	timer := test.MockTimerClient{}
-	log := test.LogSetup()
-	xRay := new(MockXRayClient)
-	xRay.On("PutTraceSegments", nil).Return("Error").Times(backoffMinAttempts + 2)
-	xRay.On("PutTraceSegments", nil).Return("").Once()
-
-	s := segmentsBatch{
-		batches: make(chan []*string, 1),
-		xRay:    xRay,
-		done:    make(chan bool),
-		randGen: rand.New(rand.NewSource(seed)),
-		timer:   &timer,
-	}
-	testMessage := "Test Message"
-	batch := []*string{&testMessage}
-
-	// First failure.
-	backoff := randGen.Int31n(backoffBaseSeconds * 2)
-
-	go s.poll()
-	for i := 0; i < backoffMinAttempts; i++ {
-		s.send(batch)
-		timer.Advance(time.Second)
-		time.Sleep(time.Millisecond)
-	}
-	s.send(batch)
-
-	time.Sleep(time.Millisecond)
-	timer.Advance(time.Second * time.Duration(backoff))
-
-	assert.EqualValues(t, xRay.CallNoToPutTraceSegments, backoffMinAttempts+1)
-	assert.EqualValues(t, 1, timer.AfterCalledTimes())
-
-	// Second failure.
-	backoff2 := randGen.Int31n(backoffBaseSeconds * 4)
-
-	s.send(batch)
-
-	time.Sleep(time.Millisecond)
-	timer.Advance(time.Second * time.Duration(backoff2))
-
-	assert.EqualValues(t, xRay.CallNoToPutTraceSegments, backoffMinAttempts+2)
-	assert.EqualValues(t, 2, timer.AfterCalledTimes())
-
-	// Third success.
-	s.send(batch)
-
-	time.Sleep(time.Millisecond)
-	timer.Advance(time.Second)
-
-	assert.EqualValues(t, xRay.CallNoToPutTraceSegments, backoffMinAttempts+3)
-	assert.EqualValues(t, 2, timer.AfterCalledTimes()) // no backoff logic triggered.
-
-	close(s.batches)
-	<-s.done
-
-	assert.True(t, strings.Contains(log.Logs[len(log.Logs)-2], fmt.Sprintf("Successfully sent batch of %v", 1)))
-	assert.True(t, strings.Contains(log.Logs[len(log.Logs)-1], doneMsg))
-}
-
 func TestPutTraceSegmentsParameters(t *testing.T) {
 	log := test.LogSetup()
 	xRay := new(MockXRayClient)
@@ -336,74 +183,3 @@ func TestPollSendReturnUnprocessedInvalid(t *testing.T) {
 	assert.True(t, strings.Contains(log.Logs[0], fmt.Sprintf("Sent batch of %v segments but had %v Unprocessed segments", 1, 1)))
 	assert.True(t, strings.Contains(log.Logs[1], "Received nil unprocessed segment id from X-Ray service"))
 }
-
-type minTestCase struct {
-	x      int32
-	y      int32
-	result int32
-}
-
-func TestMin(t *testing.T) {
-	testCases := []minTestCase{
-		{x: 23, y: 54, result: 23},
-		{x: 1121, y: 21, result: 21},
-		{x: -12123, y: -4343, result: -12123},
-		{x: 77, y: 77, result: 77},
-		{x: 0, y: 0, result: 0},
-		{x: 0, y: -54, result: -54},
-		{x: -6543, y: 0, result: -6543},
-	}
-	for _, c := range testCases {
-		r := min(c.x, c.y)
-
-		assert.EqualValues(t, c.result, r, fmt.Sprintf("Min Test: X: %v, Y: %v, Expected: %v", c.x, c.y, c.result))
-	}
-}
-
-func TestGetValidJitterBase(t *testing.T) {
-	testCases := []struct {
-		backoffBase   int
-		attempt       int
-		expectedValue int32
-	}{
-		{backoffBase: 1, attempt: 1, expectedValue: 2},
-		{backoffBase: 2, attempt: 2, expectedValue: 8},
-		{backoffBase: 1, attempt: 25, expectedValue: 33554432},
-		{backoffBase: 5, attempt: 30, expectedValue: 1073741823},
-		{backoffBase: 1, attempt: 100, expectedValue: 1073741823},
-	}
-	for _, tc := range testCases {
-		backoffBase := tc.backoffBase
-		attempt := tc.attempt
-
-		base := getValidJitterBase(backoffBase, attempt)
-
-		assert.EqualValues(t, tc.expectedValue, base)
-	}
-}
-
-func TestBackoff(t *testing.T) {
-	failedAttempts := []int{1, 2, 5, 7, 10, 23, 100, 1000, 343212}
-	seedRandom := rand.New(rand.NewSource(time.Now().Unix()))
-	for _, fa := range failedAttempts {
-		seed := int64(seedRandom.Int63())
-		randGen := rand.New(rand.NewSource(seed))
-		s := segmentsBatch{
-			randGen: rand.New(rand.NewSource(seed)),
-		}
-
-		backoffSec := s.backOff(fa)
-
-		var backoffExpected int32
-
-		if fa > backoffMinAttempts {
-			randomBackoff := randGen.Int31n(getValidJitterBase(backoffBaseSeconds, fa-backoffMinAttempts))
-			backoffExpected = randomBackoff
-		}
-
-		if backoffCapSeconds < backoffExpected {
-			backoffExpected = backoffCapSeconds
-		}
-		assert.EqualValues(t, backoffExpected, backoffSec, fmt.Sprintf("Test Case: Failed Attempt: %v, Rand Seed: %v", fa, seed))
-	}
-}

From ac1c68d039afd81c422d6b121aa3f9ecb54d6d3d Mon Sep 17 00:00:00 2001
From: Prashant Srivastava <srprash@amazon.com>
Date: Thu, 26 Jan 2023 13:14:33 -0800
Subject: [PATCH 2/2] remove commented out code

---
 pkg/processor/batchprocessor.go | 1 -
 1 file changed, 1 deletion(-)

diff --git a/pkg/processor/batchprocessor.go b/pkg/processor/batchprocessor.go
index 1a60163..bce2fed 100644
--- a/pkg/processor/batchprocessor.go
+++ b/pkg/processor/batchprocessor.go
@@ -61,7 +61,6 @@ func (s *segmentsBatch) send(batch []*string) {
 }
 
 func (s *segmentsBatch) poll() {
-	//failedAttempt := 0
 	for {
 		batch, ok := <-s.batches
 		if ok {