
Commit

Merge pull request #39 from pfnet-research/infinite-max-retry
Support infinite retry. Task.Status.History now records only a limited number of recent processing records.
everpeace authored May 28, 2021
2 parents 5599568 + 3f3abd8 commit 3fb88ba
Showing 4 changed files with 58 additions and 46 deletions.
77 changes: 42 additions & 35 deletions README.md
@@ -319,13 +319,15 @@ name: "this is just a display name"
# redis: 1KB
payload: |
You can define any task information in payload
# retryLimit is the maximum number of retries (a negative number means infinite retry)
# NOTE: only a limited number of recent task records is kept in the task's status,
# so if you set a large value or infinite retry here, you will lose old task records.
# please see the description of the status.history field in the next section.
retryLimit: 3
# timeoutSeconds is the task handler timeout.
# If set to a positive value, it is used as the task handler timeout for this task.
# Otherwise, the worker's default timeout will be applied. (See 'Task Queue Worker' section)
timeoutSeconds: 600
```
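
A minimal Go sketch of that truncation behavior — `appendRecord` is a hypothetical helper, not the pftaskqueue API; the per-backend limit of 10 entries is the one the Redis backend uses:

```go
package main

import "fmt"

// TaskRecord stands in for one task-handler invocation record.
type TaskRecord struct{ ProcessUID string }

// appendRecord appends a new record and keeps only the most recent
// historyLengthLimit entries, dropping the oldest ones.
func appendRecord(history []TaskRecord, r TaskRecord, historyLengthLimit int) []TaskRecord {
	history = append(history, r)
	if len(history) > historyLengthLimit {
		history = history[len(history)-historyLengthLimit:]
	}
	return history
}

func main() {
	const historyLengthLimit = 10 // e.g. the Redis backend keeps 10 entries
	var history []TaskRecord
	// An infinitely retried task that has been processed 15 times
	// ends up with only the 10 most recent records.
	for i := 0; i < 15; i++ {
		history = appendRecord(history, TaskRecord{ProcessUID: fmt.Sprintf("proc-%d", i)}, historyLengthLimit)
	}
	fmt.Println(len(history), history[0].ProcessUID) // 10 proc-5
}
```
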
#### `Task`
@@ -342,8 +344,8 @@ spec:
retryLimit: 3
timeoutSeconds: 100
status:
# Phase of the task.
# See below section for task lifecycle
phase: Processing
createdAt: 2020-02-12T20:20:29.350631+09:00
# Failure count of the task
@@ -356,19 +358,24 @@ status:
processUID: 7b7b39f5-da66-4380-8002-033dff0e0f26
# worker name received the task
workerName: everpeace-macbookpro-2018.local
# This value is unique among pftaskqueue worker processes
workerUID: 15bfe424-889a-49ca-88d7-fb0fc51f68d
# timestamps
receivedAt: 2020-02-12T20:20:39.350631+09:00
startedAt: 2020-02-12T20:20:39.351479+09:00
# history of recent records of processing the task.
# only a limited number of recent records is kept in this field.
# the limit varies by backend type to prevent overloading backends:
# - redis: 10 entries
# NOTE: if you set spec.retryLimit larger than this limit,
# you will lose old task records.
history:
# TaskRecord:
# this represents a record of task handler invocation
# in a specific worker.
# UID of process(generated by pftaskqueue)
- processUID: 194b8ad9-543b-4d01-a571-f8d1db2e74e6
# worker name & UID which received the task
workerName: everpeace-macbookpro-2018.local
workerUID: 06c333f8-aeab-4a3d-8624-064a305c53ff
# timestamps
@@ -387,8 +394,8 @@ status:
# - Signaled: pftaskqueue worker signaled and the task handler was interrupted
# - InternalError: pftaskqueue worker faced some error processing a task
reason: Succeeded
# Returned values from the task handlers.
# See 'Task Handler Specification' section for how pftaskqueue worker communicates
# with its task handler processes.
# payload max size varies by backend type to prevent overloading the backend.
# redis: 1KB
@@ -398,7 +405,7 @@ status:
# redis: 1KB
# If the size is exceeded, the contents will be truncated automatically
message: ""
# Below two fields will be set if the worker which processed the task was
# lost and salvaged by another worker.
# See "Worker lifecycle" section below for details.
# salvagedBy: <workerUID>
@@ -421,10 +428,10 @@ status:
If you queue your `TaskSpec`, `pftaskqueue` assigns a UID to it and generates a `Task` in the `Pending` phase. When some worker pulls the `Pending` task from the queue, the `Task` transitions to the `Received` phase. When the `Task` actually starts being processed by a task handler process, it transitions to the `Processing` phase.
Once the task handler process succeeds, the `Task` transitions to the `Succeeded` phase. If the task handler process fails, `pftaskqueue` handles automatic retry with respect to `TaskSpec.retryLimit`. If the failed task has not yet reached its retry limit, `pftaskqueue` re-queues it by setting the `Pending` phase again. Otherwise `pftaskqueue` gives up retrying and marks it `Failed`. You can see the recent processing records in the `Task` status.
If a worker is signaled, tasks in the `Received` or `Processing` phase are treated as failures and `pftaskqueue` handles the automatic retry for them.
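
The requeue decision itself is small. A minimal sketch in the spirit of `RecordFailure` in `pkg/apis/task/task.go` (the helper below is illustrative, not the actual method), where a negative `retryLimit` means retrying forever:

```go
package main

import "fmt"

// shouldRequeue reports whether a task that has already failed
// failureCount times should go back to the Pending phase.
// A negative retryLimit never exhausts the retry budget.
func shouldRequeue(retryLimit, failureCount int) bool {
	if retryLimit >= 0 && failureCount > retryLimit {
		return false // give up: the task is marked Failed
	}
	return true // re-queue: the task becomes Pending again
}

func main() {
	fmt.Println(shouldRequeue(3, 4))    // false: the 4th failure exceeds retryLimit=3
	fmt.Println(shouldRequeue(-1, 100)) // true: infinite retry
}
```
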
```yaml
$ pftaskqueue get-task [queue] --state=failed -o yaml
...
@@ -510,13 +517,13 @@ worker:
# This value will be used when TaskSpec.timeoutSeconds is not set or 0.
defaultTimeout: 30m0s
# Task Handler Command
# A Worker spawns a process with the command for each received task
commands:
- cat
# Worker heartbeat configuration to detect worker process existence
# Please see "Worker lifecycle" section
heartBeat:
# A Worker process tries to update its Worker.Status.lastHeartBeatAt field
# stored in queue backend in this interval
interval: 2s
# A Worker.Status.lastHeartBeatAt will be determined "expired"
@@ -531,7 +538,7 @@ worker:
exitOnEmpty: false
# If exitOnEmpty is true, worker waits for exit in the grace period
exitOnEmptyGracePeriod: 10s
# If the value is positive, the worker will exit
# after processing that number of tasks
numTasks: 1000
# Base directory to create workspace for task handler processes
@@ -590,10 +597,10 @@ status:
+------------+
```

Once a worker starts, it begins in the `Running` phase. On startup, a worker registers itself to the queue and gets its UID. The UID becomes the identifier of the worker. If the worker exits normally (with `exit-code=0`), it transitions to the `Succeeded` phase. If the `exit-code` is not 0, it transitions to the `Failed` phase.

However, a worker process can go away for various reasons (`SIGKILL`-ed, `OOMKiller`, etc.). Then, how can such workers' states be detected? `pftaskqueue` applies a simple timeout-based heuristic. A worker process keeps sending heartbeats to the queue while it runs, at the configured interval, by updating its `Status.lastHeartBeatAt` field. If the heartbeat becomes older than the configured expiration duration, the worker is determined to be 'Lost' (`phase=Failed, reason=Lost`). Moreover, when a worker detects that its own heartbeat has expired, it exits by itself so that it can be salvaged by other workers.
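
A minimal sketch of those timeout checks — the lost check from this paragraph and the salvage-target check described in the next one; the function and parameter names here are illustrative, not the actual pftaskqueue code:

```go
package main

import (
	"fmt"
	"time"
)

// isLost reports whether a worker's heartbeat has expired: the last
// heartbeat is older than the configured expiration duration.
func isLost(lastHeartBeatAt time.Time, expiration time.Duration, now time.Time) bool {
	return lastHeartBeatAt.Add(expiration).Before(now)
}

// isSalvageTarget reports whether enough additional time has passed since
// the heartbeat expired that another worker may safely salvage this one.
func isSalvageTarget(lastHeartBeatAt time.Time, expiration, salvageDuration time.Duration, now time.Time) bool {
	return lastHeartBeatAt.Add(expiration).Add(salvageDuration).Before(now)
}

func main() {
	now := time.Now()
	last := now.Add(-90 * time.Second) // heartbeat last updated 90s ago

	fmt.Println(isLost(last, 30*time.Second, now))                          // true: expired 60s ago
	fmt.Println(isSalvageTarget(last, 30*time.Second, 15*time.Minute, now)) // false: not yet safe to salvage
}
```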

On every startup, a worker tries to find `Lost` workers which are safe to salvage. `pftaskqueue` uses a simple timeout-based heuristic for salvation, too. If `Worker.HeartBeat.SalvagedDuration` has passed since a worker's heartbeat expired, that worker is determined to be a salvation target. Once the worker finds salvation-target workers, it will salvage them. "Salvation" means

- marks the target `Salvaged` phase (`phase=Failed, reason=Salvaged`)
@@ -626,26 +633,26 @@ pftaskqueue get-worker [queue] --state=[all,running,succeeded,failed,lost,tosalv
```
{workspace directory}
│ # pftaskqueue prepares the whole contents
├── input
│   ├── payload # TaskSpec.payload in text format
│   ├── retryLimit # TaskSpec.retryLimit in text format
│   ├── timeoutSeconds # TaskSpec.timeoutSeconds in text format
│   └── meta
│      ├── taskUID # taskUID of the task in text format
│      ├── processUID # processUID of the task handler process
│      ├── task.json # whole task information in JSON format
│      ├── workerName # workerName of the worker process
│      ├── workerUID # workerUID of the worker process
│      └── workerConfig.json # whole workerConfig information in JSON format
│ # pftaskqueue just creates the directory
│ # If any error happened in reading files in the directory, the task fails with the TaskResult below.
│ # type: "Failure"
│ # reason: "InternalError"
│ # message: "...error message..."
│ # payload: null
└── output
├── payload # If the file exists, the contents will be recorded in TaskResult.payload. Null otherwise.
│ # Max size of the payload varies by backend type to avoid overloading the backend
│ # redis: 1KB
@@ -659,7 +666,7 @@ pftaskqueue get-worker [queue] --state=[all,running,succeeded,failed,lost,tosalv
# e.g. [{"payload": "foo", "retryLimit": "3", "timeout": "10m"}]
3 directories, 12 files
```
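
For illustration only, a task handler could be a small program like the following. It assumes the handler runs with the workspace directory as its working directory (an assumption of this sketch); the contract described above is simply to read files under `input/`, write results under `output/`, and exit with status 0 on success — any command (such as `cat` in the worker config above) works:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

func main() {
	// pftaskqueue prepares everything under input/ before the handler runs.
	payload, err := os.ReadFile(filepath.Join("input", "payload"))
	if err != nil {
		fmt.Fprintln(os.Stderr, "cannot read payload:", err)
		os.Exit(1) // a non-zero exit is treated as a task failure
	}

	// Do the actual work; this sketch just upper-cases the payload.
	result := strings.ToUpper(string(payload))

	// Anything written to output/payload is recorded in TaskResult.payload
	// (truncated by the backend if it exceeds the size limit, e.g. 1KB on redis).
	if err := os.WriteFile(filepath.Join("output", "payload"), []byte(result), 0o644); err != nil {
		fmt.Fprintln(os.Stderr, "cannot write result:", err)
		os.Exit(1)
	}
	// Exit code 0: the invocation is recorded as a success.
}
```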

## Dead letters

@@ -681,7 +688,7 @@ $ pftaskqueue get-task [queue] --state=deadletter --output yaml
...
```

## Managing configurations

`pftaskqueue` has a lot of configuration parameters and provides multiple ways to set them. `pftaskqueue` reads configuration parameters in the following precedence order. Each item takes precedence over the item below it:

@@ -738,16 +745,16 @@ redis:
# key prefix of redis database
# all the keys used by pftaskqueue are prefixed by '_pftaskqueue:{keyPrefix}:'
keyPrefix: omura

# redis server information(addr, password, db)
addr: ""
password: ""
db: 0

#
# timeout/connection pool setting
# see also: https://github.com/go-redis/redis/blob/a579d58c59af2f8cefbb7f90b8adc4df97f4fd8f/options.go#L59-L95
#
dialTimeout: 5s
readTimeout: 3s
writeTimeout: 3s
Expand All @@ -757,9 +764,9 @@ redis:
poolTimeout: 4s
idleTimeout: 5m0s
idleCheckFrequency: 1m0s

#
# pftaskqueue will retry when a redis operation fails,
# in an exponential backoff manner.
# you can configure backoff parameters below
#
@@ -771,7 +778,7 @@ redis:
maxElapsedTime: 1m0s
# max retry count. -1 means no limit.
maxRetry: -1
```
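
For intuition, the retry behaves roughly like the exponential-backoff sketch below. The parameter values are illustrative (only `maxElapsedTime` and `maxRetry` appear above), and `nextBackoff` is a hypothetical helper, not the library pftaskqueue actually uses:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// nextBackoff returns a randomized wait derived from the current interval,
// plus the grown interval to use next time, capped at maxInterval.
func nextBackoff(current time.Duration, randomizationFactor, multiplier float64, maxInterval time.Duration) (wait, next time.Duration) {
	delta := randomizationFactor * float64(current)
	wait = time.Duration(float64(current) - delta + rand.Float64()*2*delta)
	next = time.Duration(float64(current) * multiplier)
	if next > maxInterval {
		next = maxInterval
	}
	return wait, next
}

func main() {
	interval := 500 * time.Millisecond // illustrative initial interval
	maxRetry := -1                     // -1: no limit on the retry count
	maxElapsedTime := time.Minute
	start := time.Now()

	for attempt := 1; maxRetry < 0 || attempt <= maxRetry; attempt++ {
		var wait time.Duration
		wait, interval = nextBackoff(interval, 0.5, 1.5, 60*time.Second)
		fmt.Printf("attempt %d: would wait %v\n", attempt, wait)
		if time.Since(start) > maxElapsedTime || attempt == 5 {
			break // a real loop would keep retrying the redis operation until it succeeds
		}
	}
}
```
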
## Bash/Zsh completion
15 changes: 11 additions & 4 deletions pkg/apis/task/task.go
@@ -178,7 +178,7 @@ func (t *Task) IsWorkerLost(defaultTimeoutSeconds int) bool {
return t.Status.CurrentRecord.ReceivedAt.Add(timeout).Before(time.Now())
}

func (t *Task) SetSuccess(payload *string, message *string, historyLengthLimit int) error {
if t.Status.Phase != TaskPhaseProcessing {
return errors.Errorf("invalid status: actual=%s expected=%s", t.Status.Phase, TaskPhaseProcessing)
}
@@ -205,10 +205,14 @@ func (t *Task) SetSuccess(payload *string, message *string) error {
}
t.Status.History = append(t.Status.History, *current)

if len(t.Status.History) > historyLengthLimit {
t.Status.History = t.Status.History[len(t.Status.History)-historyLengthLimit:]
}

return nil
}

func (t *Task) RecordFailure(reason TaskResultReason, payload *string, message *string, historyLengthLimit int) (bool, error) {
if t.Status.Phase != TaskPhaseProcessing && t.Status.Phase != TaskPhaseReceived {
return false, errors.Errorf("invalid status: actual=%s expected=[%s,%s]", t.Status.Phase, TaskPhaseProcessing, TaskPhaseReceived)
}
@@ -233,13 +237,16 @@ func (t *Task) RecordFailure(reason TaskResultReason, payload *string, message *
t.Status.History = []TaskRecord{}
}
t.Status.History = append(t.Status.History, *current)
if len(t.Status.History) > historyLengthLimit {
t.Status.History = t.Status.History[len(t.Status.History)-historyLengthLimit:]
}

t.Status.FailureCount = t.Status.FailureCount + 1

requeue := true
t.Status.Phase = TaskPhasePending

// no requeue because retry exceeded
if t.Spec.RetryLimit >= 0 && t.Status.FailureCount > t.Spec.RetryLimit {
requeue = false
t.Status.Phase = TaskPhaseFailed
}
3 changes: 2 additions & 1 deletion pkg/backend/redis/redis_test.go
@@ -89,6 +89,7 @@ var (
TimeoutSeconds: 60,
}
SampleInvalidTaskSpec = task.TaskSpec{
Name: strings.Repeat("a", MaxNameLength+1),
Payload: strings.Repeat("x", PayloadMaxSizeInKB*KB+1),
RetryLimit: 100,
TimeoutSeconds: 0,
@@ -712,8 +713,8 @@ var _ = Describe("Backend", func() {
vErr, ok := err.(*util.ValidationError)
Expect(ok).To(Equal(true))
Expect(len(vErr.Errors)).To(Equal(2))
Expect(vErr.Error()).To(ContainSubstring("TaskSpec.Name max length"))
Expect(vErr.Error()).To(ContainSubstring("TaskSpec.Payload max size is"))
Expect(vErr.Error()).To(ContainSubstring("TaskSpec.retryLimit max is"))
})
})
When("Spec is valid", func() {
9 changes: 3 additions & 6 deletions pkg/backend/redis/task.go
@@ -41,7 +41,7 @@ const (
KB = 1 << 10
PayloadMaxSizeInKB = 1
MessageMaxSizeInKB = 1
RetryLimitMax = 10
HistoryLengthMax = 10
MaxNameLength = 1024
)

@@ -650,6 +650,7 @@ func (b *Backend) SetSucceeded(ctx context.Context, queueUID, workerUID uuid.UUI
err = t.SetSuccess(
util.Truncate(resultPayload, PayloadMaxSizeInKB*KB),
util.Truncate(message, MessageMaxSizeInKB*KB),
HistoryLengthMax,
)
if err != nil {
dlerr := b.invalidMessageDLError(
@@ -791,6 +792,7 @@ func (b *Backend) RecordFailure(ctx context.Context, queueUID, workerUID uuid.UU
reason,
util.Truncate(resultPayload, PayloadMaxSizeInKB*KB),
util.Truncate(message, MessageMaxSizeInKB*KB),
HistoryLengthMax,
)
if err != nil {
dlerr := b.invalidMessageDLError(
@@ -924,11 +926,6 @@ func (b *Backend) validateTaskSpec(s task.TaskSpec) error {
errors.Errorf("TaskSpec.Payload max size is %d Bytes (actual=%d Bytes)", maxBytes, len(s.Payload)),
)
}
if s.RetryLimit > RetryLimitMax {
vErrors = multierror.Append(vErrors,
errors.Errorf("TaskSpec.retryLimit max is %d (actual=%d)", RetryLimitMax, s.RetryLimit),
)
}
if vErrors != nil {
return (*util.ValidationError)(vErrors)
}
