Skip to content

Commit

Permalink
Breaking changes:
Browse files Browse the repository at this point in the history
- Rename WorkflowTaskFailedException to WorkflowExecutorException
- Introduce GenericException to get custom properties

in cloud events:
- retryDelayMillis property renamed to secondsBeforeRetry for consistency
- error renamed to failure
 - new custom properties in exception description
  • Loading branch information
geomagilles committed Jan 15, 2025
1 parent 7d8eb1f commit 8c1e1fa
Show file tree
Hide file tree
Showing 19 changed files with 224 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

package io.infinitic.events.data.services

import io.infinitic.cloudEvents.ERROR
import io.infinitic.cloudEvents.FAILURE
import io.infinitic.cloudEvents.INFINITIC_VERSION
import io.infinitic.cloudEvents.OUTPUT
import io.infinitic.cloudEvents.REQUESTER
Expand Down Expand Up @@ -79,7 +79,7 @@ fun ServiceExecutorEventMessage.toJson(): JsonObject = when (this) {

is TaskRetriedEvent -> JsonObject(
mapOf(
ERROR to failure.toJson(),
FAILURE to failure.toJsonWithoutAttemptDetails(),
TASK_RETRY_DELAY to taskRetryDelay.toJson(),
TASK_RETRY_SEQUENCE to taskRetrySequence.toJson(),
TASK_RETRY_INDEX to taskRetryIndex.toJson(),
Expand All @@ -94,7 +94,7 @@ fun ServiceExecutorEventMessage.toJson(): JsonObject = when (this) {

is TaskFailedEvent -> JsonObject(
mapOf(
ERROR to failure.toJson(),
FAILURE to failure.toJsonWithoutAttemptDetails(),
TASK_RETRY_SEQUENCE to taskRetrySequence.toJson(),
TASK_RETRY_INDEX to taskRetryIndex.toJson(),
SERVICE_NAME to serviceName.toJson(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

package io.infinitic.events.errors

import io.infinitic.cloudEvents.ERROR
import io.infinitic.cloudEvents.FAILURE
import io.infinitic.cloudEvents.METHOD_ID
import io.infinitic.cloudEvents.METHOD_NAME
import io.infinitic.cloudEvents.SERVICE_NAME
Expand All @@ -40,7 +40,7 @@ import io.infinitic.common.tasks.executors.errors.TaskCanceledError
import io.infinitic.common.tasks.executors.errors.TaskFailedError
import io.infinitic.common.tasks.executors.errors.TaskTimedOutError
import io.infinitic.common.tasks.executors.errors.TaskUnknownError
import io.infinitic.common.tasks.executors.errors.WorkflowTaskFailedError
import io.infinitic.common.tasks.executors.errors.WorkflowExecutorError
import io.infinitic.common.utils.toJson
import io.infinitic.events.types.EXECUTOR_FAILED
import io.infinitic.events.types.REMOTE_METHOD_CANCELED
Expand All @@ -55,16 +55,16 @@ import kotlinx.serialization.json.JsonObject


fun DeferredError.toJson(): Pair<String, JsonObject> = when (this) {
is WorkflowTaskFailedError -> EXECUTOR_FAILED to JsonObject(
is WorkflowExecutorError -> EXECUTOR_FAILED to JsonObject(
mapOf(
ERROR to lastFailure.toJson(),
FAILURE to lastFailure.toJson(),
TASK_ID to workflowTaskId.toJson(),
),
)

is TaskFailedError -> TASK_FAILED to JsonObject(
mapOf(
ERROR to failure.toJson(),
FAILURE to lastFailure.toJson(),
TASK_ID to taskId.toJson(),
TASK_NAME to methodName.toJson(),
SERVICE_NAME to serviceName.toJson(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@

package io.infinitic.cloudEvents

const val ERROR = "error"
const val PREVIOUS = "previous"
const val OUTPUT = "output"
const val WORKER_NAME = "workerName"
const val INFINITIC_VERSION = "infiniticVersion"
Expand All @@ -35,7 +33,7 @@ const val TIMEOUT = "timeout"
const val SERVICE_NAME = "serviceName"
const val TASK_RETRY_SEQUENCE = "retrySequence"
const val TASK_RETRY_INDEX = "retryIndex"
const val TASK_RETRY_DELAY = "retryDelayMillis"
const val TASK_RETRY_DELAY = "secondsBeforeRetry"
const val TASK_META = "taskMeta"
const val TASK_TAGS = "taskTags"
const val TASK_NAME = "taskName"
Expand Down Expand Up @@ -63,8 +61,12 @@ const val TIMER_ID = "timerId"
const val TIMER_DURATION = "timerDuration"
const val TIMER_INSTANT = "timerInstant"

const val FAILURE = "failure"
const val FAILURE_STACKTRACE = "stackTrace"
const val FAILURE_PREVIOUS = "previousFailure"

const val ERROR = "error"
const val ERROR_NAME = "name"
const val ERROR_MESSAGE = "message"
const val ERROR_STACKTRACE = "stackTrace"
const val ERROR_CAUSE = "cause"
const val ERROR_DATA = "data"
const val ERROR_CUSTOM_PROPERTIES = "customProperties"
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ import io.infinitic.common.workflows.engine.messages.RemoteTaskCompleted
import io.infinitic.common.workflows.engine.messages.RemoteTaskFailed
import io.infinitic.currentVersion
import io.infinitic.exceptions.DeferredException
import io.infinitic.tasks.TaskExceptionDetail
import io.infinitic.exceptions.GenericException
import io.infinitic.tasks.TaskFailure
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
Expand Down Expand Up @@ -157,7 +157,7 @@ data class TaskFailedEvent(
serviceName = serviceName,
methodName = methodName,
taskId = taskId,
failure = failure,
lastFailure = failure,
),
deferredError = deferredError,
emitterName = emitterName,
Expand Down Expand Up @@ -189,7 +189,8 @@ data class TaskFailedEvent(
retrySequence = msg.taskRetrySequence.toInt(),
retryIndex = msg.taskRetryIndex.toInt(),
secondsBeforeRetry = 0.0,
exceptionDetail = TaskExceptionDetail.from(cause),
stackTraceString = cause.stackTraceToString(),
exception = GenericException.from(cause),
previousFailure = msg.lastFailure,
),
deferredError = cause.deferredError,
Expand Down Expand Up @@ -239,7 +240,8 @@ data class TaskRetriedEvent(
retrySequence = msg.taskRetrySequence.toInt(),
retryIndex = msg.taskRetryIndex.toInt(),
secondsBeforeRetry = delay.toSeconds(),
exceptionDetail = TaskExceptionDetail.from(cause),
stackTraceString = cause.stackTraceToString(),
exception = GenericException.from(cause),
previousFailure = msg.lastFailure,
),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ import io.infinitic.exceptions.TaskFailedException
import io.infinitic.exceptions.TaskTimedOutException
import io.infinitic.exceptions.TaskUnknownException
import io.infinitic.exceptions.WorkflowCanceledException
import io.infinitic.exceptions.WorkflowExecutorException
import io.infinitic.exceptions.WorkflowFailedException
import io.infinitic.exceptions.WorkflowTaskFailedException
import io.infinitic.exceptions.WorkflowTimedOutException
import io.infinitic.exceptions.WorkflowUnknownException
import io.infinitic.tasks.TaskFailure
Expand Down Expand Up @@ -75,7 +75,7 @@ sealed class DeferredFailedError : DeferredError() {
when (exception) {
is TaskFailedException -> TaskFailedError.from(exception)
is WorkflowFailedException -> MethodFailedError.from(exception)
is WorkflowTaskFailedException -> WorkflowTaskFailedError.from(exception)
is WorkflowExecutorException -> WorkflowExecutorError.from(exception)
}
}
}
Expand Down Expand Up @@ -123,7 +123,7 @@ data class TaskUnknownError(

@AvroDefault(Avro.NULL) val methodName: MethodName?,

/** Id of the unknown task */
/** ID of the unknown task */
val taskId: TaskId
) : DeferredUnknownError() {
companion object {
Expand All @@ -143,13 +143,13 @@ data class MethodUnknownError(
/** Name of the unknown workflow */
val workflowName: WorkflowName,

/** Id of the unknown workflow */
/** ID of the unknown workflow */
val workflowId: WorkflowId,

@AvroDefault(Avro.NULL)
val workflowMethodName: MethodName?,

/** Id of the unknown workflow' method run */
/** ID of the unknown workflow' method run */
@AvroName("methodRunId")
val workflowMethodId: WorkflowMethodId?

Expand All @@ -173,7 +173,7 @@ data class TaskTimedOutError(
@SerialName("taskName")
val serviceName: ServiceName,

/** Id of the timed-out task */
/** ID of the timed-out task */
val taskId: TaskId,

/** Method of the timed-out task */
Expand All @@ -197,14 +197,14 @@ data class MethodTimedOutError(
/** Name of timed-out child workflow */
val workflowName: WorkflowName,

/** Id of timed-out child workflow */
/** ID of timed-out child workflow */
val workflowId: WorkflowId,

/** Method of timed-out child workflow */
@AvroName("methodName")
val workflowMethodName: MethodName,

/** Id of the methodRun */
/** ID of the methodRun */
@AvroName("methodRunId")
val workflowMethodId: WorkflowMethodId?
) : DeferredTimedOutError() {
Expand All @@ -226,7 +226,7 @@ data class TaskCanceledError(
/** Name of canceled task */
@SerialName("taskName") val serviceName: ServiceName,

/** Id of canceled task */
/** ID of canceled task */
val taskId: TaskId,

/** Method of canceled task */
Expand All @@ -249,13 +249,13 @@ data class MethodCanceledError(
/** Name of canceled child workflow */
val workflowName: WorkflowName,

/** Id of canceled child workflow */
/** ID of canceled child workflow */
val workflowId: WorkflowId,

@AvroDefault(Avro.NULL)
val workflowMethodName: MethodName?,

/** Id of the methodRun */
/** ID of the methodRun */
@AvroName("methodRunId")
val workflowMethodId: WorkflowMethodId?,
) : DeferredCanceledError() {
Expand All @@ -280,19 +280,19 @@ data class TaskFailedError(
/** Method of failed task */
val methodName: MethodName,

/** Id of failed task */
/** ID of failed task */
val taskId: TaskId,

/** cause of the error */
@SerialName("cause") val failure: TaskFailure
@SerialName("cause") val lastFailure: TaskFailure
) : DeferredFailedError() {
companion object {
fun from(e: TaskFailedException) =
TaskFailedError(
serviceName = ServiceName(e.serviceName),
methodName = MethodName(e.methodName),
taskId = TaskId(e.taskId),
failure = e.failure,
lastFailure = e.lastFailure,
)
}
}
Expand All @@ -304,14 +304,14 @@ data class MethodFailedError(
/** Name of failed child workflow */
val workflowName: WorkflowName,

/** Id of failed child workflow */
/** ID of failed child workflow */
val workflowId: WorkflowId,

/** Method of failed child workflow */
@AvroName("methodName")
val workflowMethodName: MethodName,

/** Id of failed method run */
/** ID of failed method run */
@AvroName("methodRunId")
val workflowMethodId: WorkflowMethodId?,

Expand All @@ -330,25 +330,25 @@ data class MethodFailedError(
}
}

/** Error occurring when waiting a failed workflow */
/** Error occurring in Workflow Task */
@Serializable
@SerialName("FailedWorkflowTaskError")
data class WorkflowTaskFailedError(
data class WorkflowExecutorError(
/** Name of failed workflow */
val workflowName: WorkflowName,

/** Id of failed workflow */
/** ID of failed workflow */
val workflowId: WorkflowId,

/** Id of failed workflow task */
/** ID of failed workflow task */
val workflowTaskId: TaskId,

/** cause of the error */
@SerialName("cause") val lastFailure: TaskFailure
) : DeferredFailedError() {
companion object {
fun from(e: WorkflowTaskFailedException): WorkflowTaskFailedError =
WorkflowTaskFailedError(
fun from(e: WorkflowExecutorException): WorkflowExecutorError =
WorkflowExecutorError(
workflowName = WorkflowName(e.workflowName),
workflowId = WorkflowId(e.workflowId),
workflowTaskId = TaskId(e.workflowTaskId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import io.infinitic.common.workflows.data.workflowTasks.isWorkflowTask
import io.infinitic.common.workflows.data.workflows.WorkflowId
import io.infinitic.common.workflows.data.workflows.WorkflowName
import io.infinitic.currentVersion
import io.infinitic.tasks.TaskExceptionDetail
import io.infinitic.exceptions.GenericException
import io.infinitic.tasks.TaskFailure
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
Expand Down Expand Up @@ -144,7 +144,8 @@ data class ExecuteTask(
retrySequence = msg.taskRetrySequence.toInt(),
retryIndex = msg.taskRetryIndex.toInt(),
secondsBeforeRetry = delay.toSeconds(),
exceptionDetail = TaskExceptionDetail.from(cause),
stackTraceString = cause.stackTraceToString(),
exception = GenericException.from(cause),
previousFailure = msg.lastFailure,
),
)
Expand Down
Loading

0 comments on commit 8c1e1fa

Please sign in to comment.