Merge pull request #377 from habx/feature/limit-concurrency-aws-call
APP-28974: feat(core): limit AWS requests concurrency
habxtech authored Apr 11, 2022
2 parents 3ce9382 + 599eaea commit 2216cae
Showing 2 changed files with 164 additions and 130 deletions.
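
The change swaps unbounded one-goroutine-per-item fan-out for bounded concurrency: a buffered channel of capacity maxProcessingWorkers (5) works as a counting semaphore, and each worker hands its partial result back over its own buffered channel instead of writing to a shared map under the now-removed sync.RWMutex. A minimal, self-contained sketch of that pattern, assuming placeholder names (doWork, topic-a, ...) that are not in the repository:

package main

import (
    "fmt"
    "sync"
)

const maxProcessingWorkers = 5 // the same cap the diff introduces

// doWork stands in for the per-item AWS calls made in the real commands.
func doWork(item string) string { return "processed " + item }

func main() {
    items := []string{"topic-a", "topic-b", "topic-c"}

    waitGrp := &sync.WaitGroup{}
    waitGrp.Add(len(items))
    limiter := make(chan bool, maxProcessingWorkers) // counting semaphore
    results := make([]chan string, len(items))

    for i, item := range items {
        results[i] = make(chan string, 1) // buffer of 1, so the send below never blocks
        go func(out chan string, item string) {
            limiter <- true // blocks once maxProcessingWorkers workers are busy
            defer func() {
                <-limiter // free the slot for the next worker
                waitGrp.Done()
            }()
            out <- doWork(item)
        }(results[i], item)
    }

    waitGrp.Wait()
    for _, ch := range results {
        fmt.Println(<-ch)
    }
}

The semaphore caps how many AWS calls are in flight at once; the per-worker result channels are what make the mutex unnecessary.
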
149 changes: 82 additions & 67 deletions commands/sns/command.go
@@ -25,6 +25,7 @@ const (
subscriptionsConfirmed = "SubscriptionsConfirmed"
subscriptionsSumZero = 0
maxEpic = 100
maxProcessingWorkers = 5
)

var (
@@ -40,7 +41,6 @@ var (
log *zap.SugaredLogger
rootArgs rootArguments
snsSvc *sns.SNS
mux sync.RWMutex
)

type rootArguments struct {
@@ -166,81 +166,96 @@ func awsSNSToClean() map[string][]string {
topics = topics[:AwsSnsMaxTopic]
}
log.Debugw("Truncated topics list", "nbTopics", len(topics))
var waitGrp sync.WaitGroup
waitGrp := &sync.WaitGroup{}
waitGrp.Add(len(topics))
for _, topic := range topics {
go func(topicARN *string) {
defer waitGrp.Done()
limiter := make(chan bool, maxProcessingWorkers)
snsToCleanChannel := make([]chan map[string][]string, len(topics))
for i, topic := range topics {
snsToCleanChannel[i] = make(chan map[string][]string, 1)
go processing(limiter, snsToCleanChannel[i], waitGrp, topic.TopicArn)
}
waitGrp.Wait()
for i := 0; i < len(topics); i++ {
for k, v := range <-snsToCleanChannel[i] {
snsToClean[k] = v
}
}
return snsToClean
}

topicName := *snsTopicARNToTopicName(topicARN)
tLog := log.With("topicName", topicName)
func processing(limiter chan bool, snsToClean chan map[string][]string, group *sync.WaitGroup, topicARN *string) {
topicName := *snsTopicARNToTopicName(topicARN)
tLog := log.With("topicName", topicName)
tLog.Debugw("start processing", "topicName", topicName)
_snsToClean := make(map[string][]string)
limiter <- true
defer func() {
<-limiter
snsToClean <- _snsToClean
group.Done()
tLog.Debugw("stop processing", "topicName", topicName)
}()

if rootArgs.excludePatten != nil {
if rootArgs.excludePatten.MatchString(topicName) {
tLog.Debug("Skipping topic because of excludePattern", "excludePattern", rootArgs.excludePatten.String())
return
}
}
if rootArgs.excludePatten != nil {
if rootArgs.excludePatten.MatchString(topicName) {
tLog.Debug("Skipping topic because of excludePattern", "excludePattern", rootArgs.excludePatten.String())
return
}
}

if topicARN == nil { // <-- I don't think this makes any sense
tLog.Debug("Skipping topic because of nil topicARN")
return
}
tLog.Debugw("Getting topic attributes", "topicName", topicName)
topicAttributes, err := snsSvc.GetTopicAttributes(&sns.GetTopicAttributesInput{TopicArn: topicARN})
if err != nil {
log.Warnw("Cannot get topic attributes", "err", err)
return
}
if CheckTagNameUpdateDate != "" {
tLog.Debug("Update date tag enabled")
skip, err := updateDateToDelete(topicARN)
if topicARN == nil { // <-- I don't think this makes any sense
tLog.Debug("Skipping topic because of nil topicARN")
return
}
tLog.Debugw("Getting topic attributes", "topicName", topicName)
topicAttributes, err := snsSvc.GetTopicAttributes(&sns.GetTopicAttributesInput{TopicArn: topicARN})
if err != nil {
log.Warnw("Cannot get topic attributes", "err", err)
return
}
if CheckTagNameUpdateDate != "" {
tLog.Debug("Update date tag enabled")
skip, err := updateDateToDelete(topicARN)
if err != nil {
log.Warnw("Cannot check tag name date update", "err", err)
}
if skip {
log.Infow("Skipping queue because of last update tag",
"tagName", CheckTagNameUpdateDate,
)
return
}
}
topicSubscriptionsSum := 0
if topicAttributes == nil {
return
}
for topicAttributesName, topicAttributesValue := range topicAttributes.Attributes {
// subscriptionsConfirmed
mLog := tLog.With("attributeName", topicAttributesName)

if topicAttributesName == subscriptionsConfirmed || topicAttributesName == subscriptionsPending {
if topicAttributesValue != nil {
mLog.Debugw("Fetched attribute", "attributeValue", *topicAttributesValue)
value, err := strconv.Atoi(*topicAttributesValue)
if err != nil {
log.Warnw("Cannot check tag name date update", "err", err)
}
if skip {
log.Infow("Skipping queue because of last update tag",
"tagName", CheckTagNameUpdateDate,
mLog.Errorw(
"cannot parse value",
"err", err,
"value", *topicAttributesValue,
)
return
continue
}
topicSubscriptionsSum += value
}
topicSubscriptionsSum := 0
if topicAttributes == nil {
return
}
for topicAttributesName, topicAttributesValue := range topicAttributes.Attributes {
// subscriptionsConfirmed
mLog := tLog.With("attributeName", topicAttributesName)

if topicAttributesName == subscriptionsConfirmed || topicAttributesName == subscriptionsPending {
if topicAttributesValue != nil {
mLog.Debugw("Fetched attribute", "attributeValue", *topicAttributesValue)
value, err := strconv.Atoi(*topicAttributesValue)
if err != nil {
mLog.Errorw(
"cannot parse value",
"err", err,
"value", *topicAttributesValue,
)
continue
}
topicSubscriptionsSum += value
}
}
}
if topicSubscriptionsSum == subscriptionsSumZero {
tLog.Debug("Topic unused")
mux.Lock()
snsToClean[*topicARN] = []string{topicName}
mux.Unlock()
} else {
tLog.Debug("Topic used")
}
}(topic.TopicArn)
}
}
if topicSubscriptionsSum == subscriptionsSumZero {
tLog.Debug("Topic unused")
_snsToClean[*topicARN] = []string{topicName}
} else {
tLog.Debug("Topic used")
}
waitGrp.Wait()
return snsToClean
}

func snsTopicARNToTopicName(topicARN *string) *string {
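
Removing mux sync.RWMutex is safe because the workers no longer touch a shared map: each goroutine owns a channel with a buffer of 1, so its single send never blocks, and the caller merges the partial maps only after waitGrp.Wait() has returned. A small runnable sketch of that fan-in step, using the same map[string][]string shape as the diff but illustrative ARN and topic names:

package main

import (
    "fmt"
    "sync"
)

// mergeResults mirrors the collection loop in awsSNSToClean: drain each
// worker's buffered channel once and fold the partial maps together.
func mergeResults(parts []chan map[string][]string) map[string][]string {
    merged := make(map[string][]string)
    for _, ch := range parts {
        for k, v := range <-ch {
            merged[k] = v
        }
    }
    return merged
}

func main() {
    waitGrp := &sync.WaitGroup{}
    parts := make([]chan map[string][]string, 3)
    for i := range parts {
        parts[i] = make(chan map[string][]string, 1)
        waitGrp.Add(1)
        go func(i int, out chan map[string][]string) {
            defer waitGrp.Done()
            // Each worker reports its own findings; no shared state, no lock.
            out <- map[string][]string{fmt.Sprintf("arn:%d", i): {fmt.Sprintf("topic-%d", i)}}
        }(i, parts[i])
    }
    waitGrp.Wait()
    fmt.Println(mergeResults(parts))
}
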
145 changes: 82 additions & 63 deletions commands/sqs/command.go
@@ -24,6 +24,7 @@ import (
const (
metricSumZero = 0
maxCloudWatchMaxDatapoint = 100
maxProcessingWorkers = 5
)

var (
@@ -39,7 +40,6 @@ var (
log *zap.SugaredLogger
rootArgs rootArguments
sqsSvc *sqs.SQS
mux sync.RWMutex
)

func init() {
@@ -88,7 +88,7 @@ func awsSQSToClean() map[string][]string {

log.Debug("Initializing Cloudwatch session")
cwSvc := cloudwatch.New(helpers.GetAwsSession(AwsCloudWatchEndpoint))
now := time.Now()

log.Debug("Listing queues")
listQueues, err := sqsSvc.ListQueues(&sqs.ListQueuesInput{QueueNamePrefix: aws.String(AwsSQSQueuePrefix)})
if err != nil {
@@ -97,74 +97,93 @@
log.Debugw("Queues fetched", "queuesNb", len(listQueues.QueueUrls))
sqsToClean := make(map[string][]string)

var waitGrp sync.WaitGroup
waitGrp := &sync.WaitGroup{}
waitGrp.Add(len(listQueues.QueueUrls))
limiter := make(chan bool, maxProcessingWorkers)
sqsToCleanChannel := make([]chan map[string][]string, len(listQueues.QueueUrls))

for _, queueURL := range listQueues.QueueUrls {
go func(qURL *string) {
queueName := *sqsQueueURLToQueueName(qURL)
qLog := log.With("queueName", queueName)
defer waitGrp.Done()
if qURL != nil {
if rootArgs.excludePatten != nil {
if rootArgs.excludePatten.MatchString(queueName) {
qLog.Debugw("Skipping queue due to exclude pattern ")
return
}
}
if CheckTagNameUpdateDate != "" {
qLog.Debug("Updating SNS usage date tag")
skip, err := updateDateToDelete(qURL)
if err != nil {
log.Warnw("Cannot check tag name date update", "err", err)
}
if skip {
log.Infof("SQS: Skipping (tag name %s): %s", CheckTagNameUpdateDate, *sqsQueueURLToQueueName(qURL))
return
}
}
metrics, err := cwSvc.GetMetricData(GetSQSMetricDataInput(rootArgs.UnusedSinceDate, &now, qURL))
if err != nil {
log.Fatal(err)
}
metricsSum := 0.0
failed := false
for _, result := range metrics.MetricDataResults {
if len(result.Values) != 0 {
for k, value := range result.Values {
qLog.Debugw(
"Metric fetched",
"metricLabel", *result.Label,
"metricValue", *value,
"metricDate", *result.Timestamps[k],
)
metricsSum += *value
}
} else {
qLog.Debugw("No metric value", "metricLabel", *result.Label)
failed = true
}
}
if !failed {
qLog = qLog.With("metricsSum", metricsSum)
if metricsSum == metricSumZero {
qLog.Debug("Queue is unused")
mux.Lock()
sqsToClean[*qURL] = []string{queueName}
mux.Unlock()
} else {
qLog.Debug("Queue is used")
}
} else {
qLog.Debug("Invalid metrics")
}
}
}(queueURL)
for i, queueURL := range listQueues.QueueUrls {
sqsToCleanChannel[i] = make(chan map[string][]string, 1)
go processing(limiter, sqsToCleanChannel[i], waitGrp, cwSvc, queueURL)
}
waitGrp.Wait()
for i := 0; i < len(listQueues.QueueUrls); i++ {
for k, v := range <-sqsToCleanChannel[i] {
sqsToClean[k] = v
}
}

return sqsToClean
}

func processing(limiter chan bool, sqsToClean chan map[string][]string, group *sync.WaitGroup, cwSvc *cloudwatch.CloudWatch, queueURL *string) {
now := time.Now()
queueName := *sqsQueueURLToQueueName(queueURL)
tLog := log.With("queueName", queueName)
tLog.Debugw("start processing", "queueName", queueName)
_sqsToClean := make(map[string][]string)
limiter <- true
defer func() {
<-limiter
sqsToClean <- _sqsToClean
group.Done()
tLog.Debugw("stop processing", "queueName", queueName)
}()

if queueURL != nil {
if rootArgs.excludePatten != nil {
if rootArgs.excludePatten.MatchString(queueName) {
tLog.Debugw("Skipping queue due to exclude pattern ")
return
}
}
if CheckTagNameUpdateDate != "" {
tLog.Debug("Updating SNS usage date tag")
skip, err := updateDateToDelete(queueURL)
if err != nil {
log.Warnw("Cannot check tag name date update", "err", err)
}
if skip {
log.Infof("SQS: Skipping (tag name %s): %s", CheckTagNameUpdateDate, *sqsQueueURLToQueueName(queueURL))
return
}
}
metrics, err := cwSvc.GetMetricData(GetSQSMetricDataInput(rootArgs.UnusedSinceDate, &now, queueURL))
if err != nil {
log.Fatal(err)
}
metricsSum := 0.0
failed := false
for _, result := range metrics.MetricDataResults {
if len(result.Values) != 0 {
for k, value := range result.Values {
tLog.Debugw(
"Metric fetched",
"metricLabel", *result.Label,
"metricValue", *value,
"metricDate", *result.Timestamps[k],
)
metricsSum += *value
}
} else {
tLog.Debugw("No metric value", "metricLabel", *result.Label)
failed = true
}
}
if !failed {
tLog = tLog.With("metricsSum", metricsSum)
if metricsSum == metricSumZero {
tLog.Debug("Queue is unused")
_sqsToClean[*queueURL] = []string{queueName}
} else {
tLog.Debug("Queue is used")
}
} else {
tLog.Debug("Invalid metrics")
}
}
}

type rootArguments struct {
UnusedSince string
UnusedSinceDate *time.Time
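
For comparison only, not what this PR does: the same five-worker cap can be expressed with golang.org/x/sync/errgroup and SetLimit, which also propagates a worker's error to the caller instead of calling log.Fatal inside the goroutine as the SQS path above does. A hedged sketch with made-up queue names:

package main

import (
    "fmt"

    "golang.org/x/sync/errgroup"
)

func main() {
    queues := []string{"queue-a", "queue-b", "queue-c"}
    results := make([]string, len(queues))

    g := new(errgroup.Group)
    g.SetLimit(5) // at most 5 goroutines in flight, like maxProcessingWorkers

    for i, q := range queues {
        i, q := i, q // capture loop variables (pre-Go 1.22 semantics)
        g.Go(func() error {
            results[i] = "checked " + q // each worker writes a distinct index: no lock needed
            return nil
        })
    }
    if err := g.Wait(); err != nil {
        fmt.Println("error:", err)
    }
    fmt.Println(results)
}

The version in this commit needs nothing outside the standard library, which is a reasonable argument for the plain channel semaphore.
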
