Replication worker gets stuck on busy retry (#938)
Updated the retry policy to treat all errors as transient except
BadRequestError.
Return a BadRequestError to processWithRetry when the retry count is
exceeded or deserialization fails, so it nacks the replication task and
moves the message to the DLQ.
samarabbas authored Jul 6, 2018
1 parent c52d560 commit 3651ac8
Showing 1 changed file with 24 additions and 17 deletions.
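
For orientation before the diff: a minimal, self-contained sketch of the policy this commit moves to. This is not the actual Cadence code; BadRequestError below stands in for shared.BadRequestError, and main is only a demo harness. Everything is treated as transient and retried, except a BadRequestError, which now signals a permanent failure (retries exhausted or an undeserializable task).

package main

import (
	"errors"
	"fmt"
)

// BadRequestError stands in for shared.BadRequestError from the service IDL.
type BadRequestError struct{ Message string }

func (e *BadRequestError) Error() string { return e.Message }

// Sentinels mirroring the two added in this commit.
var errMaxAttemptReached = &BadRequestError{Message: "Maximum attempts exceeded"}
var errDeserializeReplicationTask = &BadRequestError{Message: "Failed to deserialize replication task"}

// isTransientRetryableError: everything is retryable except BadRequestError.
func isTransientRetryableError(err error) bool {
	switch err.(type) {
	case *BadRequestError:
		return false
	default:
		return true
	}
}

func main() {
	for _, err := range []error{
		errMaxAttemptReached,
		errDeserializeReplicationTask,
		errors.New("shard ownership lost"), // previously whitelisted, now transient by default
	} {
		fmt.Printf("%-45q retryable=%v\n", err.Error(), isTransientRetryableError(err))
	}
}

Running the sketch prints retryable=false for the two sentinels only; any other error, including the types that were previously whitelisted, stays retryable.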
41 changes: 24 additions & 17 deletions service/worker/processor.go
@@ -21,7 +21,6 @@
package worker

import (
"errors"
"math"
"sync"
"sync/atomic"
@@ -47,7 +46,8 @@ import (

type workerStatus int

- var errMaxAttemptReached = errors.New("Maximum attempts exceeded")
+ var errMaxAttemptReached = &shared.BadRequestError{Message: "Maximum attempts exceeded"}
+ var errDeserializeReplicationTask = &shared.BadRequestError{Message: "Failed to deserialize replication task"}

const (
workerStatusRunning workerStatus = iota
@@ -104,9 +104,9 @@ type (
)

const (
- replicationTaskInitialRetryInterval = 50 * time.Millisecond
- replicationTaskMaxRetryInterval = 10 * time.Second
- replicationTaskExpirationInterval = 30 * time.Second
+ replicationTaskInitialRetryInterval = 100 * time.Millisecond
+ replicationTaskMaxRetryInterval = 2 * time.Second
+ replicationTaskExpirationInterval = 10 * time.Second
)

var (
@@ -195,6 +195,9 @@ func (p *replicationTaskProcessor) getRemainingRetryCount(remainingRetryCount in
numWorker := float64(p.config.ReplicatorConcurrency)
retryPercentage := workerInRetry / numWorker

+ if retryPercentage > 1 || retryPercentage < 0 {
+ p.logger.Fatal("Worker busy level is out of bound")
+ }
p.metricsClient.UpdateGauge(metrics.ReplicatorScope, metrics.ReplicatorRetryPercentage, retryPercentage)

min := func(i int64, j int64) int64 {
@@ -293,11 +296,21 @@ ProcessRetryLoop:
}
remainingRetryCount = p.getRemainingRetryCount(remainingRetryCount)
}
+
return errMsg
}

err = backoff.Retry(op, replicationTaskRetryPolicy, p.isTransientRetryableError)
if err != nil && p.isTransientRetryableError(err) {
+ // Emit a warning log on every 1000 attempt to retry a message
+ if remainingRetryCount%1000 == 0 {
+ p.logger.WithFields(bark.Fields{
+ logging.TagErr: err,
+ logging.TagPartitionKey: msg.Partition(),
+ logging.TagOffset: msg.Offset(),
+ }).Warn("Error processing replication task.")
+ }
+
// Keep on retrying transient errors for ever
continue ProcessRetryLoop
}
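
For context on the backoff.Retry call above: a hedged sketch of how replicationTaskRetryPolicy is presumably built from the interval constants changed earlier in this diff, using Cadence's common/backoff helpers. NewExponentialRetryPolicy, SetMaximumInterval and SetExpirationInterval are assumptions based on how retry policies are built elsewhere in the codebase; the real construction sits in a collapsed part of processor.go and may differ, and newReplicationTaskRetryPolicy is a hypothetical name.

package sketch

import (
	"time"

	"github.com/uber/cadence/common/backoff"
)

const (
	replicationTaskInitialRetryInterval = 100 * time.Millisecond
	replicationTaskMaxRetryInterval     = 2 * time.Second
	replicationTaskExpirationInterval   = 10 * time.Second
)

// newReplicationTaskRetryPolicy shows how an exponential policy with the new,
// tighter intervals could be put together.
func newReplicationTaskRetryPolicy() backoff.RetryPolicy {
	policy := backoff.NewExponentialRetryPolicy(replicationTaskInitialRetryInterval)
	policy.SetMaximumInterval(replicationTaskMaxRetryInterval)
	// Give up inside a single backoff.Retry call after 10 seconds; the outer
	// ProcessRetryLoop then decides whether to keep retrying or nack.
	policy.SetExpirationInterval(replicationTaskExpirationInterval)
	return policy
}

Under this reading, the new values cap a single inner Retry window at 10 seconds (down from 30) with a 2-second ceiling between attempts, so a busy worker surfaces back to ProcessRetryLoop much sooner.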
@@ -330,7 +343,9 @@ func (p *replicationTaskProcessor) process(msg kafka.Message, inRetry bool) erro
logging.TagPartitionKey: msg.Partition(),
logging.TagOffset: msg.Offset(),
}).Error("Failed to deserialize replication task.")
- return err
+
+ // return BadRequestError so processWithRetry can nack the message
+ return errDeserializeReplicationTask
}

if task.TaskType == nil {
@@ -443,18 +458,10 @@ func (p *replicationTaskProcessor) updateFailureMetric(scope int, err error) {

func (p *replicationTaskProcessor) isTransientRetryableError(err error) bool {
switch err.(type) {
- case *h.ShardOwnershipLostError:
- return true
- case *shared.ServiceBusyError:
- return true
- case *shared.LimitExceededError:
- return true
- case *shared.InternalServiceError:
- return true
- case *shared.RetryTaskError:
- return true
- default:
+ case *shared.BadRequestError:
return false
+ default:
+ return true
}
}
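
The nack/DLQ behaviour described in the commit message is not visible in this diff (it lives further down in processWithRetry), so the sketch below is only an illustration of the intended flow: once the retry loop exits with a non-retryable error, after this change always a BadRequestError, the message is nacked, which is what routes it to the DLQ. The message interface and finish helper are hypothetical stand-ins, not the actual kafka.Message API.

package sketch

// message is an illustrative stand-in for the consumer message type.
type message interface {
	Ack() error
	Nack() error
}

// finish sketches the terminal handling after the retry loop exits.
func finish(msg message, err error, isTransientRetryable func(error) bool) error {
	if err == nil {
		// processed successfully: acknowledge the message
		return msg.Ack()
	}
	if isTransientRetryable(err) {
		// transient errors never reach this point; the loop retries them forever
		return err
	}
	// permanent failure (BadRequestError): nack so the message moves to the DLQ
	return msg.Nack()
}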

