diff --git a/buildSrc/src/main/kotlin/Ci.kt b/buildSrc/src/main/kotlin/Ci.kt index 1727734ce..4a186015b 100644 --- a/buildSrc/src/main/kotlin/Ci.kt +++ b/buildSrc/src/main/kotlin/Ci.kt @@ -25,7 +25,7 @@ object Ci { private const val SNAPSHOT = "-SNAPSHOT" // base version number - private const val BASE = "0.16.4" + private const val BASE = "0.17.0" // GitHub run number private val githubRunNumber = System.getenv("GITHUB_RUN_NUMBER") diff --git a/buildSrc/src/main/kotlin/Libs.kt b/buildSrc/src/main/kotlin/Libs.kt index 5450154b3..454c9d31c 100644 --- a/buildSrc/src/main/kotlin/Libs.kt +++ b/buildSrc/src/main/kotlin/Libs.kt @@ -30,7 +30,7 @@ object Libs { } object Coroutines { - private const val version = "1.8.1" + private const val version = "1.10.1" const val core = "org.jetbrains.kotlinx:kotlinx-coroutines-core:$version" const val jdk8 = "org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:$version" } @@ -40,7 +40,7 @@ object Libs { } object Serialization { - private const val version = "1.7.1" + private const val version = "1.8.0" const val json = "org.jetbrains.kotlinx:kotlinx-serialization-json:$version" const val core = "org.jetbrains.kotlinx:kotlinx-serialization-core:$version" } @@ -57,7 +57,7 @@ object Libs { } object Jackson { - private const val version = "2.17.2" + private const val version = "2.18.2" const val core = "com.fasterxml.jackson.core:jackson-core:$version" const val databind = "com.fasterxml.jackson.core:jackson-databind:$version" const val kotlin = "com.fasterxml.jackson.module:jackson-module-kotlin:$version" @@ -76,14 +76,14 @@ object Libs { } object TestContainers { - private const val version = "1.20.1" + private const val version = "1.20.4" const val testcontainers = "org.testcontainers:testcontainers:$version" const val mysql = "org.testcontainers:mysql:$version" const val postgresql = "org.testcontainers:postgresql:$version" } object Mockk { - const val mockk = "io.mockk:mockk:1.13.12" + const val mockk = "io.mockk:mockk:1.13.14" } object Avro4k { @@ -91,7 +91,7 @@ object Libs { } object Hoplite { - private const val version = "2.7.5" + private const val version = "2.9.0" const val core = "com.sksamuel.hoplite:hoplite-core:$version" const val yaml = "com.sksamuel.hoplite:hoplite-yaml:$version" } @@ -120,7 +120,7 @@ object Libs { } object Logging { - const val jvm = "io.github.oshai:kotlin-logging-jvm:7.0.0" + const val jvm = "io.github.oshai:kotlin-logging-jvm:7.0.3" } object Compress { diff --git a/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/data/services/serviceData.kt b/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/data/services/serviceData.kt index 62cc6d7f1..ecb946a06 100644 --- a/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/data/services/serviceData.kt +++ b/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/data/services/serviceData.kt @@ -23,7 +23,7 @@ package io.infinitic.events.data.services -import io.infinitic.cloudEvents.ERROR +import io.infinitic.cloudEvents.FAILURE import io.infinitic.cloudEvents.INFINITIC_VERSION import io.infinitic.cloudEvents.OUTPUT import io.infinitic.cloudEvents.REQUESTER @@ -79,7 +79,7 @@ fun ServiceExecutorEventMessage.toJson(): JsonObject = when (this) { is TaskRetriedEvent -> JsonObject( mapOf( - ERROR to lastError.toJson(), + FAILURE to failure.toJsonWithoutAttemptDetails(), TASK_RETRY_DELAY to taskRetryDelay.toJson(), TASK_RETRY_SEQUENCE to taskRetrySequence.toJson(), TASK_RETRY_INDEX to taskRetryIndex.toJson(), @@ -94,7 +94,7 @@ fun ServiceExecutorEventMessage.toJson(): JsonObject = when (this) { is TaskFailedEvent -> JsonObject( mapOf( - ERROR to executionError.toJson(), + FAILURE to failure.toJsonWithoutAttemptDetails(), TASK_RETRY_SEQUENCE to taskRetrySequence.toJson(), TASK_RETRY_INDEX to taskRetryIndex.toJson(), SERVICE_NAME to serviceName.toJson(), diff --git a/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/errors/errors.kt b/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/errors/errors.kt index 324086944..3ab80e7ae 100644 --- a/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/errors/errors.kt +++ b/infinitic-cloudevents/src/main/kotlin/io/infinitic/events/errors/errors.kt @@ -23,7 +23,7 @@ package io.infinitic.events.errors -import io.infinitic.cloudEvents.ERROR +import io.infinitic.cloudEvents.FAILURE import io.infinitic.cloudEvents.METHOD_ID import io.infinitic.cloudEvents.METHOD_NAME import io.infinitic.cloudEvents.SERVICE_NAME @@ -40,7 +40,7 @@ import io.infinitic.common.tasks.executors.errors.TaskCanceledError import io.infinitic.common.tasks.executors.errors.TaskFailedError import io.infinitic.common.tasks.executors.errors.TaskTimedOutError import io.infinitic.common.tasks.executors.errors.TaskUnknownError -import io.infinitic.common.tasks.executors.errors.WorkflowTaskFailedError +import io.infinitic.common.tasks.executors.errors.WorkflowExecutorError import io.infinitic.common.utils.toJson import io.infinitic.events.types.EXECUTOR_FAILED import io.infinitic.events.types.REMOTE_METHOD_CANCELED @@ -55,16 +55,16 @@ import kotlinx.serialization.json.JsonObject fun DeferredError.toJson(): Pair = when (this) { - is WorkflowTaskFailedError -> EXECUTOR_FAILED to JsonObject( + is WorkflowExecutorError -> EXECUTOR_FAILED to JsonObject( mapOf( - ERROR to cause.toJson(), + FAILURE to lastFailure.toJson(), TASK_ID to workflowTaskId.toJson(), ), ) is TaskFailedError -> TASK_FAILED to JsonObject( mapOf( - ERROR to cause.toJson(), + FAILURE to lastFailure.toJson(), TASK_ID to taskId.toJson(), TASK_NAME to methodName.toJson(), SERVICE_NAME to serviceName.toJson(), diff --git a/infinitic-common/src/main/kotlin/io/infinitic/cloudEvents/properties.kt b/infinitic-common/src/main/kotlin/io/infinitic/cloudEvents/properties.kt index 06acb2c0a..f280990ed 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/cloudEvents/properties.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/cloudEvents/properties.kt @@ -23,7 +23,6 @@ package io.infinitic.cloudEvents -const val ERROR = "error" const val OUTPUT = "output" const val WORKER_NAME = "workerName" const val INFINITIC_VERSION = "infiniticVersion" @@ -34,7 +33,7 @@ const val TIMEOUT = "timeout" const val SERVICE_NAME = "serviceName" const val TASK_RETRY_SEQUENCE = "retrySequence" const val TASK_RETRY_INDEX = "retryIndex" -const val TASK_RETRY_DELAY = "retryDelayMillis" +const val TASK_RETRY_DELAY = "secondsBeforeRetry" const val TASK_META = "taskMeta" const val TASK_TAGS = "taskTags" const val TASK_NAME = "taskName" @@ -62,7 +61,12 @@ const val TIMER_ID = "timerId" const val TIMER_DURATION = "timerDuration" const val TIMER_INSTANT = "timerInstant" +const val FAILURE = "failure" +const val FAILURE_STACKTRACE = "stackTrace" +const val FAILURE_PREVIOUS = "previousFailure" + +const val ERROR = "error" const val ERROR_NAME = "name" const val ERROR_MESSAGE = "message" -const val ERROR_STACKTRACE = "stackTrace" const val ERROR_CAUSE = "cause" +const val ERROR_CUSTOM_PROPERTIES = "customProperties" diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/clients/messages/ClientMessage.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/clients/messages/ClientMessage.kt index fcec51e21..88022838f 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/clients/messages/ClientMessage.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/clients/messages/ClientMessage.kt @@ -37,11 +37,11 @@ import io.infinitic.common.tasks.data.TaskId import io.infinitic.common.tasks.data.TaskMeta import io.infinitic.common.tasks.data.TaskTag import io.infinitic.common.tasks.executors.errors.DeferredError -import io.infinitic.common.tasks.executors.errors.ExecutionError import io.infinitic.common.workflows.data.workflowMethods.WorkflowMethodId import io.infinitic.common.workflows.data.workflows.WorkflowId import io.infinitic.common.workflows.data.workflows.WorkflowName import io.infinitic.common.workflows.data.workflows.WorkflowTag +import io.infinitic.tasks.TaskFailure import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable @@ -84,7 +84,7 @@ data class TaskCompleted( data class TaskFailed( override val recipientName: ClientName, override val taskId: TaskId, - val cause: ExecutionError, + val cause: TaskFailure, override val emitterName: EmitterName ) : ClientMessage(), TaskMessage diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/data/MillisDuration.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/data/MillisDuration.kt index 904b22e5f..17ac15bf5 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/data/MillisDuration.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/data/MillisDuration.kt @@ -45,6 +45,8 @@ data class MillisDuration(val millis: Long) : Comparable, JsonAble { operator fun minus(other: MillisDuration) = MillisDuration(this.millis - other.millis) operator fun plus(other: MillisInstant) = MillisInstant(this.millis + other.long) + + fun toSeconds(): Double = millis / 1000.0 } object MillisDurationSerializer : KSerializer { diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/exceptions/thisShouldNotHappen.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/exceptions/thisShouldNotHappen.kt index eb2164f94..4bfd9fafe 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/exceptions/thisShouldNotHappen.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/exceptions/thisShouldNotHappen.kt @@ -22,12 +22,13 @@ */ package io.infinitic.common.exceptions -fun thisShouldNotHappen(reason: String? = null): Nothing = - throw RuntimeException( - "this should not happen${ - when (reason) { - null -> "" - else -> ": $reason" - } - }", - ) +internal class ShouldNotHappenException(reason: String? = null) : RuntimeException( + "this should not happen${ + when (reason) { + null -> "" + else -> ": $reason" + } + }", +) + +fun thisShouldNotHappen(reason: String? = null): Nothing = throw ShouldNotHappenException(reason) diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/tasks/events/messages/ServiceExecutorEventMessage.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/tasks/events/messages/ServiceExecutorEventMessage.kt index e47af169e..60261a033 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/tasks/events/messages/ServiceExecutorEventMessage.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/tasks/events/messages/ServiceExecutorEventMessage.kt @@ -48,16 +48,17 @@ import io.infinitic.common.tasks.data.TaskRetrySequence import io.infinitic.common.tasks.data.TaskReturnValue import io.infinitic.common.tasks.data.TaskTag import io.infinitic.common.tasks.executors.errors.DeferredError -import io.infinitic.common.tasks.executors.errors.ExecutionError import io.infinitic.common.tasks.executors.errors.TaskFailedError import io.infinitic.common.tasks.executors.messages.ExecuteTask import io.infinitic.common.tasks.tags.messages.RemoveTaskIdFromTag -import io.infinitic.common.workers.data.WorkerName import io.infinitic.common.workflows.data.workflowTasks.isWorkflowTask import io.infinitic.common.workflows.engine.messages.RemoteTaskCompleted import io.infinitic.common.workflows.engine.messages.RemoteTaskFailed import io.infinitic.currentVersion import io.infinitic.exceptions.DeferredException +import io.infinitic.exceptions.GenericException +import io.infinitic.tasks.TaskFailure +import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable @Serializable @@ -129,7 +130,7 @@ data class TaskFailedEvent( override val clientWaiting: Boolean?, override val taskTags: Set, override val taskMeta: TaskMeta, - val executionError: ExecutionError, + @SerialName("executionError") val failure: TaskFailure, val deferredError: DeferredError?, ) : ServiceExecutorEventMessage() { @@ -138,7 +139,7 @@ data class TaskFailedEvent( true -> TaskFailed( recipientName = requester.clientName, taskId = taskId, - cause = executionError, + cause = failure, emitterName = emitterName, ) @@ -156,7 +157,7 @@ data class TaskFailedEvent( serviceName = serviceName, methodName = methodName, taskId = taskId, - cause = executionError, + lastFailure = failure, ), deferredError = deferredError, emitterName = emitterName, @@ -183,7 +184,15 @@ data class TaskFailedEvent( clientWaiting = msg.clientWaiting, taskTags = msg.taskTags, taskMeta = TaskMeta(meta), - executionError = cause.getExecutionError(emitterName), + failure = TaskFailure( + workerName = emitterName.toString(), + retrySequence = msg.taskRetrySequence.toInt(), + retryIndex = msg.taskRetryIndex.toInt(), + secondsBeforeRetry = 0.0, + stackTraceString = cause.stackTraceToString(), + exception = GenericException.from(cause), + previousFailure = msg.lastFailure, + ), deferredError = cause.deferredError, ) } @@ -204,7 +213,7 @@ data class TaskRetriedEvent( override val taskTags: Set, override val taskMeta: TaskMeta, val taskRetryDelay: MillisDuration, - val lastError: ExecutionError, + @SerialName("lastError") val failure: TaskFailure, ) : ServiceExecutorEventMessage() { companion object { @@ -220,13 +229,21 @@ data class TaskRetriedEvent( taskId = msg.taskId, emitterName = emitterName, taskRetrySequence = msg.taskRetrySequence, - taskRetryIndex = msg.taskRetryIndex + 1, + taskRetryIndex = msg.taskRetryIndex, requester = msg.requester ?: thisShouldNotHappen(), clientWaiting = msg.clientWaiting, taskTags = msg.taskTags, taskMeta = TaskMeta(meta), taskRetryDelay = delay, - lastError = cause.getExecutionError(emitterName), + failure = TaskFailure( + workerName = emitterName.toString(), + retrySequence = msg.taskRetrySequence.toInt(), + retryIndex = msg.taskRetryIndex.toInt(), + secondsBeforeRetry = delay.toSeconds(), + stackTraceString = cause.stackTraceToString(), + exception = GenericException.from(cause), + previousFailure = msg.lastFailure, + ), ) } } @@ -335,5 +352,3 @@ private val Throwable.deferredError false -> null } -private fun Throwable.getExecutionError(emitterName: EmitterName) = - ExecutionError.from(WorkerName.from(emitterName), this) diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/tasks/executors/errors/DeferredError.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/tasks/executors/errors/DeferredError.kt index e29526ecc..839a02ff9 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/tasks/executors/errors/DeferredError.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/tasks/executors/errors/DeferredError.kt @@ -42,10 +42,11 @@ import io.infinitic.exceptions.TaskFailedException import io.infinitic.exceptions.TaskTimedOutException import io.infinitic.exceptions.TaskUnknownException import io.infinitic.exceptions.WorkflowCanceledException +import io.infinitic.exceptions.WorkflowExecutorException import io.infinitic.exceptions.WorkflowFailedException -import io.infinitic.exceptions.WorkflowTaskFailedException import io.infinitic.exceptions.WorkflowTimedOutException import io.infinitic.exceptions.WorkflowUnknownException +import io.infinitic.tasks.TaskFailure import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable @@ -74,7 +75,7 @@ sealed class DeferredFailedError : DeferredError() { when (exception) { is TaskFailedException -> TaskFailedError.from(exception) is WorkflowFailedException -> MethodFailedError.from(exception) - is WorkflowTaskFailedException -> WorkflowTaskFailedError.from(exception) + is WorkflowExecutorException -> WorkflowExecutorError.from(exception) } } } @@ -120,9 +121,9 @@ data class TaskUnknownError( @SerialName("taskName") val serviceName: ServiceName, - @AvroDefault(Avro.Companion.NULL) val methodName: MethodName?, + @AvroDefault(Avro.NULL) val methodName: MethodName?, - /** Id of the unknown task */ + /** ID of the unknown task */ val taskId: TaskId ) : DeferredUnknownError() { companion object { @@ -142,13 +143,13 @@ data class MethodUnknownError( /** Name of the unknown workflow */ val workflowName: WorkflowName, - /** Id of the unknown workflow */ + /** ID of the unknown workflow */ val workflowId: WorkflowId, @AvroDefault(Avro.NULL) val workflowMethodName: MethodName?, - /** Id of the unknown workflow' method run */ + /** ID of the unknown workflow' method run */ @AvroName("methodRunId") val workflowMethodId: WorkflowMethodId? @@ -172,7 +173,7 @@ data class TaskTimedOutError( @SerialName("taskName") val serviceName: ServiceName, - /** Id of the timed-out task */ + /** ID of the timed-out task */ val taskId: TaskId, /** Method of the timed-out task */ @@ -196,14 +197,14 @@ data class MethodTimedOutError( /** Name of timed-out child workflow */ val workflowName: WorkflowName, - /** Id of timed-out child workflow */ + /** ID of timed-out child workflow */ val workflowId: WorkflowId, /** Method of timed-out child workflow */ @AvroName("methodName") val workflowMethodName: MethodName, - /** Id of the methodRun */ + /** ID of the methodRun */ @AvroName("methodRunId") val workflowMethodId: WorkflowMethodId? ) : DeferredTimedOutError() { @@ -225,7 +226,7 @@ data class TaskCanceledError( /** Name of canceled task */ @SerialName("taskName") val serviceName: ServiceName, - /** Id of canceled task */ + /** ID of canceled task */ val taskId: TaskId, /** Method of canceled task */ @@ -248,13 +249,13 @@ data class MethodCanceledError( /** Name of canceled child workflow */ val workflowName: WorkflowName, - /** Id of canceled child workflow */ + /** ID of canceled child workflow */ val workflowId: WorkflowId, @AvroDefault(Avro.NULL) val workflowMethodName: MethodName?, - /** Id of the methodRun */ + /** ID of the methodRun */ @AvroName("methodRunId") val workflowMethodId: WorkflowMethodId?, ) : DeferredCanceledError() { @@ -279,11 +280,11 @@ data class TaskFailedError( /** Method of failed task */ val methodName: MethodName, - /** Id of failed task */ + /** ID of failed task */ val taskId: TaskId, /** cause of the error */ - val cause: ExecutionError + @SerialName("cause") val lastFailure: TaskFailure ) : DeferredFailedError() { companion object { fun from(e: TaskFailedException) = @@ -291,7 +292,7 @@ data class TaskFailedError( serviceName = ServiceName(e.serviceName), methodName = MethodName(e.methodName), taskId = TaskId(e.taskId), - cause = ExecutionError.from(e.workerException), + lastFailure = e.lastFailure, ) } } @@ -303,14 +304,14 @@ data class MethodFailedError( /** Name of failed child workflow */ val workflowName: WorkflowName, - /** Id of failed child workflow */ + /** ID of failed child workflow */ val workflowId: WorkflowId, /** Method of failed child workflow */ @AvroName("methodName") val workflowMethodName: MethodName, - /** Id of failed method run */ + /** ID of failed method run */ @AvroName("methodRunId") val workflowMethodId: WorkflowMethodId?, @@ -329,29 +330,29 @@ data class MethodFailedError( } } -/** Error occurring when waiting a failed workflow */ +/** Error occurring in Workflow Task */ @Serializable @SerialName("FailedWorkflowTaskError") -data class WorkflowTaskFailedError( +data class WorkflowExecutorError( /** Name of failed workflow */ val workflowName: WorkflowName, - /** Id of failed workflow */ + /** ID of failed workflow */ val workflowId: WorkflowId, - /** Id of failed workflow task */ + /** ID of failed workflow task */ val workflowTaskId: TaskId, /** cause of the error */ - val cause: ExecutionError + @SerialName("cause") val lastFailure: TaskFailure ) : DeferredFailedError() { companion object { - fun from(e: WorkflowTaskFailedException): WorkflowTaskFailedError = - WorkflowTaskFailedError( + fun from(e: WorkflowExecutorException): WorkflowExecutorError = + WorkflowExecutorError( workflowName = WorkflowName(e.workflowName), workflowId = WorkflowId(e.workflowId), workflowTaskId = TaskId(e.workflowTaskId), - cause = ExecutionError.from(e.workerException), + lastFailure = e.lastFailure, ) } } diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/tasks/executors/errors/ExecutionError.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/tasks/executors/errors/ExecutionError.kt deleted file mode 100644 index 472340bd3..000000000 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/tasks/executors/errors/ExecutionError.kt +++ /dev/null @@ -1,99 +0,0 @@ -/** - * "Commons Clause" License Condition v1.0 - * - * The Software is provided to you by the Licensor under the License, as defined below, subject to - * the following condition. - * - * Without limiting other conditions in the License, the grant of rights under the License will not - * include, and the License does not grant to you, the right to Sell the Software. - * - * For purposes of the foregoing, “Sell” means practicing any or all of the rights granted to you - * under the License to provide to third parties, for a fee or other consideration (including - * without limitation fees for hosting or consulting/ support services related to the Software), a - * product or service whose value derives, entirely or substantially, from the functionality of the - * Software. Any license notice or attribution required by the License must also include this - * Commons Clause License Condition notice. - * - * Software: Infinitic - * - * License: MIT License (https://opensource.org/licenses/MIT) - * - * Licensor: infinitic.io - */ -package io.infinitic.common.tasks.executors.errors - -import com.github.avrokotlin.avro4k.AvroName -import com.github.avrokotlin.avro4k.AvroNamespace -import io.infinitic.cloudEvents.ERROR_CAUSE -import io.infinitic.cloudEvents.ERROR_MESSAGE -import io.infinitic.cloudEvents.ERROR_NAME -import io.infinitic.cloudEvents.ERROR_STACKTRACE -import io.infinitic.common.utils.JsonAble -import io.infinitic.common.utils.toJson -import io.infinitic.common.workers.data.WorkerName -import io.infinitic.exceptions.WorkerException -import kotlinx.serialization.Serializable -import kotlinx.serialization.json.JsonObject -import kotlinx.serialization.json.JsonPrimitive - -/** Data class representing an error */ -@Serializable -@AvroNamespace("io.infinitic.tasks.executor") -@AvroName("WorkerError") -data class ExecutionError( - /** Name of the worker */ - val workerName: WorkerName, - - /** Name of the error */ - val name: String, - - /** Message of the error */ - val message: String?, - - /** String version of the stack trace */ - val stackTraceToString: String, - - /** cause of the error */ - val cause: ExecutionError? -) : JsonAble { - companion object { - fun from(exception: WorkerException): ExecutionError = - ExecutionError( - workerName = WorkerName(exception.workerName), - name = exception.name, - message = exception.message, - stackTraceToString = exception.stackTraceToString, - cause = exception.cause?.let { from(it) }, - ) - - fun from(workerName: WorkerName, throwable: Throwable): ExecutionError = - ExecutionError( - workerName = workerName, - name = throwable::class.java.name, - message = throwable.message, - stackTraceToString = throwable.stackTraceToString(), - cause = when (val cause = throwable.cause) { - null, throwable -> null - else -> from(workerName, cause) - }, - ) - } - - // we remove end of line for stackTraceToString of the output to preserve logs - override fun toString(): String = this::class.java.simpleName + "(" + - listOf( - "name" to name, - "message" to message, - "stackTrace" to stackTraceToString.replace("\n", ""), - "cause" to cause, - ).joinToString { "${it.first}=${it.second}" } + ")" - - override fun toJson(): JsonObject = JsonObject( - mapOf( - ERROR_NAME to JsonPrimitive(name), - ERROR_MESSAGE to JsonPrimitive(message), - ERROR_STACKTRACE to JsonPrimitive(stackTraceToString), - ERROR_CAUSE to cause.toJson(), - ), - ) -} diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/tasks/executors/messages/ServiceExecutorMessage.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/tasks/executors/messages/ServiceExecutorMessage.kt index 25023bc3e..b04348d78 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/tasks/executors/messages/ServiceExecutorMessage.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/tasks/executors/messages/ServiceExecutorMessage.kt @@ -28,6 +28,7 @@ import com.github.avrokotlin.avro4k.AvroName import com.github.avrokotlin.avro4k.AvroNamespace import io.infinitic.common.clients.data.ClientName import io.infinitic.common.data.MessageId +import io.infinitic.common.data.MillisDuration import io.infinitic.common.data.Version import io.infinitic.common.data.methods.MethodArgs import io.infinitic.common.data.methods.MethodName @@ -45,14 +46,14 @@ import io.infinitic.common.tasks.data.TaskMeta import io.infinitic.common.tasks.data.TaskRetryIndex import io.infinitic.common.tasks.data.TaskRetrySequence import io.infinitic.common.tasks.data.TaskTag -import io.infinitic.common.tasks.executors.errors.ExecutionError import io.infinitic.common.workers.config.WorkflowVersion -import io.infinitic.common.workers.data.WorkerName import io.infinitic.common.workflows.data.workflowMethods.WorkflowMethodId import io.infinitic.common.workflows.data.workflowTasks.isWorkflowTask import io.infinitic.common.workflows.data.workflows.WorkflowId import io.infinitic.common.workflows.data.workflows.WorkflowName import io.infinitic.currentVersion +import io.infinitic.exceptions.GenericException +import io.infinitic.tasks.TaskFailure import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable @@ -90,17 +91,17 @@ data class ExecuteTask( override val taskRetrySequence: TaskRetrySequence, override val taskRetryIndex: TaskRetryIndex, @AvroDefault(Avro.NULL) override var requester: Requester?, - @Deprecated("Not used since version 0.13.0") val workflowName: WorkflowName? = null, - @Deprecated("Not used since version 0.13.0") val workflowId: WorkflowId? = null, - @Deprecated("Not used since version 0.13.0") @AvroName("methodRunId") val workflowMethodId: WorkflowMethodId? = null, val taskTags: Set, val taskMeta: TaskMeta, val clientWaiting: Boolean, val methodName: MethodName, val methodParameterTypes: MethodParameterTypes?, @SerialName("methodParameters") val methodArgs: MethodArgs, - val lastError: ExecutionError?, - @Deprecated("Not used since version 0.13.0") @AvroDefault(Avro.NULL) val workflowVersion: WorkflowVersion? = null + @SerialName("lastError") val lastFailure: TaskFailure?, + @Deprecated("Not used after 0.13.0") val workflowName: WorkflowName? = null, + @Deprecated("Not used after 0.13.0") val workflowId: WorkflowId? = null, + @Deprecated("Not used after 0.13.0") @AvroName("methodRunId") val workflowMethodId: WorkflowMethodId? = null, + @Deprecated("Not used after 0.13.0") @AvroDefault(Avro.NULL) val workflowVersion: WorkflowVersion? = null ) : ServiceExecutorMessage() { init { @@ -122,6 +123,7 @@ data class ExecuteTask( fun retryFrom( msg: ExecuteTask, emitterName: EmitterName, + delay: MillisDuration, cause: Throwable, meta: Map ) = ExecuteTask( @@ -137,10 +139,15 @@ data class ExecuteTask( methodName = msg.methodName, methodParameterTypes = msg.methodParameterTypes, methodArgs = msg.methodArgs, - lastError = cause.getExecutionError(emitterName), + lastFailure = TaskFailure( + workerName = emitterName.toString(), + retrySequence = msg.taskRetrySequence.toInt(), + retryIndex = msg.taskRetryIndex.toInt(), + secondsBeforeRetry = delay.toSeconds(), + stackTraceString = cause.stackTraceToString(), + exception = GenericException.from(cause), + previousFailure = msg.lastFailure, + ), ) } } - -private fun Throwable.getExecutionError(emitterName: EmitterName) = - ExecutionError.from(WorkerName.from(emitterName), this) diff --git a/infinitic-common/src/main/kotlin/io/infinitic/common/workflows/engine/commands/dispatchTask.kt b/infinitic-common/src/main/kotlin/io/infinitic/common/workflows/engine/commands/dispatchTask.kt index 59f96ea0b..19bf7f4b2 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/common/workflows/engine/commands/dispatchTask.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/common/workflows/engine/commands/dispatchTask.kt @@ -55,7 +55,7 @@ suspend fun InfiniticProducer.dispatchTask( methodName = methodName, methodParameterTypes = methodParameterTypes, methodArgs = methodParameters, - lastError = null, + lastFailure = null, emitterName = emitterName, ) } diff --git a/infinitic-common/src/main/kotlin/io/infinitic/exceptions/DeferredException.kt b/infinitic-common/src/main/kotlin/io/infinitic/exceptions/DeferredException.kt index e5a11e382..750fe9289 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/exceptions/DeferredException.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/exceptions/DeferredException.kt @@ -35,91 +35,80 @@ import io.infinitic.common.tasks.executors.errors.TaskCanceledError import io.infinitic.common.tasks.executors.errors.TaskFailedError import io.infinitic.common.tasks.executors.errors.TaskTimedOutError import io.infinitic.common.tasks.executors.errors.TaskUnknownError -import io.infinitic.common.tasks.executors.errors.WorkflowTaskFailedError +import io.infinitic.common.tasks.executors.errors.WorkflowExecutorError +import io.infinitic.tasks.TaskFailure import kotlinx.serialization.SerialName -import kotlinx.serialization.Serializable -import kotlinx.serialization.Transient /** * DeferredException are use-facing exceptions. * * They can be thrown when an exception occurs while synchronously waiting for a deferred. */ - - -@Serializable sealed class DeferredException : kotlin.RuntimeException() { abstract val description: String companion object { - fun from(error: DeferredError) = - when (error) { - is DeferredUnknownError -> DeferredUnknownException.from(error) - is DeferredTimedOutError -> DeferredTimedOutException.from(error) - is DeferredCanceledError -> DeferredCanceledException.from(error) - is DeferredFailedError -> DeferredFailedException.from(error) - } + fun from(error: DeferredError) = when (error) { + is DeferredUnknownError -> DeferredUnknownException.from(error) + is DeferredTimedOutError -> DeferredTimedOutException.from(error) + is DeferredCanceledError -> DeferredCanceledException.from(error) + is DeferredFailedError -> DeferredFailedException.from(error) + } + + internal const val REMOTE_TASK_ERROR = "Unable to get the result of a remote task. " + internal const val REMOTE_WORKFLOW_ERROR = "Unable to get the result of a remote workflow. " } + } -@Serializable sealed class DeferredUnknownException : DeferredException() { companion object { - fun from(error: DeferredUnknownError) = - when (error) { - is TaskUnknownError -> TaskUnknownException.from(error) - is MethodUnknownError -> WorkflowUnknownException.from(error) - } + fun from(error: DeferredUnknownError) = when (error) { + is TaskUnknownError -> TaskUnknownException.from(error) + is MethodUnknownError -> WorkflowUnknownException.from(error) + } } } -@Serializable sealed class DeferredTimedOutException : DeferredException() { companion object { - fun from(error: DeferredTimedOutError) = - when (error) { - is TaskTimedOutError -> TaskTimedOutException.from(error) - is MethodTimedOutError -> WorkflowTimedOutException.from(error) - } + fun from(error: DeferredTimedOutError) = when (error) { + is TaskTimedOutError -> TaskTimedOutException.from(error) + is MethodTimedOutError -> WorkflowTimedOutException.from(error) + } } } -@Serializable sealed class DeferredCanceledException : DeferredException() { companion object { - fun from(error: DeferredCanceledError) = - when (error) { - is TaskCanceledError -> TaskCanceledException.from(error) - is MethodCanceledError -> WorkflowCanceledException.from(error) - } + fun from(error: DeferredCanceledError) = when (error) { + is TaskCanceledError -> TaskCanceledException.from(error) + is MethodCanceledError -> WorkflowCanceledException.from(error) + } } } -@Serializable sealed class DeferredFailedException : DeferredException() { companion object { - fun from(error: DeferredFailedError) = - when (error) { - is TaskFailedError -> TaskFailedException.from(error) - is MethodFailedError -> WorkflowFailedException.from(error) - is WorkflowTaskFailedError -> WorkflowTaskFailedException.from(error) - } + fun from(error: DeferredFailedError) = when (error) { + is TaskFailedError -> TaskFailedException.from(error) + is MethodFailedError -> WorkflowFailedException.from(error) + is WorkflowExecutorError -> WorkflowExecutorException.from(error) + } } } /** Exception occurring when waiting for an unknown task */ -@Serializable data class TaskUnknownException( /** Name of the canceled task */ @SerialName("taskName") val serviceName: String, val methodName: String? = null, - /** Id of the canceled task */ + /** ID of the canceled task */ val taskId: String ) : DeferredUnknownException() { - @Transient - override val description = "Unable to fetch the result of a remote task. " + + override val description = REMOTE_TASK_ERROR + "It appears this task has either already terminated or is not recognized " + "(serviceName: $serviceName, " + (methodName?.let { "methodName: $methodName, " } ?: "") + @@ -136,12 +125,11 @@ data class TaskUnknownException( } /** Exception occurring when waiting for an unknown workflow */ -@Serializable data class WorkflowUnknownException( /** Name of the canceled child workflow */ val workflowName: String, - /** Id of the canceled child workflow */ + /** ID of the canceled child workflow */ val workflowId: String, val workflowMethodName: String?, @@ -149,8 +137,7 @@ data class WorkflowUnknownException( /** Id of the methodRun */ val workflowMethodId: String? ) : DeferredUnknownException() { - @Transient - override val description = "Unable to fetch the result of a remote workflow. " + + override val description = REMOTE_WORKFLOW_ERROR + "It appears this workflow has either already terminated or is not recognized " + "(workflowName: $workflowName, workflowId: $workflowId" + (workflowMethodName?.let { ", methodName: $it" } ?: "") + @@ -169,19 +156,17 @@ data class WorkflowUnknownException( } /** Exception occurring when waiting for a timed-out task */ -@Serializable data class TaskTimedOutException( /** Name of the canceled task */ - @SerialName("taskName") val serviceName: String, + val serviceName: String, - /** Id of the canceled task */ + /** ID of the canceled task */ val taskId: String, /** Method called */ val methodName: String ) : DeferredTimedOutException() { - @Transient - override val description = "Unable to fetch the result of a remote task. " + + override val description = REMOTE_TASK_ERROR + "The time allotted for this task has expired. " + "(Service Name: $serviceName, Method Name: $methodName, Task ID: $taskId)" @@ -196,12 +181,11 @@ data class TaskTimedOutException( } /** Error occurring when waiting for a timed-out workflow */ -@Serializable data class WorkflowTimedOutException( /** Name of the canceled child workflow */ val workflowName: String, - /** Id of the canceled child workflow */ + /** ID of the canceled child workflow */ val workflowId: String, val workflowMethodName: String, @@ -209,8 +193,7 @@ data class WorkflowTimedOutException( /** Id of the methodRun */ val workflowMethodId: String? ) : DeferredTimedOutException() { - @Transient - override val description = "Unable to fetch the result of a remote workflow. " + + override val description = REMOTE_WORKFLOW_ERROR + "The time allotted for this workflow has expired. " + "(Workflow Name: $workflowName, Workflow ID: $workflowId, Method Name: $workflowMethodName" + (workflowMethodId?.let { ", Method ID: $it" } ?: "") + @@ -228,19 +211,17 @@ data class WorkflowTimedOutException( } /** Exception occurring when waiting for a canceled task */ -@Serializable data class TaskCanceledException( /** Name of the canceled task */ - @SerialName("taskName") val serviceName: String, + val serviceName: String, - /** Id of the canceled task */ + /** ID of the canceled task */ val taskId: String, /** Method called */ val methodName: String ) : DeferredCanceledException() { - @Transient - override val description = "Unable to fetch the result of a remote task. " + + override val description = REMOTE_TASK_ERROR + "It appears this task has been canceled " + "(Service Name: $serviceName, Method Name: $methodName, Task ID: $taskId)" @@ -255,21 +236,19 @@ data class TaskCanceledException( } /** Exception occurring when waiting for a canceled workflow */ -@Serializable data class WorkflowCanceledException( /** Name of the canceled child workflow */ val workflowName: String, - /** Id of the canceled child workflow */ + /** ID of the canceled child workflow */ val workflowId: String, val workflowMethodName: String?, - /** Id of the methodRun */ + /** ID of the method execution */ val workflowMethodId: String? ) : DeferredCanceledException() { - @Transient - override val description = "Unable to fetch the result of a remote workflow. " + + override val description = REMOTE_WORKFLOW_ERROR + "It appears this workflow has been canceled " + "(Workflow Name: $workflowName, Workflow ID: $workflowId" + (workflowMethodName?.let { ", Method Name: $it" } ?: "") + @@ -288,22 +267,21 @@ data class WorkflowCanceledException( } /** Exception occurring when waiting fora failed task */ -@Serializable data class TaskFailedException( /** Name of the task where the error occurred */ - @SerialName("taskName") val serviceName: String, + val serviceName: String, - /** Id of the task where the error occurred */ + /** ID of the task where the error occurred */ val taskId: String, /** Method called where the error occurred */ val methodName: String, - /** cause of the error */ - val workerException: WorkerException + /** Description of the last failure **/ + val lastFailure: TaskFailure ) : DeferredFailedException() { - @Transient - override val description = "Unable to fetch the result of a remote task. " + + + override val description = REMOTE_TASK_ERROR + "It appears this task has failed " + "(Service Name: $serviceName, Method Name: $methodName, Task ID: $taskId)" @@ -312,34 +290,32 @@ data class TaskFailedException( serviceName = error.serviceName.toString(), taskId = error.taskId.toString(), methodName = error.methodName.toString(), - workerException = WorkerException.from(error.cause), + lastFailure = error.lastFailure, ) } } /** Exception occurring when waiting fora failed task */ -@Serializable data class WorkflowFailedException( /** Name of the workflow where the error occurred */ val workflowName: String, - /** Id of the workflow where the error occurred */ + /** ID of the workflow where the error occurred */ val workflowId: String, /** Method called where the error occurred */ val workflowMethodName: String, /** Id of the methodRun */ - val workflowMethodId: String?, + val workflowMethodId: String, /** cause of the error */ val deferredException: DeferredException ) : DeferredFailedException() { - @Transient - override val description = "Unable to fetch the result of a remote workflow. " + + override val description = REMOTE_WORKFLOW_ERROR + "It appears this workflow has failed " + "(Workflow Name: $workflowName, Workflow ID: $workflowId, Method Name: $workflowMethodName" + - (workflowMethodId?.let { ", Method ID: $it" } ?: "") + + workflowMethodId.let { ", Method ID: $it" } + ")." companion object { @@ -348,38 +324,36 @@ data class WorkflowFailedException( workflowName = error.workflowName.toString(), workflowId = error.workflowId.toString(), workflowMethodName = error.workflowMethodName.toString(), - workflowMethodId = error.workflowMethodId.toString(), + workflowMethodId = (error.workflowMethodId ?: error.workflowId).toString(), deferredException = from(error.deferredError), ) } } /** Exception occurred during a workflow task */ -@Serializable -data class WorkflowTaskFailedException( +data class WorkflowExecutorException( /** Name of the workflow for which the error occurred */ val workflowName: String, - /** Id of the workflow for which the error occurred */ + /** ID of the workflow for which the error occurred */ val workflowId: String, - /** Id of the workflow task for which the error occurred */ + /** ID of the workflow task for which the error occurred */ val workflowTaskId: String, - /** cause of the error */ - val workerException: WorkerException + /** Description of the last lastFailure **/ + val lastFailure: TaskFailure ) : DeferredFailedException() { - @Transient override val description = "Unable to continue the execution of this workflow. An exception has raised." companion object { - fun from(error: WorkflowTaskFailedError): WorkflowTaskFailedException = - WorkflowTaskFailedException( + fun from(error: WorkflowExecutorError): WorkflowExecutorException = + WorkflowExecutorException( workflowName = error.workflowName.toString(), workflowId = error.workflowId.toString(), workflowTaskId = error.workflowTaskId.toString(), - workerException = WorkerException.from(error.cause), + lastFailure = error.lastFailure, ) } } diff --git a/infinitic-common/src/main/kotlin/io/infinitic/exceptions/GenericException.kt b/infinitic-common/src/main/kotlin/io/infinitic/exceptions/GenericException.kt new file mode 100644 index 000000000..ff8ae71db --- /dev/null +++ b/infinitic-common/src/main/kotlin/io/infinitic/exceptions/GenericException.kt @@ -0,0 +1,139 @@ +/** + * "Commons Clause" License Condition v1.0 + * + * The Software is provided to you by the Licensor under the License, as defined below, subject to + * the following condition. + * + * Without limiting other conditions in the License, the grant of rights under the License will not + * include, and the License does not grant to you, the right to Sell the Software. + * + * For purposes of the foregoing, “Sell” means practicing any or all of the rights granted to you + * under the License to provide to third parties, for a fee or other consideration (including + * without limitation fees for hosting or consulting/ support services related to the Software), a + * product or service whose value derives, entirely or substantially, from the functionality of the + * Software. Any license notice or attribution required by the License must also include this + * Commons Clause License Condition notice. + * + * Software: Infinitic + * + * License: MIT License (https://opensource.org/licenses/MIT) + * + * Licensor: infinitic.io + */ +package io.infinitic.exceptions + +import com.github.avrokotlin.avro4k.AvroNamespace +import io.infinitic.cloudEvents.ERROR_CAUSE +import io.infinitic.cloudEvents.ERROR_CUSTOM_PROPERTIES +import io.infinitic.cloudEvents.ERROR_MESSAGE +import io.infinitic.cloudEvents.ERROR_NAME +import io.infinitic.common.serDe.SerializedData +import io.infinitic.common.utils.JsonAble +import io.infinitic.common.utils.toJson +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.JsonObject +import kotlinx.serialization.json.JsonPrimitive +import kotlin.reflect.KVisibility +import kotlin.reflect.full.memberProperties +import kotlin.reflect.jvm.isAccessible + +/** Data class representing an error */ +@Serializable +@AvroNamespace("io.infinitic.tasks.executor") +data class GenericException( + /** Name of the original exception */ + val name: String, + + /** Message of the original exception */ + override val message: String?, + + /** Serialized custom properties of the original exception **/ + private val serializedCustomProperties: Map, + + /** cause of the original exception */ + override val cause: GenericException? +) : Exception(), JsonAble { + + private val _customProperties: Map by lazy { + serializedCustomProperties.mapValues { (_, v) -> + try { + v.decode(null, null) + } catch (e: Exception) { + "Non deserializable object: ${v.toJson()}" + } + } + } + + /** Custom properties of the original exception **/ + fun getCustomProperties(): Map = _customProperties + + /** Custom property of the original exception **/ + fun getCustomProperty(name: String): Any? = _customProperties[name] + + companion object { + + fun from(throwable: Throwable): GenericException = + GenericException( + name = throwable::class.java.name, + message = throwable.message, + serializedCustomProperties = captureCustomProperties(throwable), + cause = when (val cause = throwable.cause) { + null, throwable -> null + else -> from(cause) + }, + ) + + private fun captureCustomProperties(exception: Throwable): Map = + exception::class.memberProperties + .filter { + // Ensure property is public and not a common property + (it.visibility == KVisibility.PUBLIC) && + it.name !in listOf( + "message", + "cause", + "stackTraceString", + "suppressed", + ) + } + .associateBy( + { it.name }, + { + try { + it.isAccessible = true + it.getter.call(exception) + } catch (e: Exception) { + "Error retrieving value: ${e.message}" + } + }, + ) + .mapValues { (_, v) -> + try { + SerializedData.encode(v, null, null) + } catch (e: Exception) { + SerializedData.encode( + "Non serializable object: ${v!!::class.java.name}", + null, + null, + ) + } + } + } + + // we remove end of line for stackTraceString of the output to preserve logs + override fun toString(): String = this::class.java.simpleName + "(" + + listOf( + "name" to name, + "message" to message, + "customProperties" to serializedCustomProperties.mapValues { (_, v) -> v.toJsonString() }, + "cause" to cause.toString(), + ).joinToString { "${it.first}=${it.second}" } + ")" + + override fun toJson(): JsonObject = JsonObject( + mapOf( + ERROR_NAME to JsonPrimitive(name), + ERROR_MESSAGE to JsonPrimitive(message), + ERROR_CUSTOM_PROPERTIES to JsonObject(serializedCustomProperties.mapValues { (_, v) -> v.toJson() }), + ERROR_CAUSE to cause.toJson(), + ), + ) +} diff --git a/infinitic-common/src/main/kotlin/io/infinitic/exceptions/WorkerException.kt b/infinitic-common/src/main/kotlin/io/infinitic/exceptions/WorkerException.kt deleted file mode 100644 index f71d97bd9..000000000 --- a/infinitic-common/src/main/kotlin/io/infinitic/exceptions/WorkerException.kt +++ /dev/null @@ -1,69 +0,0 @@ -/** - * "Commons Clause" License Condition v1.0 - * - * The Software is provided to you by the Licensor under the License, as defined below, subject to - * the following condition. - * - * Without limiting other conditions in the License, the grant of rights under the License will not - * include, and the License does not grant to you, the right to Sell the Software. - * - * For purposes of the foregoing, “Sell” means practicing any or all of the rights granted to you - * under the License to provide to third parties, for a fee or other consideration (including - * without limitation fees for hosting or consulting/ support services related to the Software), a - * product or service whose value derives, entirely or substantially, from the functionality of the - * Software. Any license notice or attribution required by the License must also include this - * Commons Clause License Condition notice. - * - * Software: Infinitic - * - * License: MIT License (https://opensource.org/licenses/MIT) - * - * Licensor: infinitic.io - */ -package io.infinitic.exceptions - -import io.infinitic.common.tasks.executors.errors.ExecutionError -import io.infinitic.common.workers.data.WorkerName -import kotlinx.serialization.Serializable - -@Serializable -data class WorkerException( - /** Name of the worker */ - val workerName: String, - - /** Name of the error */ - val name: String, - - /** Message of the error */ - override val message: String?, - - /** String version of the stack trace */ - val stackTraceToString: String, - - /** cause of the error */ - override val cause: WorkerException? -) : kotlin.RuntimeException() { - companion object { - fun from(error: ExecutionError): WorkerException = - WorkerException( - workerName = error.workerName.toString(), - name = error.name, - message = error.message, - stackTraceToString = error.stackTraceToString, - cause = error.cause?.let { from(it) }, - ) - - fun from(workerName: WorkerName, throwable: Throwable): WorkerException = - WorkerException( - workerName = workerName.toString(), - name = throwable::class.java.name, - message = throwable.message, - stackTraceToString = throwable.stackTraceToString(), - cause = when (val cause = throwable.cause) { - null, throwable -> null - - else -> from(workerName, cause) - }, - ) - } -} diff --git a/infinitic-common/src/main/kotlin/io/infinitic/tasks/Task.kt b/infinitic-common/src/main/kotlin/io/infinitic/tasks/Task.kt index 9f0bb1b83..138ef0720 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/tasks/Task.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/tasks/Task.kt @@ -23,7 +23,6 @@ package io.infinitic.tasks import io.infinitic.clients.InfiniticClientInterface -import io.infinitic.common.tasks.executors.errors.ExecutionError import org.jetbrains.annotations.TestOnly object Task { @@ -81,7 +80,7 @@ object Task { get() = context.get().workflowVersion?.toInt() @JvmStatic - val lastError: ExecutionError? + val lastError: TaskFailure? get() = context.get().lastError @JvmStatic diff --git a/infinitic-common/src/main/kotlin/io/infinitic/tasks/TaskContext.kt b/infinitic-common/src/main/kotlin/io/infinitic/tasks/TaskContext.kt index ce33710ff..478c1a02b 100644 --- a/infinitic-common/src/main/kotlin/io/infinitic/tasks/TaskContext.kt +++ b/infinitic-common/src/main/kotlin/io/infinitic/tasks/TaskContext.kt @@ -28,7 +28,6 @@ import io.infinitic.common.tasks.data.ServiceName import io.infinitic.common.tasks.data.TaskId import io.infinitic.common.tasks.data.TaskRetryIndex import io.infinitic.common.tasks.data.TaskRetrySequence -import io.infinitic.common.tasks.executors.errors.ExecutionError import io.infinitic.common.workers.config.WorkflowVersion import io.infinitic.common.workflows.data.workflows.WorkflowId import io.infinitic.common.workflows.data.workflows.WorkflowName @@ -44,7 +43,7 @@ interface TaskContext { val workflowVersion: WorkflowVersion? val retrySequence: TaskRetrySequence val retryIndex: TaskRetryIndex - val lastError: ExecutionError? + val lastError: TaskFailure? val batchKey: String? val tags: Set val meta: MutableMap diff --git a/infinitic-common/src/main/kotlin/io/infinitic/tasks/TaskFailure.kt b/infinitic-common/src/main/kotlin/io/infinitic/tasks/TaskFailure.kt new file mode 100644 index 000000000..e174b86e9 --- /dev/null +++ b/infinitic-common/src/main/kotlin/io/infinitic/tasks/TaskFailure.kt @@ -0,0 +1,117 @@ +/** + * "Commons Clause" License Condition v1.0 + * + * The Software is provided to you by the Licensor under the License, as defined below, subject to + * the following condition. + * + * Without limiting other conditions in the License, the grant of rights under the License will not + * include, and the License does not grant to you, the right to Sell the Software. + * + * For purposes of the foregoing, “Sell” means practicing any or all of the rights granted to you + * under the License to provide to third parties, for a fee or other consideration (including + * without limitation fees for hosting or consulting/ support services related to the Software), a + * product or service whose value derives, entirely or substantially, from the functionality of the + * Software. Any license notice or attribution required by the License must also include this + * Commons Clause License Condition notice. + * + * Software: Infinitic + * + * License: MIT License (https://opensource.org/licenses/MIT) + * + * Licensor: infinitic.io + */ +package io.infinitic.tasks + +import com.github.avrokotlin.avro4k.Avro +import com.github.avrokotlin.avro4k.AvroDefault +import com.github.avrokotlin.avro4k.AvroName +import com.github.avrokotlin.avro4k.AvroNamespace +import io.infinitic.cloudEvents.ERROR +import io.infinitic.cloudEvents.FAILURE_PREVIOUS +import io.infinitic.cloudEvents.FAILURE_STACKTRACE +import io.infinitic.cloudEvents.TASK_RETRY_DELAY +import io.infinitic.cloudEvents.TASK_RETRY_INDEX +import io.infinitic.cloudEvents.TASK_RETRY_SEQUENCE +import io.infinitic.cloudEvents.WORKER_NAME +import io.infinitic.common.exceptions.thisShouldNotHappen +import io.infinitic.common.utils.JsonAble +import io.infinitic.common.utils.toJson +import io.infinitic.exceptions.GenericException +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.JsonObject +import kotlinx.serialization.json.JsonPrimitive + +/** Data class representing an error */ +@Serializable +@AvroNamespace("io.infinitic.tasks.executor") +@AvroName("WorkerError") +data class TaskFailure( + /** Name of the worker */ + val workerName: String, + + /** Sequence of the retry */ + @AvroDefault("0") val retrySequence: Int?, + + /** Index of the retry */ + @AvroDefault("0") val retryIndex: Int?, + + /** Seconds before retry (null if no retry)*/ + @AvroDefault(Avro.NULL) val secondsBeforeRetry: Double?, + + /** String version of the stack trace */ + @SerialName("stackTraceToString") val stackTraceString: String?, + + /** details of the exception */ + @AvroDefault(Avro.NULL) var exception: GenericException?, + + /** cause of the error */ + @SerialName("cause") val previousFailure: TaskFailure?, + + /** Name of the error */ + @Deprecated("Unused after v0.17.0") private val name: String? = null, + /** Message of the error */ + @Deprecated("Unused after v0.17.0") private val message: String? = null, +) : JsonAble { + + init { + // Useful to convert data with version < 0.17.0 + exception = exception ?: GenericException( + name = name ?: thisShouldNotHappen(), + message = message, + serializedCustomProperties = emptyMap(), + cause = null, + ) + } + + override fun toJson(): JsonObject = JsonObject( + mapOf( + WORKER_NAME to JsonPrimitive(workerName), + TASK_RETRY_SEQUENCE to JsonPrimitive(retrySequence), + TASK_RETRY_INDEX to JsonPrimitive(retryIndex), + TASK_RETRY_DELAY to JsonPrimitive(secondsBeforeRetry), + ERROR to exception.toJson(), + FAILURE_STACKTRACE to JsonPrimitive(stackTraceString), + FAILURE_PREVIOUS to previousFailure.toJson(), + ), + ) + + fun toJsonWithoutAttemptDetails(): JsonObject = JsonObject( + mapOf( + ERROR to exception.toJson(), + FAILURE_STACKTRACE to JsonPrimitive(stackTraceString), + FAILURE_PREVIOUS to previousFailure.toJson(), + ), + ) + + override fun toString(): String { + return "TaskFailure(" + + "workerName='$workerName'," + + "retrySequence=$retrySequence, " + + "retryIndex=$retryIndex, " + + "secondsBeforeRetry=$secondsBeforeRetry, " + + "stackTraceString='${stackTraceString?.replace("\n", "\\n")}', " + + "exception=$exception, " + + "previousFailure=$previousFailure)" + } +} diff --git a/infinitic-common/src/main/resources/schemasclientEnvelope-0.16.0.avsc b/infinitic-common/src/main/resources/schemas/clientEnvelope-0.16.1.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasclientEnvelope-0.16.0.avsc rename to infinitic-common/src/main/resources/schemas/clientEnvelope-0.16.1.avsc diff --git a/infinitic-common/src/main/resources/schemasclientEnvelope-0.16.1.avsc b/infinitic-common/src/main/resources/schemas/clientEnvelope-0.16.2.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasclientEnvelope-0.16.1.avsc rename to infinitic-common/src/main/resources/schemas/clientEnvelope-0.16.2.avsc diff --git a/infinitic-common/src/main/resources/schemasclientEnvelope-0.16.2.avsc b/infinitic-common/src/main/resources/schemas/clientEnvelope-0.16.3.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasclientEnvelope-0.16.2.avsc rename to infinitic-common/src/main/resources/schemas/clientEnvelope-0.16.3.avsc diff --git a/infinitic-common/src/main/resources/schemasclientEnvelope-0.16.3.avsc b/infinitic-common/src/main/resources/schemas/clientEnvelope-0.16.4.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasclientEnvelope-0.16.3.avsc rename to infinitic-common/src/main/resources/schemas/clientEnvelope-0.16.4.avsc diff --git a/infinitic-common/src/main/resources/schemasclientEnvelope-0.16.4.avsc b/infinitic-common/src/main/resources/schemas/clientEnvelope-0.17.0.avsc similarity index 91% rename from infinitic-common/src/main/resources/schemasclientEnvelope-0.16.4.avsc rename to infinitic-common/src/main/resources/schemas/clientEnvelope-0.17.0.avsc index c5f6f0d0c..bee5c0434 100644 --- a/infinitic-common/src/main/resources/schemasclientEnvelope-0.16.4.avsc +++ b/infinitic-common/src/main/resources/schemas/clientEnvelope-0.17.0.avsc @@ -104,17 +104,52 @@ "name" : "workerName", "type" : "string" }, { - "name" : "name", - "type" : "string" + "name" : "retrySequence", + "type" : [ "int", "null" ], + "default" : 0 }, { - "name" : "message", - "type" : [ "null", "string" ] + "name" : "retryIndex", + "type" : [ "int", "null" ], + "default" : 0 + }, { + "name" : "secondsBeforeRetry", + "type" : [ "null", "double" ], + "default" : null }, { "name" : "stackTraceToString", - "type" : "string" + "type" : [ "null", "string" ] + }, { + "name" : "exception", + "type" : [ "null", { + "type" : "record", + "name" : "GenericException", + "fields" : [ { + "name" : "name", + "type" : "string" + }, { + "name" : "message", + "type" : [ "null", "string" ] + }, { + "name" : "serializedCustomProperties", + "type" : { + "type" : "map", + "values" : "io.infinitic.data.SerializedData" + } + }, { + "name" : "cause", + "type" : [ "null", "GenericException" ] + } ] + } ], + "default" : null }, { "name" : "cause", "type" : [ "null", "WorkerError" ] + }, { + "name" : "name", + "type" : [ "null", "string" ] + }, { + "name" : "message", + "type" : [ "null", "string" ] } ] } }, { diff --git a/infinitic-common/src/main/resources/schemasdelegatedTaskData-0.16.0.avsc b/infinitic-common/src/main/resources/schemas/delegatedTaskData-0.16.1.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasdelegatedTaskData-0.16.0.avsc rename to infinitic-common/src/main/resources/schemas/delegatedTaskData-0.16.1.avsc diff --git a/infinitic-common/src/main/resources/schemasdelegatedTaskData-0.16.1.avsc b/infinitic-common/src/main/resources/schemas/delegatedTaskData-0.16.2.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasdelegatedTaskData-0.16.1.avsc rename to infinitic-common/src/main/resources/schemas/delegatedTaskData-0.16.2.avsc diff --git a/infinitic-common/src/main/resources/schemasdelegatedTaskData-0.16.2.avsc b/infinitic-common/src/main/resources/schemas/delegatedTaskData-0.16.3.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasdelegatedTaskData-0.16.2.avsc rename to infinitic-common/src/main/resources/schemas/delegatedTaskData-0.16.3.avsc diff --git a/infinitic-common/src/main/resources/schemasdelegatedTaskData-0.16.3.avsc b/infinitic-common/src/main/resources/schemas/delegatedTaskData-0.16.4.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasdelegatedTaskData-0.16.3.avsc rename to infinitic-common/src/main/resources/schemas/delegatedTaskData-0.16.4.avsc diff --git a/infinitic-common/src/main/resources/schemasdelegatedTaskData-0.16.4.avsc b/infinitic-common/src/main/resources/schemas/delegatedTaskData-0.17.0.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasdelegatedTaskData-0.16.4.avsc rename to infinitic-common/src/main/resources/schemas/delegatedTaskData-0.17.0.avsc diff --git a/infinitic-common/src/main/resources/schemasserviceEventEnvelope-0.16.0.avsc b/infinitic-common/src/main/resources/schemas/serviceEventEnvelope-0.16.1.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasserviceEventEnvelope-0.16.0.avsc rename to infinitic-common/src/main/resources/schemas/serviceEventEnvelope-0.16.1.avsc diff --git a/infinitic-common/src/main/resources/schemasserviceEventEnvelope-0.16.1.avsc b/infinitic-common/src/main/resources/schemas/serviceEventEnvelope-0.16.2.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasserviceEventEnvelope-0.16.1.avsc rename to infinitic-common/src/main/resources/schemas/serviceEventEnvelope-0.16.2.avsc diff --git a/infinitic-common/src/main/resources/schemasserviceEventEnvelope-0.16.2.avsc b/infinitic-common/src/main/resources/schemas/serviceEventEnvelope-0.16.3.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasserviceEventEnvelope-0.16.2.avsc rename to infinitic-common/src/main/resources/schemas/serviceEventEnvelope-0.16.3.avsc diff --git a/infinitic-common/src/main/resources/schemasserviceEventEnvelope-0.16.3.avsc b/infinitic-common/src/main/resources/schemas/serviceEventEnvelope-0.16.4.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasserviceEventEnvelope-0.16.3.avsc rename to infinitic-common/src/main/resources/schemas/serviceEventEnvelope-0.16.4.avsc diff --git a/infinitic-common/src/main/resources/schemasserviceEventEnvelope-0.16.4.avsc b/infinitic-common/src/main/resources/schemas/serviceEventEnvelope-0.17.0.avsc similarity index 84% rename from infinitic-common/src/main/resources/schemasserviceEventEnvelope-0.16.4.avsc rename to infinitic-common/src/main/resources/schemas/serviceEventEnvelope-0.17.0.avsc index 49561923f..633149e7d 100644 --- a/infinitic-common/src/main/resources/schemasserviceEventEnvelope-0.16.4.avsc +++ b/infinitic-common/src/main/resources/schemas/serviceEventEnvelope-0.17.0.avsc @@ -149,17 +149,73 @@ "name" : "workerName", "type" : "string" }, { - "name" : "name", - "type" : "string" + "name" : "retrySequence", + "type" : [ "int", "null" ], + "default" : 0 }, { - "name" : "message", - "type" : [ "null", "string" ] + "name" : "retryIndex", + "type" : [ "int", "null" ], + "default" : 0 + }, { + "name" : "secondsBeforeRetry", + "type" : [ "null", "double" ], + "default" : null }, { "name" : "stackTraceToString", - "type" : "string" + "type" : [ "null", "string" ] + }, { + "name" : "exception", + "type" : [ "null", { + "type" : "record", + "name" : "GenericException", + "fields" : [ { + "name" : "name", + "type" : "string" + }, { + "name" : "message", + "type" : [ "null", "string" ] + }, { + "name" : "serializedCustomProperties", + "type" : { + "type" : "map", + "values" : { + "type" : "record", + "name" : "SerializedData", + "namespace" : "io.infinitic.data", + "fields" : [ { + "name" : "bytes", + "type" : "bytes" + }, { + "name" : "type", + "type" : { + "type" : "enum", + "name" : "SerializedDataType", + "symbols" : [ "NULL", "AVRO_WITH_SCHEMA", "JSON", "JSON_JACKSON", "JSON_KOTLIN" ] + } + }, { + "name" : "meta", + "type" : { + "type" : "map", + "values" : "bytes" + } + } ] + } + } + }, { + "name" : "cause", + "type" : [ "null", "GenericException" ] + } ] + } ], + "default" : null }, { "name" : "cause", "type" : [ "null", "WorkerError" ] + }, { + "name" : "name", + "type" : [ "null", "string" ] + }, { + "name" : "message", + "type" : [ "null", "string" ] } ] } } ] @@ -416,28 +472,7 @@ "type" : "boolean" }, { "name" : "returnValue", - "type" : { - "type" : "record", - "name" : "SerializedData", - "namespace" : "io.infinitic.data", - "fields" : [ { - "name" : "bytes", - "type" : "bytes" - }, { - "name" : "type", - "type" : { - "type" : "enum", - "name" : "SerializedDataType", - "symbols" : [ "NULL", "AVRO_WITH_SCHEMA", "JSON", "JSON_JACKSON", "JSON_KOTLIN" ] - } - }, { - "name" : "meta", - "type" : { - "type" : "map", - "values" : "bytes" - } - } ] - } + "type" : "io.infinitic.data.SerializedData" } ] } ] } ] diff --git a/infinitic-common/src/main/resources/schemasserviceExecutorEnvelope-0.16.0.avsc b/infinitic-common/src/main/resources/schemas/serviceExecutorEnvelope-0.16.1.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasserviceExecutorEnvelope-0.16.0.avsc rename to infinitic-common/src/main/resources/schemas/serviceExecutorEnvelope-0.16.1.avsc diff --git a/infinitic-common/src/main/resources/schemasserviceExecutorEnvelope-0.16.1.avsc b/infinitic-common/src/main/resources/schemas/serviceExecutorEnvelope-0.16.2.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasserviceExecutorEnvelope-0.16.1.avsc rename to infinitic-common/src/main/resources/schemas/serviceExecutorEnvelope-0.16.2.avsc diff --git a/infinitic-common/src/main/resources/schemasserviceExecutorEnvelope-0.16.2.avsc b/infinitic-common/src/main/resources/schemas/serviceExecutorEnvelope-0.16.3.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasserviceExecutorEnvelope-0.16.2.avsc rename to infinitic-common/src/main/resources/schemas/serviceExecutorEnvelope-0.16.3.avsc diff --git a/infinitic-common/src/main/resources/schemasserviceExecutorEnvelope-0.16.3.avsc b/infinitic-common/src/main/resources/schemas/serviceExecutorEnvelope-0.16.4.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasserviceExecutorEnvelope-0.16.3.avsc rename to infinitic-common/src/main/resources/schemas/serviceExecutorEnvelope-0.16.4.avsc diff --git a/infinitic-common/src/main/resources/schemasserviceExecutorEnvelope-0.16.4.avsc b/infinitic-common/src/main/resources/schemas/serviceExecutorEnvelope-0.17.0.avsc similarity index 77% rename from infinitic-common/src/main/resources/schemasserviceExecutorEnvelope-0.16.4.avsc rename to infinitic-common/src/main/resources/schemas/serviceExecutorEnvelope-0.17.0.avsc index e2e3f8c98..e06ee07cb 100644 --- a/infinitic-common/src/main/resources/schemasserviceExecutorEnvelope-0.16.4.avsc +++ b/infinitic-common/src/main/resources/schemas/serviceExecutorEnvelope-0.17.0.avsc @@ -72,15 +72,6 @@ } ] } ], "default" : null - }, { - "name" : "workflowName", - "type" : [ "null", "string" ] - }, { - "name" : "workflowId", - "type" : [ "null", "string" ] - }, { - "name" : "methodRunId", - "type" : [ "null", "string" ] }, { "name" : "taskTags", "type" : { @@ -141,19 +132,63 @@ "name" : "workerName", "type" : "string" }, { - "name" : "name", - "type" : "string" + "name" : "retrySequence", + "type" : [ "int", "null" ], + "default" : 0 }, { - "name" : "message", - "type" : [ "null", "string" ] + "name" : "retryIndex", + "type" : [ "int", "null" ], + "default" : 0 + }, { + "name" : "secondsBeforeRetry", + "type" : [ "null", "double" ], + "default" : null }, { "name" : "stackTraceToString", - "type" : "string" + "type" : [ "null", "string" ] + }, { + "name" : "exception", + "type" : [ "null", { + "type" : "record", + "name" : "GenericException", + "fields" : [ { + "name" : "name", + "type" : "string" + }, { + "name" : "message", + "type" : [ "null", "string" ] + }, { + "name" : "serializedCustomProperties", + "type" : { + "type" : "map", + "values" : "io.infinitic.data.SerializedData" + } + }, { + "name" : "cause", + "type" : [ "null", "GenericException" ] + } ] + } ], + "default" : null }, { "name" : "cause", "type" : [ "null", "WorkerError" ] + }, { + "name" : "name", + "type" : [ "null", "string" ] + }, { + "name" : "message", + "type" : [ "null", "string" ] } ] } ] + }, { + "name" : "workflowName", + "type" : [ "null", "string" ] + }, { + "name" : "workflowId", + "type" : [ "null", "string" ] + }, { + "name" : "methodRunId", + "type" : [ "null", "string" ] }, { "name" : "workflowVersion", "type" : [ "null", "int" ], diff --git a/infinitic-common/src/main/resources/schemasserviceTagEnvelope-0.16.0.avsc b/infinitic-common/src/main/resources/schemas/serviceTagEnvelope-0.16.1.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasserviceTagEnvelope-0.16.0.avsc rename to infinitic-common/src/main/resources/schemas/serviceTagEnvelope-0.16.1.avsc diff --git a/infinitic-common/src/main/resources/schemasserviceTagEnvelope-0.16.1.avsc b/infinitic-common/src/main/resources/schemas/serviceTagEnvelope-0.16.2.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasserviceTagEnvelope-0.16.1.avsc rename to infinitic-common/src/main/resources/schemas/serviceTagEnvelope-0.16.2.avsc diff --git a/infinitic-common/src/main/resources/schemasserviceTagEnvelope-0.16.2.avsc b/infinitic-common/src/main/resources/schemas/serviceTagEnvelope-0.16.3.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasserviceTagEnvelope-0.16.2.avsc rename to infinitic-common/src/main/resources/schemas/serviceTagEnvelope-0.16.3.avsc diff --git a/infinitic-common/src/main/resources/schemasserviceTagEnvelope-0.16.3.avsc b/infinitic-common/src/main/resources/schemas/serviceTagEnvelope-0.16.4.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasserviceTagEnvelope-0.16.3.avsc rename to infinitic-common/src/main/resources/schemas/serviceTagEnvelope-0.16.4.avsc diff --git a/infinitic-common/src/main/resources/schemasserviceTagEnvelope-0.16.4.avsc b/infinitic-common/src/main/resources/schemas/serviceTagEnvelope-0.17.0.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasserviceTagEnvelope-0.16.4.avsc rename to infinitic-common/src/main/resources/schemas/serviceTagEnvelope-0.17.0.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowCmdEnvelope-0.16.0.avsc b/infinitic-common/src/main/resources/schemas/workflowCmdEnvelope-0.16.1.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowCmdEnvelope-0.16.0.avsc rename to infinitic-common/src/main/resources/schemas/workflowCmdEnvelope-0.16.1.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowCmdEnvelope-0.16.1.avsc b/infinitic-common/src/main/resources/schemas/workflowCmdEnvelope-0.16.2.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowCmdEnvelope-0.16.1.avsc rename to infinitic-common/src/main/resources/schemas/workflowCmdEnvelope-0.16.2.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowCmdEnvelope-0.16.2.avsc b/infinitic-common/src/main/resources/schemas/workflowCmdEnvelope-0.16.3.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowCmdEnvelope-0.16.2.avsc rename to infinitic-common/src/main/resources/schemas/workflowCmdEnvelope-0.16.3.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowCmdEnvelope-0.16.3.avsc b/infinitic-common/src/main/resources/schemas/workflowCmdEnvelope-0.16.4.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowCmdEnvelope-0.16.3.avsc rename to infinitic-common/src/main/resources/schemas/workflowCmdEnvelope-0.16.4.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowCmdEnvelope-0.16.4.avsc b/infinitic-common/src/main/resources/schemas/workflowCmdEnvelope-0.17.0.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowCmdEnvelope-0.16.4.avsc rename to infinitic-common/src/main/resources/schemas/workflowCmdEnvelope-0.17.0.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowEngineEnvelope-0.16.0.avsc b/infinitic-common/src/main/resources/schemas/workflowEngineEnvelope-0.16.1.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowEngineEnvelope-0.16.0.avsc rename to infinitic-common/src/main/resources/schemas/workflowEngineEnvelope-0.16.1.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowEngineEnvelope-0.16.1.avsc b/infinitic-common/src/main/resources/schemas/workflowEngineEnvelope-0.16.2.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowEngineEnvelope-0.16.1.avsc rename to infinitic-common/src/main/resources/schemas/workflowEngineEnvelope-0.16.2.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowEngineEnvelope-0.16.2.avsc b/infinitic-common/src/main/resources/schemas/workflowEngineEnvelope-0.16.3.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowEngineEnvelope-0.16.2.avsc rename to infinitic-common/src/main/resources/schemas/workflowEngineEnvelope-0.16.3.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowEngineEnvelope-0.16.3.avsc b/infinitic-common/src/main/resources/schemas/workflowEngineEnvelope-0.16.4.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowEngineEnvelope-0.16.3.avsc rename to infinitic-common/src/main/resources/schemas/workflowEngineEnvelope-0.16.4.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowEngineEnvelope-0.16.4.avsc b/infinitic-common/src/main/resources/schemas/workflowEngineEnvelope-0.17.0.avsc similarity index 95% rename from infinitic-common/src/main/resources/schemasworkflowEngineEnvelope-0.16.4.avsc rename to infinitic-common/src/main/resources/schemas/workflowEngineEnvelope-0.17.0.avsc index 39eef2219..7d3b1d51f 100644 --- a/infinitic-common/src/main/resources/schemasworkflowEngineEnvelope-0.16.4.avsc +++ b/infinitic-common/src/main/resources/schemas/workflowEngineEnvelope-0.17.0.avsc @@ -748,17 +748,52 @@ "name" : "workerName", "type" : "string" }, { - "name" : "name", - "type" : "string" + "name" : "retrySequence", + "type" : [ "int", "null" ], + "default" : 0 }, { - "name" : "message", - "type" : [ "null", "string" ] + "name" : "retryIndex", + "type" : [ "int", "null" ], + "default" : 0 + }, { + "name" : "secondsBeforeRetry", + "type" : [ "null", "double" ], + "default" : null }, { "name" : "stackTraceToString", - "type" : "string" + "type" : [ "null", "string" ] + }, { + "name" : "exception", + "type" : [ "null", { + "type" : "record", + "name" : "GenericException", + "fields" : [ { + "name" : "name", + "type" : "string" + }, { + "name" : "message", + "type" : [ "null", "string" ] + }, { + "name" : "serializedCustomProperties", + "type" : { + "type" : "map", + "values" : "io.infinitic.data.SerializedData" + } + }, { + "name" : "cause", + "type" : [ "null", "GenericException" ] + } ] + } ], + "default" : null }, { "name" : "cause", "type" : [ "null", "WorkerError" ] + }, { + "name" : "name", + "type" : [ "null", "string" ] + }, { + "name" : "message", + "type" : [ "null", "string" ] } ] } } ] diff --git a/infinitic-common/src/main/resources/schemasworkflowEventEnvelope-0.16.0.avsc b/infinitic-common/src/main/resources/schemas/workflowEventEnvelope-0.16.1.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowEventEnvelope-0.16.0.avsc rename to infinitic-common/src/main/resources/schemas/workflowEventEnvelope-0.16.1.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowEventEnvelope-0.16.1.avsc b/infinitic-common/src/main/resources/schemas/workflowEventEnvelope-0.16.2.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowEventEnvelope-0.16.1.avsc rename to infinitic-common/src/main/resources/schemas/workflowEventEnvelope-0.16.2.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowEventEnvelope-0.16.2.avsc b/infinitic-common/src/main/resources/schemas/workflowEventEnvelope-0.16.3.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowEventEnvelope-0.16.2.avsc rename to infinitic-common/src/main/resources/schemas/workflowEventEnvelope-0.16.3.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowEventEnvelope-0.16.3.avsc b/infinitic-common/src/main/resources/schemas/workflowEventEnvelope-0.16.4.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowEventEnvelope-0.16.3.avsc rename to infinitic-common/src/main/resources/schemas/workflowEventEnvelope-0.16.4.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowEventEnvelope-0.16.4.avsc b/infinitic-common/src/main/resources/schemas/workflowEventEnvelope-0.17.0.avsc similarity index 95% rename from infinitic-common/src/main/resources/schemasworkflowEventEnvelope-0.16.4.avsc rename to infinitic-common/src/main/resources/schemas/workflowEventEnvelope-0.17.0.avsc index 66841e7bf..901ad6dc6 100644 --- a/infinitic-common/src/main/resources/schemasworkflowEventEnvelope-0.16.4.avsc +++ b/infinitic-common/src/main/resources/schemas/workflowEventEnvelope-0.17.0.avsc @@ -299,17 +299,52 @@ "name" : "workerName", "type" : "string" }, { - "name" : "name", - "type" : "string" + "name" : "retrySequence", + "type" : [ "int", "null" ], + "default" : 0 }, { - "name" : "message", - "type" : [ "null", "string" ] + "name" : "retryIndex", + "type" : [ "int", "null" ], + "default" : 0 + }, { + "name" : "secondsBeforeRetry", + "type" : [ "null", "double" ], + "default" : null }, { "name" : "stackTraceToString", - "type" : "string" + "type" : [ "null", "string" ] + }, { + "name" : "exception", + "type" : [ "null", { + "type" : "record", + "name" : "GenericException", + "fields" : [ { + "name" : "name", + "type" : "string" + }, { + "name" : "message", + "type" : [ "null", "string" ] + }, { + "name" : "serializedCustomProperties", + "type" : { + "type" : "map", + "values" : "io.infinitic.data.SerializedData" + } + }, { + "name" : "cause", + "type" : [ "null", "GenericException" ] + } ] + } ], + "default" : null }, { "name" : "cause", "type" : [ "null", "WorkerError" ] + }, { + "name" : "name", + "type" : [ "null", "string" ] + }, { + "name" : "message", + "type" : [ "null", "string" ] } ] } } ] diff --git a/infinitic-common/src/main/resources/schemasworkflowState-0.16.0.avsc b/infinitic-common/src/main/resources/schemas/workflowState-0.16.1.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowState-0.16.0.avsc rename to infinitic-common/src/main/resources/schemas/workflowState-0.16.1.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowState-0.16.1.avsc b/infinitic-common/src/main/resources/schemas/workflowState-0.16.2.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowState-0.16.1.avsc rename to infinitic-common/src/main/resources/schemas/workflowState-0.16.2.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowState-0.16.2.avsc b/infinitic-common/src/main/resources/schemas/workflowState-0.16.3.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowState-0.16.2.avsc rename to infinitic-common/src/main/resources/schemas/workflowState-0.16.3.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowState-0.16.3.avsc b/infinitic-common/src/main/resources/schemas/workflowState-0.16.4.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowState-0.16.3.avsc rename to infinitic-common/src/main/resources/schemas/workflowState-0.16.4.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowState-0.16.4.avsc b/infinitic-common/src/main/resources/schemas/workflowState-0.17.0.avsc similarity index 96% rename from infinitic-common/src/main/resources/schemasworkflowState-0.16.4.avsc rename to infinitic-common/src/main/resources/schemas/workflowState-0.17.0.avsc index 163f2abcd..c2f87c3ed 100644 --- a/infinitic-common/src/main/resources/schemasworkflowState-0.16.4.avsc +++ b/infinitic-common/src/main/resources/schemas/workflowState-0.17.0.avsc @@ -272,17 +272,52 @@ "name" : "workerName", "type" : "string" }, { - "name" : "name", - "type" : "string" + "name" : "retrySequence", + "type" : [ "int", "null" ], + "default" : 0 }, { - "name" : "message", - "type" : [ "null", "string" ] + "name" : "retryIndex", + "type" : [ "int", "null" ], + "default" : 0 + }, { + "name" : "secondsBeforeRetry", + "type" : [ "null", "double" ], + "default" : null }, { "name" : "stackTraceToString", - "type" : "string" + "type" : [ "null", "string" ] + }, { + "name" : "exception", + "type" : [ "null", { + "type" : "record", + "name" : "GenericException", + "fields" : [ { + "name" : "name", + "type" : "string" + }, { + "name" : "message", + "type" : [ "null", "string" ] + }, { + "name" : "serializedCustomProperties", + "type" : { + "type" : "map", + "values" : "io.infinitic.data.SerializedData" + } + }, { + "name" : "cause", + "type" : [ "null", "GenericException" ] + } ] + } ], + "default" : null }, { "name" : "cause", "type" : [ "null", "WorkerError" ] + }, { + "name" : "name", + "type" : [ "null", "string" ] + }, { + "name" : "message", + "type" : [ "null", "string" ] } ] } } ] diff --git a/infinitic-common/src/main/resources/schemasworkflowTagEnvelope-0.16.0.avsc b/infinitic-common/src/main/resources/schemas/workflowTagEnvelope-0.16.1.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowTagEnvelope-0.16.0.avsc rename to infinitic-common/src/main/resources/schemas/workflowTagEnvelope-0.16.1.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowTagEnvelope-0.16.1.avsc b/infinitic-common/src/main/resources/schemas/workflowTagEnvelope-0.16.2.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowTagEnvelope-0.16.1.avsc rename to infinitic-common/src/main/resources/schemas/workflowTagEnvelope-0.16.2.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowTagEnvelope-0.16.2.avsc b/infinitic-common/src/main/resources/schemas/workflowTagEnvelope-0.16.3.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowTagEnvelope-0.16.2.avsc rename to infinitic-common/src/main/resources/schemas/workflowTagEnvelope-0.16.3.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowTagEnvelope-0.16.3.avsc b/infinitic-common/src/main/resources/schemas/workflowTagEnvelope-0.16.4.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowTagEnvelope-0.16.3.avsc rename to infinitic-common/src/main/resources/schemas/workflowTagEnvelope-0.16.4.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowTagEnvelope-0.16.4.avsc b/infinitic-common/src/main/resources/schemas/workflowTagEnvelope-0.17.0.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowTagEnvelope-0.16.4.avsc rename to infinitic-common/src/main/resources/schemas/workflowTagEnvelope-0.17.0.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowTaskParameters-0.16.0.avsc b/infinitic-common/src/main/resources/schemas/workflowTaskParameters-0.16.1.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowTaskParameters-0.16.0.avsc rename to infinitic-common/src/main/resources/schemas/workflowTaskParameters-0.16.1.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowTaskParameters-0.16.1.avsc b/infinitic-common/src/main/resources/schemas/workflowTaskParameters-0.16.2.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowTaskParameters-0.16.1.avsc rename to infinitic-common/src/main/resources/schemas/workflowTaskParameters-0.16.2.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowTaskParameters-0.16.2.avsc b/infinitic-common/src/main/resources/schemas/workflowTaskParameters-0.16.3.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowTaskParameters-0.16.2.avsc rename to infinitic-common/src/main/resources/schemas/workflowTaskParameters-0.16.3.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowTaskParameters-0.16.3.avsc b/infinitic-common/src/main/resources/schemas/workflowTaskParameters-0.16.4.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowTaskParameters-0.16.3.avsc rename to infinitic-common/src/main/resources/schemas/workflowTaskParameters-0.16.4.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowTaskParameters-0.16.4.avsc b/infinitic-common/src/main/resources/schemas/workflowTaskParameters-0.17.0.avsc similarity index 94% rename from infinitic-common/src/main/resources/schemasworkflowTaskParameters-0.16.4.avsc rename to infinitic-common/src/main/resources/schemas/workflowTaskParameters-0.17.0.avsc index 45de57148..fe41b0d65 100644 --- a/infinitic-common/src/main/resources/schemasworkflowTaskParameters-0.16.4.avsc +++ b/infinitic-common/src/main/resources/schemas/workflowTaskParameters-0.17.0.avsc @@ -266,17 +266,52 @@ "name" : "workerName", "type" : "string" }, { - "name" : "name", - "type" : "string" + "name" : "retrySequence", + "type" : [ "int", "null" ], + "default" : 0 }, { - "name" : "message", - "type" : [ "null", "string" ] + "name" : "retryIndex", + "type" : [ "int", "null" ], + "default" : 0 + }, { + "name" : "secondsBeforeRetry", + "type" : [ "null", "double" ], + "default" : null }, { "name" : "stackTraceToString", - "type" : "string" + "type" : [ "null", "string" ] + }, { + "name" : "exception", + "type" : [ "null", { + "type" : "record", + "name" : "GenericException", + "fields" : [ { + "name" : "name", + "type" : "string" + }, { + "name" : "message", + "type" : [ "null", "string" ] + }, { + "name" : "serializedCustomProperties", + "type" : { + "type" : "map", + "values" : "io.infinitic.data.SerializedData" + } + }, { + "name" : "cause", + "type" : [ "null", "GenericException" ] + } ] + } ], + "default" : null }, { "name" : "cause", "type" : [ "null", "WorkerError" ] + }, { + "name" : "name", + "type" : [ "null", "string" ] + }, { + "name" : "message", + "type" : [ "null", "string" ] } ] } } ] diff --git a/infinitic-common/src/main/resources/schemasworkflowTaskReturnValue-0.16.0.avsc b/infinitic-common/src/main/resources/schemas/workflowTaskReturnValue-0.16.1.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowTaskReturnValue-0.16.0.avsc rename to infinitic-common/src/main/resources/schemas/workflowTaskReturnValue-0.16.1.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowTaskReturnValue-0.16.1.avsc b/infinitic-common/src/main/resources/schemas/workflowTaskReturnValue-0.16.2.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowTaskReturnValue-0.16.1.avsc rename to infinitic-common/src/main/resources/schemas/workflowTaskReturnValue-0.16.2.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowTaskReturnValue-0.16.2.avsc b/infinitic-common/src/main/resources/schemas/workflowTaskReturnValue-0.16.3.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowTaskReturnValue-0.16.2.avsc rename to infinitic-common/src/main/resources/schemas/workflowTaskReturnValue-0.16.3.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowTaskReturnValue-0.16.3.avsc b/infinitic-common/src/main/resources/schemas/workflowTaskReturnValue-0.16.4.avsc similarity index 100% rename from infinitic-common/src/main/resources/schemasworkflowTaskReturnValue-0.16.3.avsc rename to infinitic-common/src/main/resources/schemas/workflowTaskReturnValue-0.16.4.avsc diff --git a/infinitic-common/src/main/resources/schemasworkflowTaskReturnValue-0.16.4.avsc b/infinitic-common/src/main/resources/schemas/workflowTaskReturnValue-0.17.0.avsc similarity index 93% rename from infinitic-common/src/main/resources/schemasworkflowTaskReturnValue-0.16.4.avsc rename to infinitic-common/src/main/resources/schemas/workflowTaskReturnValue-0.17.0.avsc index 9c82c3e71..70a5743ab 100644 --- a/infinitic-common/src/main/resources/schemasworkflowTaskReturnValue-0.16.4.avsc +++ b/infinitic-common/src/main/resources/schemas/workflowTaskReturnValue-0.17.0.avsc @@ -137,17 +137,52 @@ "name" : "workerName", "type" : "string" }, { - "name" : "name", - "type" : "string" + "name" : "retrySequence", + "type" : [ "int", "null" ], + "default" : 0 }, { - "name" : "message", - "type" : [ "null", "string" ] + "name" : "retryIndex", + "type" : [ "int", "null" ], + "default" : 0 + }, { + "name" : "secondsBeforeRetry", + "type" : [ "null", "double" ], + "default" : null }, { "name" : "stackTraceToString", - "type" : "string" + "type" : [ "null", "string" ] + }, { + "name" : "exception", + "type" : [ "null", { + "type" : "record", + "name" : "GenericException", + "fields" : [ { + "name" : "name", + "type" : "string" + }, { + "name" : "message", + "type" : [ "null", "string" ] + }, { + "name" : "serializedCustomProperties", + "type" : { + "type" : "map", + "values" : "io.infinitic.data.SerializedData" + } + }, { + "name" : "cause", + "type" : [ "null", "GenericException" ] + } ] + } ], + "default" : null }, { "name" : "cause", "type" : [ "null", "WorkerError" ] + }, { + "name" : "name", + "type" : [ "null", "string" ] + }, { + "name" : "message", + "type" : [ "null", "string" ] } ] } } ] diff --git a/infinitic-common/src/main/resources/versions b/infinitic-common/src/main/resources/versions index 3145af8b7..240f5c56c 100644 --- a/infinitic-common/src/main/resources/versions +++ b/infinitic-common/src/main/resources/versions @@ -37,3 +37,4 @@ 0.16.2 0.16.3 0.16.4 +0.17.0 diff --git a/infinitic-common/src/test/kotlin/io/infinitic/common/clients/messages/ClientEnvelopeTests.kt b/infinitic-common/src/test/kotlin/io/infinitic/common/clients/messages/ClientEnvelopeTests.kt index 4563ebd53..56b64d66b 100644 --- a/infinitic-common/src/test/kotlin/io/infinitic/common/clients/messages/ClientEnvelopeTests.kt +++ b/infinitic-common/src/test/kotlin/io/infinitic/common/clients/messages/ClientEnvelopeTests.kt @@ -22,6 +22,7 @@ */ package io.infinitic.common.clients.messages +import io.infinitic.common.exceptions.ShouldNotHappenException import io.infinitic.common.fixtures.TestFactory import io.infinitic.common.fixtures.checkBackwardCompatibility import io.infinitic.common.fixtures.checkOrCreateCurrentFile @@ -31,6 +32,7 @@ import io.kotest.assertions.throwables.shouldThrowAny import io.kotest.core.spec.style.StringSpec import io.kotest.matchers.collections.shouldBeOneOf import io.kotest.matchers.shouldBe +import kotlinx.serialization.SerializationException class ClientEnvelopeTests : StringSpec( @@ -60,7 +62,12 @@ class ClientEnvelopeTests : val bytes = AvroSerDe.getRandomBinary(schema) // IllegalArgumentException is thrown because we have more than 1 message in the envelope val e = shouldThrowAny { ClientEnvelope.fromByteArray(bytes, schema) } + if (e is SerializationException) { + println(e.stackTraceToString()) + } e::class shouldBeOneOf listOf( + // ShouldNotHappenException can be thrown when deserializing ExceptionDetails + ShouldNotHappenException::class, // IllegalArgumentException is thrown because we have more than 1 message in the envelope IllegalArgumentException::class, // NullPointerException is thrown because message() can be null diff --git a/infinitic-common/src/test/kotlin/io/infinitic/common/tasks/events/ServiceEventEnvelopeTests.kt b/infinitic-common/src/test/kotlin/io/infinitic/common/tasks/events/ServiceEventEnvelopeTests.kt index ac391c58f..dab41b4cd 100644 --- a/infinitic-common/src/test/kotlin/io/infinitic/common/tasks/events/ServiceEventEnvelopeTests.kt +++ b/infinitic-common/src/test/kotlin/io/infinitic/common/tasks/events/ServiceEventEnvelopeTests.kt @@ -22,6 +22,7 @@ */ package io.infinitic.common.tasks.events +import io.infinitic.common.exceptions.ShouldNotHappenException import io.infinitic.common.fixtures.TestFactory import io.infinitic.common.fixtures.checkBackwardCompatibility import io.infinitic.common.fixtures.checkOrCreateCurrentFile @@ -71,7 +72,11 @@ class ServiceEventEnvelopeTests : "We should be able to read binary from previous version $version" { val bytes = AvroSerDe.getRandomBinary(schema) val e = shouldThrowAny { ServiceEventEnvelope.fromByteArray(bytes, schema) } + + println(e) e::class shouldBeOneOf listOf( + // ShouldNotHappenException can be thrown when deserializing ExceptionDetails + ShouldNotHappenException::class, // IllegalArgumentException is thrown because we have more than 1 message in the envelope IllegalArgumentException::class, // NullPointerException is thrown because message() can be null diff --git a/infinitic-common/src/test/kotlin/io/infinitic/common/tasks/executors/messages/ServiceExecutorEnvelopeTests.kt b/infinitic-common/src/test/kotlin/io/infinitic/common/tasks/executors/messages/ServiceExecutorEnvelopeTests.kt index f06174ee9..d17f2db2a 100644 --- a/infinitic-common/src/test/kotlin/io/infinitic/common/tasks/executors/messages/ServiceExecutorEnvelopeTests.kt +++ b/infinitic-common/src/test/kotlin/io/infinitic/common/tasks/executors/messages/ServiceExecutorEnvelopeTests.kt @@ -22,6 +22,7 @@ */ package io.infinitic.common.tasks.executors.messages +import io.infinitic.common.exceptions.ShouldNotHappenException import io.infinitic.common.fixtures.TestFactory import io.infinitic.common.fixtures.checkBackwardCompatibility import io.infinitic.common.fixtures.checkOrCreateCurrentFile @@ -95,7 +96,13 @@ class ServiceExecutorEnvelopeTests : "We should be able to read binary from previous version $version" { val bytes = AvroSerDe.getRandomBinary(schema) val e = shouldThrowAny { ServiceExecutorEnvelope.fromByteArray(bytes, schema) } + if (e is NullPointerException) { + // NullPointerException is thrown because message() can be null + println(e.stackTraceToString()) + } e::class shouldBeOneOf listOf( + // ShouldNotHappenException can be thrown when deserializing ExceptionDetails + ShouldNotHappenException::class, // IllegalArgumentException is thrown because we have more than 1 message in the envelope IllegalArgumentException::class, // NullPointerException is thrown because message() can be null diff --git a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/BatchByTests.kt b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/BatchByTests.kt index a0a8db3aa..0d82db87d 100644 --- a/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/BatchByTests.kt +++ b/infinitic-common/src/test/kotlin/io/infinitic/common/transport/consumers/BatchByTests.kt @@ -102,10 +102,9 @@ internal class BatchByTests : StringSpec( } }.join() - scope.isActive shouldBe true - // after scope cancelling scope.cancel() + // consumer channel should be closed shouldThrow { while (true) { diff --git a/infinitic-common/src/test/kotlin/io/infinitic/common/workflows/data/workflowTasks/WorkflowTaskParametersTests.kt b/infinitic-common/src/test/kotlin/io/infinitic/common/workflows/data/workflowTasks/WorkflowTaskParametersTests.kt index 3f20c162a..f7d32d09d 100644 --- a/infinitic-common/src/test/kotlin/io/infinitic/common/workflows/data/workflowTasks/WorkflowTaskParametersTests.kt +++ b/infinitic-common/src/test/kotlin/io/infinitic/common/workflows/data/workflowTasks/WorkflowTaskParametersTests.kt @@ -22,6 +22,7 @@ */ package io.infinitic.common.workflows.data.workflowTasks +import io.infinitic.common.exceptions.ShouldNotHappenException import io.infinitic.common.fixtures.TestFactory import io.infinitic.common.fixtures.checkBackwardCompatibility import io.infinitic.common.fixtures.checkOrCreateCurrentFile @@ -69,7 +70,11 @@ class WorkflowTaskParametersTests : "We should be able to read binary from version $version" { val bytes = AvroSerDe.getRandomBinaryWithSchemaFingerprint(schema) - shouldNotThrowAny { WorkflowTaskParameters.fromByteArray(bytes) } + try { + WorkflowTaskParameters.fromByteArray(bytes) + } catch (e: ShouldNotHappenException) { + // ShouldNotHappenException can be thrown when deserializing ExceptionDetails + } } } }, diff --git a/infinitic-common/src/test/kotlin/io/infinitic/common/workflows/data/workflowTasks/WorkflowTaskReturnValueTests.kt b/infinitic-common/src/test/kotlin/io/infinitic/common/workflows/data/workflowTasks/WorkflowTaskReturnValueTests.kt index 9c6a986b5..be095c8ee 100644 --- a/infinitic-common/src/test/kotlin/io/infinitic/common/workflows/data/workflowTasks/WorkflowTaskReturnValueTests.kt +++ b/infinitic-common/src/test/kotlin/io/infinitic/common/workflows/data/workflowTasks/WorkflowTaskReturnValueTests.kt @@ -22,6 +22,7 @@ */ package io.infinitic.common.workflows.data.workflowTasks +import io.infinitic.common.exceptions.ShouldNotHappenException import io.infinitic.common.fixtures.TestFactory import io.infinitic.common.fixtures.checkBackwardCompatibility import io.infinitic.common.fixtures.checkOrCreateCurrentFile @@ -70,7 +71,11 @@ class WorkflowTaskReturnValueTests : "We should be able to read binary from version $version" { val bytes = AvroSerDe.getRandomBinaryWithSchemaFingerprint(schema) - shouldNotThrowAny { WorkflowTaskReturnValue.fromByteArray(bytes) } + try { + WorkflowTaskReturnValue.fromByteArray(bytes) + } catch (e: ShouldNotHappenException) { + // do nothing + } } } }, diff --git a/infinitic-common/src/test/kotlin/io/infinitic/common/workflows/engine/messages/WorkflowEngineEnvelopeTests.kt b/infinitic-common/src/test/kotlin/io/infinitic/common/workflows/engine/messages/WorkflowEngineEnvelopeTests.kt index 68b10b3c8..b9866cfd5 100644 --- a/infinitic-common/src/test/kotlin/io/infinitic/common/workflows/engine/messages/WorkflowEngineEnvelopeTests.kt +++ b/infinitic-common/src/test/kotlin/io/infinitic/common/workflows/engine/messages/WorkflowEngineEnvelopeTests.kt @@ -22,6 +22,7 @@ */ package io.infinitic.common.workflows.engine.messages +import io.infinitic.common.exceptions.ShouldNotHappenException import io.infinitic.common.fixtures.TestFactory import io.infinitic.common.fixtures.checkBackwardCompatibility import io.infinitic.common.fixtures.checkOrCreateCurrentFile @@ -78,6 +79,8 @@ class WorkflowEngineEnvelopeTests : val bytes = AvroSerDe.getRandomBinary(schema) val e = shouldThrowAny { WorkflowEngineEnvelope.fromByteArray(bytes, schema) } e::class shouldBeOneOf listOf( + // ShouldNotHappenException can be thrown when deserializing ExceptionDetails + ShouldNotHappenException::class, // IllegalArgumentException is thrown because we have more than 1 message in the envelope IllegalArgumentException::class, // NullPointerException is thrown because message() can be null diff --git a/infinitic-common/src/test/kotlin/io/infinitic/common/workflows/engine/messages/WorkflowEventEnvelopeTests.kt b/infinitic-common/src/test/kotlin/io/infinitic/common/workflows/engine/messages/WorkflowEventEnvelopeTests.kt index 9af3a6365..71edaf467 100644 --- a/infinitic-common/src/test/kotlin/io/infinitic/common/workflows/engine/messages/WorkflowEventEnvelopeTests.kt +++ b/infinitic-common/src/test/kotlin/io/infinitic/common/workflows/engine/messages/WorkflowEventEnvelopeTests.kt @@ -22,6 +22,7 @@ */ package io.infinitic.common.workflows.engine.messages +import io.infinitic.common.exceptions.ShouldNotHappenException import io.infinitic.common.fixtures.TestFactory import io.infinitic.common.fixtures.checkBackwardCompatibility import io.infinitic.common.fixtures.checkOrCreateCurrentFile @@ -69,6 +70,8 @@ class WorkflowEventEnvelopeTests : val bytes = AvroSerDe.getRandomBinary(schema) val e = shouldThrowAny { WorkflowEventEnvelope.fromByteArray(bytes, schema) } e::class shouldBeOneOf listOf( + // ShouldNotHappenException can be thrown when deserializing ExceptionDetails + ShouldNotHappenException::class, // IllegalArgumentException is thrown because we have more than 1 message in the envelope IllegalArgumentException::class, // NullPointerException is thrown because message() can be null diff --git a/infinitic-common/src/test/kotlin/io/infinitic/common/workflows/engine/state/WorkflowStateTests.kt b/infinitic-common/src/test/kotlin/io/infinitic/common/workflows/engine/state/WorkflowStateTests.kt index 5a9389a6b..62a311216 100644 --- a/infinitic-common/src/test/kotlin/io/infinitic/common/workflows/engine/state/WorkflowStateTests.kt +++ b/infinitic-common/src/test/kotlin/io/infinitic/common/workflows/engine/state/WorkflowStateTests.kt @@ -22,6 +22,7 @@ */ package io.infinitic.common.workflows.engine.state +import io.infinitic.common.exceptions.ShouldNotHappenException import io.infinitic.common.fixtures.TestFactory import io.infinitic.common.fixtures.checkBackwardCompatibility import io.infinitic.common.fixtures.checkOrCreateCurrentFile @@ -54,7 +55,11 @@ class WorkflowStateTests : getAllSchemas(WorkflowState::class).forEach { (_, schema) -> val bytes = getRandomBinaryWithSchemaFingerprint(schema) - shouldNotThrowAny { WorkflowState.fromByteArray(bytes) } + try { + WorkflowState.fromByteArray(bytes) + } catch (e: ShouldNotHappenException) { + // we ignore this exception + } } } }, diff --git a/infinitic-common/src/testFixtures/kotlin/io/infinitic/common/fixtures/TestFactory.kt b/infinitic-common/src/testFixtures/kotlin/io/infinitic/common/fixtures/TestFactory.kt index b0961d3f1..6ef4c6af4 100644 --- a/infinitic-common/src/testFixtures/kotlin/io/infinitic/common/fixtures/TestFactory.kt +++ b/infinitic-common/src/testFixtures/kotlin/io/infinitic/common/fixtures/TestFactory.kt @@ -26,18 +26,22 @@ import io.infinitic.common.data.MessageId import io.infinitic.common.data.Version import io.infinitic.common.data.methods.MethodArgs import io.infinitic.common.serDe.SerializedData +import io.infinitic.common.tasks.data.ServiceName import io.infinitic.common.tasks.events.messages.ServiceEventEnvelope import io.infinitic.common.tasks.events.messages.ServiceExecutorEventMessage import io.infinitic.common.tasks.executors.errors.DeferredError -import io.infinitic.common.tasks.executors.errors.ExecutionError import io.infinitic.common.workflows.data.commands.CommandId import io.infinitic.common.workflows.data.steps.NewStep import io.infinitic.common.workflows.data.steps.Step import io.infinitic.common.workflows.data.steps.StepStatus +import io.infinitic.common.workflows.data.workflows.WorkflowName import io.infinitic.common.workflows.engine.messages.WorkflowEngineEnvelope import io.infinitic.common.workflows.engine.messages.WorkflowEventEnvelope import io.infinitic.common.workflows.engine.messages.WorkflowStateEngineMessage import io.infinitic.common.workflows.engine.messages.WorkflowStateEventMessage +import io.infinitic.exceptions.GenericException +import io.infinitic.tasks.TaskFailure +import org.apache.commons.lang3.RandomStringUtils import org.jeasy.random.EasyRandom import org.jeasy.random.EasyRandomParameters import org.jeasy.random.FieldPredicates @@ -71,13 +75,26 @@ object TestFactory { .randomize(StepStatus.Completed::class.java) { StepStatus.Completed(random(), random()) } .randomize(ByteArray::class.java) { Random(seed).nextBytes(10) } .randomize(ByteBuffer::class.java) { ByteBuffer.wrap(random()) } - .randomize(ExecutionError::class.java) { - ExecutionError(random(), random(), random(), random(), null) - } .randomize(Version::class.java) { Version(random()) } + .randomize(ServiceName::class.java) { ServiceName(RandomStringUtils.random(10)) } + .randomize(WorkflowName::class.java) { WorkflowName(RandomStringUtils.random(10)) } .randomize(SerializedData::class.java) { SerializedData.encode(random(), String::class.java, null) } + .randomize(TaskFailure::class.java) { + TaskFailure( + workerName = random(), + retrySequence = random(), + retryIndex = random(), + secondsBeforeRetry = random(), + stackTraceString = random(), + exception = random(), + previousFailure = null, + ) + } + .randomize(GenericException::class.java) { + GenericException(random(), random(), emptyMap(), null) + } .randomize(MethodArgs::class.java) { methodParametersFrom(random(), random()) } diff --git a/infinitic-common/src/testFixtures/kotlin/io/infinitic/common/fixtures/schemasHelpers.kt b/infinitic-common/src/testFixtures/kotlin/io/infinitic/common/fixtures/schemasHelpers.kt index 46e7f06d0..52d27582a 100644 --- a/infinitic-common/src/testFixtures/kotlin/io/infinitic/common/fixtures/schemasHelpers.kt +++ b/infinitic-common/src/testFixtures/kotlin/io/infinitic/common/fixtures/schemasHelpers.kt @@ -39,9 +39,9 @@ import kotlin.reflect.KClass fun checkOrCreateCurrentFile(klass: KClass, serializer: KSerializer) { // Non-release version are not saved to the sources if (isRelease) { - val schemasFolderPath = File(System.getProperty("resourcePath"), "/$SCHEMAS_FOLDER/") + val schemasFolderPath = File(System.getProperty("resourcePath"), "/$SCHEMAS_FOLDER") val schemaFilePath = File( - "$schemasFolderPath${getSchemaFilePrefix(klass)}-$currentVersion.avsc", + "$schemasFolderPath/${getSchemaFilePrefix(klass)}-$currentVersion.avsc", ) val schema = AvroSerDe.currentSchema(serializer).toString(true) diff --git a/infinitic-task-executor/src/main/kotlin/io/infinitic/tasks/executor/TaskExecutor.kt b/infinitic-task-executor/src/main/kotlin/io/infinitic/tasks/executor/TaskExecutor.kt index 6f12010ed..d4abe5158 100644 --- a/infinitic-task-executor/src/main/kotlin/io/infinitic/tasks/executor/TaskExecutor.kt +++ b/infinitic-task-executor/src/main/kotlin/io/infinitic/tasks/executor/TaskExecutor.kt @@ -79,17 +79,45 @@ import java.lang.reflect.Type import java.util.concurrent.TimeoutException import kotlin.reflect.jvm.javaMethod + +/** + * Handles the execution of workflow and service tasks. The `TaskExecutor` coordinates + * task execution by processing messages related to workflow or service tasks, adhering + * to specific timeout and retry policies defined for each task type. + * + * This executor leverages a registry, a producer, and a client to: + * - Retrieve configuration settings and task-specific executors + * - Send task-related messages such as "task started", "task failed", or "task completed" + * - Execute tasks directly or in batches, depending on task type + * + * The class supports detailed handling of timeouts, retries, and exceptions during task execution. + */ class TaskExecutor( private val registry: ExecutorRegistryInterface, private val producer: InfiniticProducer, private val client: InfiniticClientInterface ) { + + /** + * Processes a given message of type `ServiceExecutorMessage`. + * If the message is an instance of `ExecuteTask`, it delegates the processing to the `process` method of `ExecuteTask`. + */ suspend fun process(msg: ServiceExecutorMessage) { when (msg) { is ExecuteTask -> msg.process() } } + + /** + * Processes a batch of service execution messages. Depending on the type of task represented + * by each message (workflow task or service task), the messages are processed either in parallel + * or using a batch processing method. If a batch method is not available for service tasks, + * the tasks are processed individually. + * + * @param messages a list of `ServiceExecutorMessage` objects to be processed. Each message + * represents either a workflow task or a service task associated with a specific service and method. + */ suspend fun batchProcess(messages: List) { val executeTasks = messages.map { when (it) { @@ -122,6 +150,11 @@ class TaskExecutor( } } + + /** + * Retrieves the batch key associated with the given service executor message. + * If the message is an instance of `ExecuteTask`, it delegates the retrieval to the `getBatchKey` method of `ExecuteTask`. + */ fun getBatchKey(msg: ServiceExecutorMessage): String? = when (msg) { is ExecuteTask -> msg.getBatchKey() } @@ -164,6 +197,7 @@ class TaskExecutor( private suspend fun ExecuteTask.process() { logDebug { "Start processing $this" } + // Signal that the task has started sendTaskStarted() @@ -482,7 +516,7 @@ class TaskExecutor( ) { logWarn(cause) { "Retrying in $delay" } - val executeTask = ExecuteTask.retryFrom(this, emitterName, cause, meta) + val executeTask = ExecuteTask.retryFrom(this, emitterName, delay, cause, meta) with(producer) { executeTask.sendTo(ServiceExecutorRetryTopic, delay) } // once sent, we publish the event @@ -568,7 +602,7 @@ class TaskExecutor( workflowVersion = requester.workflowVersion, retrySequence = taskRetrySequence, retryIndex = taskRetryIndex, - lastError = lastError, + lastError = lastFailure, tags = taskTags.map { it.tag }.toSet(), meta = taskMeta.map.toMutableMap(), withRetry = withRetry, diff --git a/infinitic-task-executor/src/main/kotlin/io/infinitic/tasks/executor/task/TaskContextImpl.kt b/infinitic-task-executor/src/main/kotlin/io/infinitic/tasks/executor/task/TaskContextImpl.kt index 513629c92..ec61f20ef 100644 --- a/infinitic-task-executor/src/main/kotlin/io/infinitic/tasks/executor/task/TaskContextImpl.kt +++ b/infinitic-task-executor/src/main/kotlin/io/infinitic/tasks/executor/task/TaskContextImpl.kt @@ -29,11 +29,11 @@ import io.infinitic.common.tasks.data.TaskId import io.infinitic.common.tasks.data.TaskMeta import io.infinitic.common.tasks.data.TaskRetryIndex import io.infinitic.common.tasks.data.TaskRetrySequence -import io.infinitic.common.tasks.executors.errors.ExecutionError import io.infinitic.common.workers.config.WorkflowVersion import io.infinitic.common.workflows.data.workflows.WorkflowId import io.infinitic.common.workflows.data.workflows.WorkflowName import io.infinitic.tasks.TaskContext +import io.infinitic.tasks.TaskFailure import io.infinitic.tasks.WithRetry import io.infinitic.tasks.WithTimeout @@ -47,7 +47,7 @@ data class TaskContextImpl( override val workflowVersion: WorkflowVersion?, override val retrySequence: TaskRetrySequence, override val retryIndex: TaskRetryIndex, - override val lastError: ExecutionError?, + override val lastError: TaskFailure?, override val tags: Set, override val meta: MutableMap, override val withTimeout: WithTimeout?, diff --git a/infinitic-task-executor/src/test/kotlin/io/infinitic/tasks/executor/TaskEventHandlerTests.kt b/infinitic-task-executor/src/test/kotlin/io/infinitic/tasks/executor/TaskEventHandlerTests.kt index b19de77b8..5e7f7b53b 100644 --- a/infinitic-task-executor/src/test/kotlin/io/infinitic/tasks/executor/TaskEventHandlerTests.kt +++ b/infinitic-task-executor/src/test/kotlin/io/infinitic/tasks/executor/TaskEventHandlerTests.kt @@ -285,7 +285,7 @@ private fun getTaskRetried(requester: Requester) = TaskRetriedEvent( taskRetryIndex = TaskRetryIndex(7), requester = requester, clientWaiting = null, - lastError = TestFactory.random(), + failure = TestFactory.random(), taskMeta = TestFactory.random(), taskTags = TestFactory.random(), taskRetryDelay = TestFactory.random(), @@ -357,7 +357,7 @@ private fun getTaskFailed(requester: Requester) = TaskFailedEvent( clientWaiting = null, taskTags = TestFactory.random(), taskMeta = TestFactory.random(), - executionError = TestFactory.random(), + failure = TestFactory.random(), deferredError = TestFactory.random(), ) @@ -365,7 +365,7 @@ private fun getTaskFailedClient(msg: TaskFailedEvent) = io.infinitic.common.clients.messages.TaskFailed( recipientName = msg.requester.clientName!!, taskId = msg.taskId, - cause = msg.executionError, + cause = msg.failure, emitterName = testEmitterName, ) @@ -380,7 +380,7 @@ private fun getTaskFailedWorkflow(msg: TaskFailedEvent) = serviceName = msg.serviceName, methodName = msg.methodName, taskId = msg.taskId, - cause = msg.executionError, + lastFailure = msg.failure, ), deferredError = msg.deferredError, emitterName = testEmitterName, diff --git a/infinitic-task-executor/src/test/kotlin/io/infinitic/tasks/executor/TaskExecutorTests.kt b/infinitic-task-executor/src/test/kotlin/io/infinitic/tasks/executor/TaskExecutorTests.kt index a78c7a2e9..3da0d1033 100644 --- a/infinitic-task-executor/src/test/kotlin/io/infinitic/tasks/executor/TaskExecutorTests.kt +++ b/infinitic-task-executor/src/test/kotlin/io/infinitic/tasks/executor/TaskExecutorTests.kt @@ -505,8 +505,8 @@ fun ExecuteTask.check( methodName shouldBe msg.methodName taskMeta shouldBe TaskMeta(meta) taskId shouldBe msg.taskId - lastError!!.name shouldBe errorName - lastError!!.workerName shouldBe WorkerName.from(msg.emitterName) + lastFailure!!.exception!!.name shouldBe errorName + lastFailure!!.workerName shouldBe msg.emitterName.toString() } fun TaskFailedEvent.check( @@ -521,8 +521,8 @@ fun TaskFailedEvent.check( taskRetryIndex shouldBe msg.taskRetryIndex requester shouldBe msg.requester clientWaiting shouldBe msg.clientWaiting - executionError.name shouldBe errorName - executionError.workerName shouldBe WorkerName.from(msg.emitterName) + failure.exception!!.name shouldBe errorName + failure.workerName shouldBe msg.emitterName.toString() methodName shouldBe msg.methodName taskMeta shouldBe TaskMeta(meta) taskId shouldBe msg.taskId @@ -538,13 +538,13 @@ fun TaskRetriedEvent.check( taskId shouldBe msg.taskId emitterName shouldBe testEmitterName taskRetrySequence shouldBe msg.taskRetrySequence - taskRetryIndex shouldBe msg.taskRetryIndex + 1 + taskRetryIndex shouldBe msg.taskRetryIndex requester shouldBe msg.requester taskTags shouldBe msg.taskTags taskMeta shouldBe TaskMeta(meta) taskRetryDelay shouldBe delay - lastError.name shouldBe errorName - lastError.workerName shouldBe WorkerName.from(msg.emitterName) + failure.exception!!.name shouldBe errorName + failure.workerName shouldBe msg.emitterName.toString() } internal fun getExecuteTask(method: String, input: Array, types: List?) = @@ -561,7 +561,7 @@ internal fun getExecuteTask(method: String, input: Array, types: List< methodName = MethodName(method), methodParameterTypes = types?.let { MethodParameterTypes(it) }, methodArgs = methodParametersFrom(*input), - lastError = null, + lastFailure = null, ) private fun getTaskStarted(msg: ExecuteTask, messageId: MessageId) = TaskStartedEvent( diff --git a/infinitic-tests/src/test/kotlin/io/infinitic/tests/errors/ErrorsWorkflow.kt b/infinitic-tests/src/test/kotlin/io/infinitic/tests/errors/ErrorsWorkflow.kt index f72a3eaa4..29494aa82 100644 --- a/infinitic-tests/src/test/kotlin/io/infinitic/tests/errors/ErrorsWorkflow.kt +++ b/infinitic-tests/src/test/kotlin/io/infinitic/tests/errors/ErrorsWorkflow.kt @@ -41,6 +41,10 @@ interface ErrorsWorkflow { fun failingWithException() + fun failingWithCustomException() + + fun failingWithNestedCustomException() + fun failingWithThrowable() fun failing2a(): Long @@ -118,6 +122,10 @@ class ErrorsWorkflowImpl : Workflow(), ErrorsWorkflow { override fun failingWithException() = utilService.failingWithException() + override fun failingWithCustomException() = utilService.failingWithCustomException() + + override fun failingWithNestedCustomException() = utilService.failingWithNestedCustomException() + override fun failingWithThrowable() = utilService.failingWithThrowable() override fun failing2a(): Long { @@ -187,7 +195,7 @@ class ErrorsWorkflowImpl : Workflow(), ErrorsWorkflow { } catch (e: WorkflowFailedException) { val deferredException = e.deferredException as TaskFailedException utilService.await(100) - deferredException.workerException.name + deferredException.lastFailure.exception!!.name } override fun failing8() = utilService.successAtRetry() diff --git a/infinitic-tests/src/test/kotlin/io/infinitic/tests/errors/ErrorsWorkflowTests.kt b/infinitic-tests/src/test/kotlin/io/infinitic/tests/errors/ErrorsWorkflowTests.kt index 29d4392cc..a125695f9 100644 --- a/infinitic-tests/src/test/kotlin/io/infinitic/tests/errors/ErrorsWorkflowTests.kt +++ b/infinitic-tests/src/test/kotlin/io/infinitic/tests/errors/ErrorsWorkflowTests.kt @@ -26,10 +26,11 @@ import io.infinitic.Test import io.infinitic.common.fixtures.later import io.infinitic.exceptions.TaskFailedException import io.infinitic.exceptions.WorkflowCanceledException +import io.infinitic.exceptions.WorkflowExecutorException import io.infinitic.exceptions.WorkflowFailedException -import io.infinitic.exceptions.WorkflowTaskFailedException import io.infinitic.exceptions.WorkflowUnknownException import io.infinitic.tests.channels.ChannelsWorkflow +import io.infinitic.utils.CustomException import io.infinitic.utils.UtilService import io.infinitic.utils.UtilWorkflow import io.infinitic.workflows.DeferredStatus @@ -64,7 +65,7 @@ internal class ErrorsWorkflowTests : val e = shouldThrow { errorsWorkflow.failing0() } - e.deferredException shouldBe instanceOf() + e.deferredException shouldBe instanceOf() } "try/catch a failing task" { @@ -76,7 +77,38 @@ internal class ErrorsWorkflowTests : val taskException = error.deferredException as TaskFailedException taskException.serviceName shouldBe UtilService::class.java.name - taskException.workerException.name shouldBe Exception::class.java.name + taskException.lastFailure.exception!!.name shouldBe Exception::class.java.name + } + + "I can retrieve custom properties from Custom Exception" { + val error = shouldThrow { + errorsWorkflow.failingWithCustomException() + } + + val taskFailed = error.deferredException as TaskFailedException + taskFailed.serviceName shouldBe UtilService::class.java.name + + val detail = taskFailed.lastFailure.exception!! + detail.name shouldBe CustomException::class.java.name + detail.getCustomProperty("customString") shouldBe CustomException().customString + detail.getCustomProperty("customList") shouldBe CustomException().customList + + // retrieve also the previous failure + taskFailed.lastFailure.previousFailure!!.exception!!.name shouldBe CustomException::class.java.name + } + + "I can retrieve custom properties from nested Custom Exception" { + val error = shouldThrow { + errorsWorkflow.failingWithNestedCustomException() + } + + val taskFailed = error.deferredException as TaskFailedException + taskFailed.serviceName shouldBe UtilService::class.java.name + + val detail = taskFailed.lastFailure.exception!!.cause!! + detail.name shouldBe CustomException::class.java.name + detail.getCustomProperty("customString") shouldBe CustomException().customString + detail.getCustomProperty("customList") shouldBe CustomException().customList } // This test checks that a throwable triggering a message sent to DLQ is correctly handle by the engine diff --git a/infinitic-tests/src/test/kotlin/io/infinitic/tests/inline/InlineWorkflowTests.kt b/infinitic-tests/src/test/kotlin/io/infinitic/tests/inline/InlineWorkflowTests.kt index 03dfd42c2..6263da214 100644 --- a/infinitic-tests/src/test/kotlin/io/infinitic/tests/inline/InlineWorkflowTests.kt +++ b/infinitic-tests/src/test/kotlin/io/infinitic/tests/inline/InlineWorkflowTests.kt @@ -23,8 +23,8 @@ package io.infinitic.tests.inline import io.infinitic.Test +import io.infinitic.exceptions.WorkflowExecutorException import io.infinitic.exceptions.WorkflowFailedException -import io.infinitic.exceptions.WorkflowTaskFailedException import io.infinitic.exceptions.workflows.InvalidInlineException import io.kotest.assertions.throwables.shouldThrow import io.kotest.core.spec.style.StringSpec @@ -47,15 +47,15 @@ internal class InlineWorkflowTests : "Inline task with asynchronous task inside" { val error = shouldThrow { inlineWorkflow.inline2(21) } - val deferredException = error.deferredException as WorkflowTaskFailedException - deferredException.workerException.name shouldBe InvalidInlineException::class.java.name + val workflowTaskFailed = error.deferredException as WorkflowExecutorException + workflowTaskFailed.lastFailure.exception!!.name shouldBe InvalidInlineException::class.java.name } "Inline task with synchronous task inside" { val error = shouldThrow { inlineWorkflow.inline3(14) } - val deferredException = error.deferredException as WorkflowTaskFailedException - deferredException.workerException.name shouldBe InvalidInlineException::class.java.name + val workflowTaskFailed = error.deferredException as WorkflowExecutorException + workflowTaskFailed.lastFailure.exception!!.name shouldBe InvalidInlineException::class.java.name } }, ) diff --git a/infinitic-tests/src/test/kotlin/io/infinitic/tests/properties/PropertiesWorkflowTests.kt b/infinitic-tests/src/test/kotlin/io/infinitic/tests/properties/PropertiesWorkflowTests.kt index 482d9e5ba..da7ea4493 100644 --- a/infinitic-tests/src/test/kotlin/io/infinitic/tests/properties/PropertiesWorkflowTests.kt +++ b/infinitic-tests/src/test/kotlin/io/infinitic/tests/properties/PropertiesWorkflowTests.kt @@ -23,8 +23,8 @@ package io.infinitic.tests.properties import io.infinitic.Test +import io.infinitic.exceptions.WorkflowExecutorException import io.infinitic.exceptions.WorkflowFailedException -import io.infinitic.exceptions.WorkflowTaskFailedException import io.infinitic.workflows.Deferred import io.kotest.assertions.throwables.shouldThrow import io.kotest.core.spec.style.StringSpec @@ -66,9 +66,10 @@ internal class PropertiesWorkflowTests : val e = shouldThrow { propertiesWorkflow.prop6() shouldBe "abab" } - e.deferredException shouldBe instanceOf() - val workerException = (e.deferredException as WorkflowTaskFailedException).workerException - workerException.message shouldContain Deferred::class.java.name + e.deferredException shouldBe instanceOf() + val exception = + (e.deferredException as WorkflowExecutorException).lastFailure.exception!! + exception.message shouldContain Deferred::class.java.name } /** @@ -78,9 +79,10 @@ internal class PropertiesWorkflowTests : val e = shouldThrow { propertiesWorkflow.prop6bis() shouldBe "abab" } - e.deferredException shouldBe instanceOf() - val workerException = (e.deferredException as WorkflowTaskFailedException).workerException - workerException.message shouldContain Deferred::class.java.name + e.deferredException shouldBe instanceOf() + val exception = + (e.deferredException as WorkflowExecutorException).lastFailure.exception!! + exception.message shouldContain Deferred::class.java.name } /** @@ -90,9 +92,10 @@ internal class PropertiesWorkflowTests : val e = shouldThrow { propertiesWorkflow.prop7() shouldBe "abab" } - e.deferredException shouldBe instanceOf() - val workerException = (e.deferredException as WorkflowTaskFailedException).workerException - workerException.message shouldContain Deferred::class.java.name + e.deferredException shouldBe instanceOf() + val exception = + (e.deferredException as WorkflowExecutorException).lastFailure.exception!! + exception.message shouldContain Deferred::class.java.name } /** @@ -102,9 +105,10 @@ internal class PropertiesWorkflowTests : val e = shouldThrow { propertiesWorkflow.prop7bis() shouldBe "abab" } - e.deferredException shouldBe instanceOf() - val workerException = (e.deferredException as WorkflowTaskFailedException).workerException - workerException.message shouldContain Deferred::class.java.name + e.deferredException shouldBe instanceOf() + val exception = + (e.deferredException as WorkflowExecutorException).lastFailure.exception!! + exception.message shouldContain Deferred::class.java.name } "Check prop8" { diff --git a/infinitic-tests/src/test/kotlin/io/infinitic/tests/syntax/SyntaxWorkflow.kt b/infinitic-tests/src/test/kotlin/io/infinitic/tests/syntax/SyntaxWorkflow.kt index eee4e7389..9a7deeab6 100644 --- a/infinitic-tests/src/test/kotlin/io/infinitic/tests/syntax/SyntaxWorkflow.kt +++ b/infinitic-tests/src/test/kotlin/io/infinitic/tests/syntax/SyntaxWorkflow.kt @@ -26,7 +26,7 @@ import io.infinitic.utils.ParentInterface import io.infinitic.utils.UtilService import io.infinitic.workflows.Workflow -interface SyntaxWorkflow : ParentInterface { +internal interface SyntaxWorkflow : ParentInterface { fun empty(): String fun await(duration: Long): Long diff --git a/infinitic-tests/src/test/kotlin/io/infinitic/utils/UtilService.kt b/infinitic-tests/src/test/kotlin/io/infinitic/utils/UtilService.kt index fa8b2af73..734b58275 100644 --- a/infinitic-tests/src/test/kotlin/io/infinitic/utils/UtilService.kt +++ b/infinitic-tests/src/test/kotlin/io/infinitic/utils/UtilService.kt @@ -31,11 +31,11 @@ import io.infinitic.common.tasks.data.TaskMeta import io.infinitic.tasks.Task import io.infinitic.workflows.DeferredStatus -interface ParentInterface { +internal interface ParentInterface { fun parent(): String } -interface UtilService : ParentInterface { +internal interface UtilService : ParentInterface { fun concat(str1: String, str2: String): String fun reverse(str: String): String @@ -52,6 +52,10 @@ interface UtilService : ParentInterface { fun failingWithException() + fun failingWithCustomException() + + fun failingWithNestedCustomException() + fun failingWithThrowable() fun successAtRetry(): String @@ -77,7 +81,7 @@ interface UtilService : ParentInterface { } @Retry(Only1Retry::class) -class UtilServiceImpl : UtilService { +internal class UtilServiceImpl : UtilService { override fun concat(str1: String, str2: String): String = str1 + str2 override fun reverse(str: String) = str.reversed() @@ -105,6 +109,10 @@ class UtilServiceImpl : UtilService { override fun failingWithException() = throw Exception("sorry") + override fun failingWithCustomException() = throw CustomException() + + override fun failingWithNestedCustomException() = throw Exception(CustomException()) + override fun failingWithThrowable() = throw Throwable("really sorry") @Retry(NoRetry::class) @@ -148,3 +156,10 @@ class UtilServiceImpl : UtilService { lateinit var delegatedServiceName: String } } + +internal class CustomException( + val customString: String = "42", + val customInt: Int = 42, + val customDouble: Double = 42.0, + val customList: List = List(10) { it } +) : Exception() diff --git a/infinitic-workflow-engine/src/main/kotlin/io/infinitic/workflows/engine/handlers/workflowTaskFailed.kt b/infinitic-workflow-engine/src/main/kotlin/io/infinitic/workflows/engine/handlers/workflowTaskFailed.kt index c7b72b664..8ac9f40b7 100644 --- a/infinitic-workflow-engine/src/main/kotlin/io/infinitic/workflows/engine/handlers/workflowTaskFailed.kt +++ b/infinitic-workflow-engine/src/main/kotlin/io/infinitic/workflows/engine/handlers/workflowTaskFailed.kt @@ -25,11 +25,9 @@ package io.infinitic.workflows.engine.handlers import io.infinitic.common.emitters.EmitterName import io.infinitic.common.exceptions.thisShouldNotHappen import io.infinitic.common.tasks.executors.errors.DeferredError -import io.infinitic.common.tasks.executors.errors.ExecutionError -import io.infinitic.common.tasks.executors.errors.WorkflowTaskFailedError +import io.infinitic.common.tasks.executors.errors.WorkflowExecutorError import io.infinitic.common.transport.WorkflowStateEventTopic import io.infinitic.common.transport.interfaces.InfiniticProducer -import io.infinitic.common.workers.data.WorkerName import io.infinitic.common.workflows.data.workflowMethods.awaitingRequesters import io.infinitic.common.workflows.engine.messages.MethodFailedEvent import io.infinitic.common.workflows.engine.messages.RemoteTaskFailed @@ -48,19 +46,11 @@ internal fun CoroutineScope.workflowTaskFailed( val deferredError: DeferredError = when (val deferredError = message.deferredError) { // an Exception has thrown in the workflow task - null -> WorkflowTaskFailedError( + null -> WorkflowExecutorError( workflowName = message.workflowName, workflowId = message.workflowId, workflowTaskId = message.taskId(), - cause = with(message.taskFailedError.cause) { - ExecutionError( - workerName = WorkerName.from(message.emitterName), - name = name, - message = this.message, - stackTraceToString = stackTraceToString, - cause = cause, - ) - }, + lastFailure = message.taskFailedError.lastFailure, ) // a deferred Exception has thrown in the workflow task else -> deferredError diff --git a/pulsar/docker-compose.yml b/pulsar/docker-compose.yml index 6ed0ba2a1..8cb75dbf8 100644 --- a/pulsar/docker-compose.yml +++ b/pulsar/docker-compose.yml @@ -22,48 +22,52 @@ # Licensor: infinitic.io services: - # Pulsar settings pulsar: - image: apachepulsar/pulsar:2.11.2 - environment: - - BOOKIE_MEM=" -Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g" - command: > - /bin/bash -c "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone" - volumes: - - "pulsardata:/pulsar/data" - - "pulsarconf:/pulsar/conf" - - "../infinitic-pulsar/build/libs:/infinitic/libs:delegated" - - "../infinitic-pulsar/build/schemas:/infinitic/schemas:delegated" + image: apachepulsar/pulsar:3.0.7 + command: sh -c "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone" + hostname: pulsar ports: - - "6650:6650" - - "8080:8080" - - "8081:8081" + - '8080:8080' + - '6650:6650' restart: unless-stopped + volumes: + - './data/pulsar/data/:/pulsar/data' + healthcheck: + test: [ 'CMD', 'bin/pulsar-admin', 'brokers', 'healthcheck' ] + interval: 5s + timeout: 10s + retries: 10 + environment: + - 'PULSAR_PREFIX_advertisedAddress=pulsar' + - 'PULSAR_PREFIX_advertisedListeners=internal:pulsar://pulsar:6650,external:pulsar://127.0.0.1:6650' + - 'PULSAR_STANDALONE_USE_ZOOKEEPER=1' - # Pulsar Manager - dashboard: - image: apachepulsar/pulsar-manager:v0.3.0 + streamvisor: + image: ghcr.io/streamvisor/streamvisor:3.1.1 ports: - - "9527:9527" - - "7750:7750" + - '8888:8888' + - '7750:7750' + volumes: + - './data/streamvisor/data/:/streamvisor/data' depends_on: - - pulsar + pulsar: + condition: service_healthy links: - pulsar - volumes: - - "pulsardata:/data" environment: - SPRING_CONFIGURATION_FILE: /pulsar-manager/pulsar-manager/application.properties + - 'STREAMVISOR_ENVIRONMENTS_0_NAME=standalone' + - 'STREAMVISOR_ENVIRONMENTS_0_SERVICEURL=http://pulsar:8080' + - 'STREAMVISOR_ENVIRONMENTS_0_BROKERURL=pulsar://pulsar:6650' - # Redis storage for state persistence - redis: - image: redis:6.0-alpine - ports: - - "6379:6379" - volumes: - - "redisdata:/data" - -volumes: - pulsardata: - pulsarconf: - redisdata: + data-faker: + image: ghcr.io/streamvisor/data-faker:1.2.1 + profiles: + - demo + depends_on: + pulsar: + condition: service_healthy + links: + - pulsar + environment: + - 'PULSAR_ADMIN_URL=http://pulsar:8080' + - 'PULSAR_BROKER_SERVICEURL=pulsar://pulsar:6650'