diff --git a/README.md b/README.md
index 2583eaa..bd85a13 100644
--- a/README.md
+++ b/README.md
@@ -319,13 +319,15 @@ name: "this is just an display name"
 # redis: 1KB
 payload: |
   You can define any task information in payload
-# retryLimit max value varies on backend type to prevent from overloading backend.
-# redis: 10
+# retryLimit is the maximum number of retries (a negative number means infinite retries)
+# NOTE: only a limited number of recent task records is kept in the task status.
+# So, if you set a large value or infinite here, old task records will be lost.
+# Please see the description of the status.history field in the next section.
 retryLimit: 3
 # timeoutSeconds is for task handler timeout.
 # If set positive value, task handler timeout for processing this task.
 # Otherwise, worker's default timeout will be applied. (See 'Task Queue Worker' section)
-timeoutSeconds: 600 
+timeoutSeconds: 600
 ```
 
 #### `Task`
@@ -342,8 +344,8 @@ spec:
   rertryLimit: 3
   timeoutSeconds: 100
 status:
-  # Phase of the task. 
-  # See below section for task lifecycle 
+  # Phase of the task.
+  # See below section for task lifecycle
   phase: Processing
   createdAt: 2020-02-12T20:20:29.350631+09:00
   # Failure count of the task
@@ -356,19 +358,24 @@ status:
     processUID: 7b7b39f5-da66-4380-8002-033dff0e0f26
     # worker name received the task
     workerName: everpeace-macbookpro-2018.local
-    # This value is unique among pftaskqueue worker processes 
+    # This value is unique among pftaskqueue worker processes
     workerUID: 15bfe424-889a-49ca-88d7-fb0fc51f68d
     # timestamps
     receivedAt: 2020-02-12T20:20:39.350631+09:00
     startedAt: 2020-02-12T20:20:39.351479+09:00
-  # history of processing the task. 
+  # history of recent records of processing the task.
+  # only a limited number of recent records is kept in this field.
+  # the limit varies on backend types to prevent overloading backends:
+  # - redis: 10 entries
+  # NOTE: if you set a larger value than this limit in spec.retryLimit,
+  # old task records will be lost.
   history:
-  # TaskRecord: 
-  # this represents a record of task handler invocation 
+  # TaskRecord:
+  # this represents a record of task handler invocation
   # in a specific worker.
   # UID of process(generated by pftaskqueue)
   - processUID: 194b8ad9-543b-4d01-a571-f8d1db2e74e6
-    # worker name & UID which received the task 
+    # worker name & UID which received the task
     workerName: everpeace-macbookpro-2018.local
     workerUID: 06c333f8-aeab-4a3d-8624-064a305c53ff
     # timestamps
@@ -387,8 +394,8 @@ status:
     # - Signaled: pftaskqueue worker signaled and the task handler was interrupted
     # - InternalError: pftaskqueue worker faced some error processing a task
     reason: Succeeded
-    # Returned values from the task handlers. 
-    # See 'Task Handler Specification' section for how pftaskqueue worker communicates 
+    # Returned values from the task handlers.
+    # See 'Task Handler Specification' section for how pftaskqueue worker communicates
     # with its task handler processes.
     # payload max size varies on backend type to prevent from overloading backend.
     # redis: 1KB
@@ -398,7 +405,7 @@ status:
     # redis: 1KB
     # If size exceeded, the contents will be truncated automatically
     message: ""
-    # Below two fields will be set if the worker which processes the task was 
+    # Below two fields will be set if the worker which processes the task was
     # lost and salvaged by the other worker.
     # See "Worker lifecycle" section below for details.
     # salvagedBy:
@@ -421,10 +428,10 @@ status:
 
 If you queued your `TaskSpec`, `pftaskqueue` assign UID to it and generate `Task` with `Pending` phase for it. Some worker pulled a `Pending` task from the queue, `Task` transits to `Received` phase. When `Task` actually stared to be processed by task handler process, it transits to `Processing` phase.
 
-Once task handler process succeeded, `Task` transits to `Succeeded` phase. If task handler process failed, `pftaskqueue` can handle automatic retry feature with respect to `TaskSpec.retryLimit`. If the task handler process failed and it didn't reach at its retry limit, `pftaskqueue` re-queue the task with setting `Pending` phase again. Otherwise `pftaskqueue` will give up retry and mark it `Failed` phase. You can see all the process record of the `Task` status. 
+Once the task handler process succeeds, the `Task` transits to the `Succeeded` phase. If the task handler process fails, `pftaskqueue` handles automatic retry with respect to `TaskSpec.retryLimit`. If the task handler process failed and has not yet reached its retry limit, `pftaskqueue` re-queues the task by setting the `Pending` phase again. Otherwise `pftaskqueue` gives up retrying and marks the task `Failed`. You can see the recent process records in the `Task` status.
 
 If worker was signaled, tasks in `Received` or `Processing` phase will be treated as failure and `pftaskqueue` will handle automatic retry feature.
- 
+
 ```yaml
 $ pftaskqueue get-task [queue] --state=failed -o yaml
 ...
@@ -510,13 +517,13 @@ worker:
   # This value will be used when TaskSpec.timeoutSeconds is not set or 0.
   defaultTimeout: 30m0s
   # Task Handler Command
-  # A Worker spawns a process with the command for each received tasks 
+  # A Worker spawns a process with the command for each received task
   commands:
   - cat
   # Worker heartbeat configuration to detect worker process existence
   # Please see "Worker lifecycle" section
   heartBeat:
-    # A Worker process tries to update its Worker.Status.lastHeartBeatAt field 
+    # A Worker process tries to update its Worker.Status.lastHeartBeatAt field
     # stored in queue backend in this interval
     interval: 2s
     # A Worker.Status.lastHeartBeatAt will be determined "expired"
@@ -531,7 +538,7 @@ worker:
   exitOnEmpty: false
   # If exitOnEmpty is true, worker waits for exit in the grace period
   exitOnEmptyGracePeriod: 10s
-  # If the value was positive, worker will exit 
+  # If the value is positive, worker will exit
   # after processing the number of tasks
   numTasks: 1000
   # Base directory to create workspace for task handler processes
@@ -590,10 +597,10 @@ status:
 +------------+
 ```
 
-Once worker started, it starts with `Running` phase. In the startup, a worker register self to the queue and get its UID. The UID becomes the identifier of workers. If worker exited normally (with `exit-code=0`), it transits `Succeeded` phase. If `exit-code` was not 0, it transits to `Failed` phase. 
+Once a worker starts, it begins in the `Running` phase. At startup, a worker registers itself to the queue and gets its UID. The UID becomes the identifier of the worker. If the worker exits normally (with `exit-code=0`), it transits to the `Succeeded` phase. If the `exit-code` is not 0, it transits to the `Failed` phase.
 
 However, worker process was go away by various reasons (`SIGKILL`-ed, `OOMKiller`, etc.). Then, how to detect those worker's sate? `pftaskquue` applies simple timeout based heuristics. A worker process keeps sending heartbeat during it runs, with configured interval, to the queue by updating its `Status.lastHeartBeatAt` field.
 If the heartbeat became older then configured expiration duration, the worker was determined as 'Lost' state (`phase=Failed, reason=Lost`). Moreover when a worker detects their own heartbeat expired, they exited by their selves to wait they will be salvaged by other workers.
- 
+
 On every worker startup, a worker tries to find `Lost` workers which are safe to be salvaged. `pftaskqueue` also used simple timeout-based heuristics in salvation, too. If time passed `Worker.HeartBeat.SalvagedDuration` after its heartbeat expiration, the worker is determined as a salvation target. Once the worker finds some salvation target workers, it will salvage the worker. "Salvation" means
 - marks the target `Salvaged` phase (`phase=Failed, reason=Salvaged`)
@@ -626,18 +633,18 @@ pftaskqueue get-worker [queue] --state=[all,running,succeeded,failed,lost,tosalv
 ```
 ```
 {workspace direcoty}
 │
-│ # pftaskqueue prepares whole the contents 
-├── input 
+│ # pftaskqueue prepares the whole contents
+├── input
 │   ├── payload # TaskSpec.payload in text format
 │   ├── retryLimit # TaskSpec.retryLimit in text format
 │   ├── timeoutSeconds # TaskSpec.timeoutSeconds in text format
 │   └── meta
 │      ├── taskUID # taskUID of the task in text format
-│      ├── processUID # prrocessUID of the task handler process 
+│      ├── processUID # processUID of the task handler process
 │      ├── task.json # whole task information in JSON format
-│      ├── workerName # workerName of the worker process 
-│      ├── workerUID # workerUID of the worker process 
-│      └── workerConfig.json # whole workerConfig information in JSON format 
+│      ├── workerName # workerName of the worker process
+│      ├── workerUID # workerUID of the worker process
+│      └── workerConfig.json # whole workerConfig information in JSON format
 │
 │ # pftaskqueue just creates the directory
 │ # If any error happened in reading files in the directory, the task fails with the TaskResult below.
@@ -645,7 +652,7 @@ pftaskqueue get-worker [queue] --state=[all,running,succeeded,failed,lost,tosalv
 │ # reason: "InternalError"
 │ # message: "...error message..."
 │ # payload: null
-└── output 
+└── output
 ├── payload # If the file exists, the contents will record in TaskResult.payload. Null otherwise.
 │ # Max size of the payload varies on backend type to avoid from overloading backend
 │ # redis: 1KB
@@ -659,7 +666,7 @@ pftaskqueue get-worker [queue] --state=[all,running,succeeded,failed,lost,tosalv
 # e.g. [{"payload": "foo", "retryLimit": "3", "timeout": "10m"}]
 
 3 directories, 12 files
-``` 
+```
 
 ## Dead letters
 
@@ -681,7 +688,7 @@ $ pftaskqueue get-task [queue] --state=deadletter --output yaml
 ...
 ```
 
-## Managing configurations 
+## Managing configurations
 
 `pftaskqueue` has a lot of configuration parameters. `pftaskqueue` provides multiple ways to configure them. `pftaskqueue` reads configuraton parameter in the following precedence order.
 Each item takes precedence over the item below it:
@@ -738,16 +745,16 @@ redis:
   # key prefix of redis database
   # all the key used pftaskqueue was prefixed by '_pftaskqueue:{keyPrefix}:`
   keyPrefix: omura
-  
+
   # redis server information(addr, password, db)
   addr: ""
   password: ""
   db: 0
-  
+
   #
   # timeout/connection pool setting
   # see also: https://github.com/go-redis/redis/blob/a579d58c59af2f8cefbb7f90b8adc4df97f4fd8f/options.go#L59-L95
-  # 
+  #
   dialTimeout: 5s
   readTimeout: 3s
   writeTimeout: 3s
@@ -757,9 +764,9 @@ redis:
   poolTimeout: 4s
   idleTimeout: 5m0s
   idleCheckFrequency: 1m0s
-  
+
   #
-  # pftaskqueue will retry when redis operation failed 
+  # pftaskqueue will retry when redis operation failed
   # in exponential backoff manner.
   # you can configure backoff parameters below
   #
@@ -771,7 +778,7 @@ redis:
   maxElapsedTime: 1m0s
   # max retry count. -1 means no limit.
   maxRetry: -1
-``` 
+```
 
 ## Bash/Zsh completion
 
diff --git a/pkg/apis/task/task.go b/pkg/apis/task/task.go
index c5a2aca..7a09358 100644
--- a/pkg/apis/task/task.go
+++ b/pkg/apis/task/task.go
@@ -178,7 +178,7 @@ func (t *Task) IsWorkerLost(defaultTimeoutSeconds int) bool {
 	return t.Status.CurrentRecord.ReceivedAt.Add(timeout).Before(time.Now())
 }
 
-func (t *Task) SetSuccess(payload *string, message *string) error {
+func (t *Task) SetSuccess(payload *string, message *string, historyLengthLimit int) error {
 	if t.Status.Phase != TaskPhaseProcessing {
 		return errors.Errorf("invalid status: actual=%s expected=%s", t.Status.Phase, TaskPhaseProcessing)
 	}
@@ -205,10 +205,14 @@ func (t *Task) SetSuccess(payload *string, message *string) error {
 	}
 	t.Status.History = append(t.Status.History, *current)
+	if len(t.Status.History) > historyLengthLimit {
+		t.Status.History = t.Status.History[len(t.Status.History)-historyLengthLimit:]
+	}
+
 
 	return nil
 }
 
-func (t *Task) RecordFailure(reason TaskResultReason, payload *string, message *string) (bool, error) {
+func (t *Task) RecordFailure(reason TaskResultReason, payload *string, message *string, historyLengthLimit int) (bool, error) {
 	if t.Status.Phase != TaskPhaseProcessing && t.Status.Phase != TaskPhaseReceived {
 		return false, errors.Errorf("invalid status: actual=%s expected=[%s,%s]", t.Status.Phase, TaskPhaseProcessing, TaskPhaseReceived)
 	}
@@ -233,13 +237,16 @@ func (t *Task) RecordFailure(reason TaskResultReason, payload *string, message *
 		t.Status.History = []TaskRecord{}
 	}
 	t.Status.History = append(t.Status.History, *current)
+	if len(t.Status.History) > historyLengthLimit {
+		t.Status.History = t.Status.History[len(t.Status.History)-historyLengthLimit:]
+	}
+
 
 	t.Status.FailureCount = t.Status.FailureCount + 1
 
 	requeue := true
 	t.Status.Phase = TaskPhasePending
-	// no requeue because retry exceeded
-	if t.Status.FailureCount > t.Spec.RetryLimit {
+	if t.Spec.RetryLimit >= 0 && t.Status.FailureCount > t.Spec.RetryLimit {
 		requeue = false
 		t.Status.Phase = TaskPhaseFailed
 	}
diff --git a/pkg/backend/redis/redis_test.go b/pkg/backend/redis/redis_test.go
index 96d1ce0..36ce2c5 100644
--- a/pkg/backend/redis/redis_test.go
+++ b/pkg/backend/redis/redis_test.go
@@ -89,6 +89,7 @@ var (
 		TimeoutSeconds: 60,
 	}
 	SampleInvalidTaskSpec = task.TaskSpec{
+		Name:           strings.Repeat("a", MaxNameLength+1),
 		Payload:        strings.Repeat("x", PayloadMaxSizeInKB*KB+1),
 		RetryLimit:     100,
 		TimeoutSeconds: 0,
 	}
@@ -712,8 +713,8 @@ var _ = Describe("Backend", func() {
 				vErr, ok := err.(*util.ValidationError)
 				Expect(ok).To(Equal(true))
 				Expect(len(vErr.Errors)).To(Equal(2))
+				Expect(vErr.Error()).To(ContainSubstring("TaskSpec.Name max length"))
 				Expect(vErr.Error()).To(ContainSubstring("TaskSpec.Payload max size is"))
-				Expect(vErr.Error()).To(ContainSubstring("TaskSpec.retryLimit max is"))
 			})
 		})
 		When("Spec is valid", func() {
diff --git a/pkg/backend/redis/task.go b/pkg/backend/redis/task.go
index bc46448..dec1c6d 100644
--- a/pkg/backend/redis/task.go
+++ b/pkg/backend/redis/task.go
@@ -41,7 +41,7 @@ const (
 	KB                 = 1 << 10
 	PayloadMaxSizeInKB = 1
 	MessageMaxSizeInKB = 1
-	RetryLimitMax      = 10
+	HistoryLengthMax   = 10
 	MaxNameLength      = 1024
 )
 
@@ -650,6 +650,7 @@ func (b *Backend) SetSucceeded(ctx context.Context, queueUID, workerUID uuid.UUI
 	err = t.SetSuccess(
 		util.Truncate(resultPayload, PayloadMaxSizeInKB*KB),
 		util.Truncate(message, MessageMaxSizeInKB*KB),
+		HistoryLengthMax,
 	)
 	if err != nil {
 		dlerr := b.invalidMessageDLError(
@@ -791,6 +792,7 @@ func (b *Backend) RecordFailure(ctx context.Context, queueUID, workerUID uuid.UU
 		reason,
 		util.Truncate(resultPayload, PayloadMaxSizeInKB*KB),
 		util.Truncate(message, MessageMaxSizeInKB*KB),
+		HistoryLengthMax,
 	)
 	if err != nil {
 		dlerr := b.invalidMessageDLError(
@@ -924,11 +926,6 @@ func (b *Backend) validateTaskSpec(s task.TaskSpec) error {
 			errors.Errorf("TaskSpec.Payload max size is %d Bytes (actual=%d Bytes)", maxBytes, len(s.Payload)),
 		)
 	}
-	if s.RetryLimit > RetryLimitMax {
-		vErrors = multierror.Append(vErrors,
-			errors.Errorf("TaskSpec.retryLimit max is %d (actual=%d)", RetryLimitMax, s.RetryLimit),
-		)
-	}
 	if vErrors != nil {
 		return (*util.ValidationError)(vErrors)
 	}
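
Taken together, the Go changes above replace the old `RetryLimitMax` validation with a cap on how many records are kept in `Task.Status.History`, and make a negative `TaskSpec.retryLimit` mean unlimited retries. The standalone sketch below only illustrates these two rules: it uses plain strings as a stand-in for the real `TaskRecord` type, free functions instead of the `Task` methods, and assumes the redis backend's `HistoryLengthMax` of 10 from the diff.

```go
package main

import "fmt"

// shouldRequeue mirrors the new retry decision in Task.RecordFailure:
// a negative retryLimit means retry forever; otherwise the task is no
// longer requeued once the failure count exceeds the limit.
func shouldRequeue(retryLimit, failureCount int) bool {
	return !(retryLimit >= 0 && failureCount > retryLimit)
}

// truncateHistory mirrors the new history cap: only the most recent
// `limit` records survive (HistoryLengthMax = 10 for the redis backend).
func truncateHistory(history []string, limit int) []string {
	if len(history) > limit {
		return history[len(history)-limit:]
	}
	return history
}

func main() {
	// retryLimit = -1 keeps requeueing no matter how often the task failed.
	fmt.Println(shouldRequeue(-1, 100)) // true
	// retryLimit = 3 gives up once the failure count exceeds 3.
	fmt.Println(shouldRequeue(3, 4)) // false

	// Simulate 12 handler invocations against a history limit of 10:
	// the two oldest records are dropped.
	history := []string{}
	for i := 1; i <= 12; i++ {
		history = append(history, fmt.Sprintf("record-%d", i))
		history = truncateHistory(history, 10)
	}
	fmt.Println(history) // [record-3 record-4 ... record-12]
}
```

This is why the updated README comments warn that a large or infinite `retryLimit` can outlive the recorded history: retries keep going, but only the ten most recent `TaskRecord` entries remain visible in `status.history` on the redis backend.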