Skip to content

Commit

Permalink
event listener implementation - WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
geomagilles committed Jan 20, 2024
1 parent 9435868 commit 7e36b5e
Show file tree
Hide file tree
Showing 28 changed files with 562 additions and 584 deletions.
1 change: 1 addition & 0 deletions buildSrc/src/main/kotlin/Libs.kt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ object Libs {
private const val version = "2.5.0"
const val core = "io.cloudevents:cloudevents-core:$version"
const val api = "io.cloudevents:cloudevents-api:$version"
const val json = "io.cloudevents:cloudevents-json-jackson:$version"
}

object JsonPath {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ data class MillisInstant(val long: Long = 0) : Comparable<Long> {

override fun toString() = "$long"

fun toInstant() = Instant.ofEpochSecond(long)

override operator fun compareTo(other: Long): Int = this.long.compareTo(other)

operator fun plus(other: MillisDuration) = MillisInstant(this.long + other.long)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,160 +32,164 @@ import io.infinitic.exceptions.serialization.JsonDeserializationException
import io.infinitic.exceptions.serialization.KotlinDeserializationException
import io.infinitic.exceptions.serialization.MissingMetaJavaClassException
import io.infinitic.exceptions.serialization.SerializerNotFoundException
import kotlinx.serialization.*
import kotlinx.serialization.InternalSerializationApi
import kotlinx.serialization.KSerializer
import kotlinx.serialization.Serializable
import kotlinx.serialization.SerializationException
import kotlinx.serialization.serializerOrNull
import java.math.BigInteger
import java.security.MessageDigest
import io.infinitic.common.serDe.json.Json as JsonJackson

@Serializable
@AvroNamespace("io.infinitic.data")
data class SerializedData(
var bytes: ByteArray,
@AvroNamespace("io.infinitic.data") var type: SerializedDataType,
val meta: Map<String, ByteArray> = mapOf()
var bytes: ByteArray,
@AvroNamespace("io.infinitic.data") var type: SerializedDataType,
val meta: Map<String, ByteArray> = mapOf()
) {
companion object {
// DO NOT CHANGE THOSE VALUES
private const val WORKFLOW_TASK_PARAMETERS = "WorkflowTaskParameters"
private const val WORKFLOW_TASK_RETURN_VALUE = "WorkflowTaskReturnValue"

// meta key containing the name of the serialized java class
const val META_JAVA_CLASS = "javaClass"

// use a less obvious key than "type" for polymorphic data, to avoid collusion
private val jsonKotlin = kotlinx.serialization.json.Json {
classDiscriminator = "#klass"
ignoreUnknownKeys = true
companion object {
// DO NOT CHANGE THOSE VALUES
private const val WORKFLOW_TASK_PARAMETERS = "WorkflowTaskParameters"
private const val WORKFLOW_TASK_RETURN_VALUE = "WorkflowTaskReturnValue"

// meta key containing the name of the serialized java class
const val META_JAVA_CLASS = "javaClass"

// use a less obvious key than "type" for polymorphic data, to avoid collusion
private val jsonKotlin = kotlinx.serialization.json.Json {
classDiscriminator = "#klass"
ignoreUnknownKeys = true
}

private fun String.toBytes(): ByteArray = toByteArray(charset = Charsets.UTF_8)

private fun Any.getClassInBytes(): ByteArray = this::class.java.name.toBytes()

/** @return serialized value */
fun <T : Any> from(value: T?): SerializedData {
val bytes: ByteArray
val type: SerializedDataType
val meta: Map<String, ByteArray>

when (value) {
null -> {
type = SerializedDataType.NULL
bytes = "".toByteArray()
meta = mutableMapOf()
}

private fun String.toBytes(): ByteArray = toByteArray(charset = Charsets.UTF_8)

private fun Any.getClassInBytes(): ByteArray = this::class.java.name.toBytes()

/** @return serialized value */
fun <T : Any> from(value: T?): SerializedData {
val bytes: ByteArray
val type: SerializedDataType
val meta: Map<String, ByteArray>

when (value) {
null -> {
type = SerializedDataType.NULL
bytes = "".toByteArray()
meta = mutableMapOf()
}

is WorkflowTaskParameters -> {
type = SerializedDataType.AVRO_WITH_SCHEMA
bytes = value.toByteArray()
meta = mutableMapOf(META_JAVA_CLASS to WORKFLOW_TASK_PARAMETERS.toBytes())
}

is WorkflowTaskReturnValue -> {
type = SerializedDataType.AVRO_WITH_SCHEMA
bytes = value.toByteArray()
meta = mutableMapOf(META_JAVA_CLASS to WORKFLOW_TASK_RETURN_VALUE.toBytes())
}

else -> {
@OptIn(InternalSerializationApi::class)
when (val serializer = value::class.serializerOrNull()) {
null -> {
type = SerializedDataType.JSON_JACKSON
bytes = JsonJackson.stringify(value).toByteArray()
}

else -> {
type = SerializedDataType.JSON_KOTLIN
@Suppress("UNCHECKED_CAST")
bytes = jsonKotlin.encodeToString(serializer as KSerializer<T>, value).toByteArray()
}
}
meta = mutableMapOf(META_JAVA_CLASS to value.getClassInBytes())
}
}
return SerializedData(bytes, type, meta)
is WorkflowTaskParameters -> {
type = SerializedDataType.AVRO_WITH_SCHEMA
bytes = value.toByteArray()
meta = mutableMapOf(META_JAVA_CLASS to WORKFLOW_TASK_PARAMETERS.toBytes())
}
}

/** @return deserialized value */
fun deserialize(): Any? =
when (type) {
SerializedDataType.NULL -> null

SerializedDataType.JSON_JACKSON -> {
val klass = getDataClass()
try {
JsonJackson.parse(getJson(), klass)
} catch (e: JsonProcessingException) {
throw JsonDeserializationException(klass.name, causeString = e.toString())
}
is WorkflowTaskReturnValue -> {
type = SerializedDataType.AVRO_WITH_SCHEMA
bytes = value.toByteArray()
meta = mutableMapOf(META_JAVA_CLASS to WORKFLOW_TASK_RETURN_VALUE.toBytes())
}

else -> {
@OptIn(InternalSerializationApi::class)
when (val serializer = value::class.serializerOrNull()) {
null -> {
type = SerializedDataType.JSON_JACKSON
bytes = JsonJackson.stringify(value).toByteArray()
}

else -> {
type = SerializedDataType.JSON_KOTLIN
@Suppress("UNCHECKED_CAST")
bytes = jsonKotlin.encodeToString(serializer as KSerializer<T>, value).toByteArray()
}
}
meta = mutableMapOf(META_JAVA_CLASS to value.getClassInBytes())
}
}
return SerializedData(bytes, type, meta)
}
}

/** @return deserialized value */
fun deserialize(): Any? =
when (type) {
SerializedDataType.NULL -> null

SerializedDataType.JSON_JACKSON -> {
val klass = getDataClass()
try {
JsonJackson.parse(toJson(), klass)
} catch (e: JsonProcessingException) {
throw JsonDeserializationException(klass.name, causeString = e.toString())
}
}

SerializedDataType.JSON_KOTLIN -> {
val klass = getDataClass()
SerializedDataType.JSON_KOTLIN -> {
val klass = getDataClass()

@OptIn(InternalSerializationApi::class)
val serializer = klass.kotlin.serializerOrNull()
?: throw SerializerNotFoundException(klass.name)
@OptIn(InternalSerializationApi::class)
val serializer = klass.kotlin.serializerOrNull()
?: throw SerializerNotFoundException(klass.name)

try {
jsonKotlin.decodeFromString(serializer, getJson())
} catch (e: SerializationException) {
throw KotlinDeserializationException(klass.name, causeString = e.toString())
}
}
try {
jsonKotlin.decodeFromString(serializer, toJson())
} catch (e: SerializationException) {
throw KotlinDeserializationException(klass.name, causeString = e.toString())
}
}

SerializedDataType.AVRO_WITH_SCHEMA -> {
when (getDataClassString()) {
WORKFLOW_TASK_PARAMETERS -> WorkflowTaskParameters.fromByteArray(bytes)
WORKFLOW_TASK_RETURN_VALUE -> WorkflowTaskReturnValue.fromByteArray(bytes)
else -> thisShouldNotHappen()
}
}
SerializedDataType.AVRO_WITH_SCHEMA -> {
when (getDataClassString()) {
WORKFLOW_TASK_PARAMETERS -> WorkflowTaskParameters.fromByteArray(bytes)
WORKFLOW_TASK_RETURN_VALUE -> WorkflowTaskReturnValue.fromByteArray(bytes)
else -> thisShouldNotHappen()
}
}
}

fun getJson(): String = String(bytes, Charsets.UTF_8)
fun toJson(): String = String(bytes, Charsets.UTF_8)

fun hash(): String {
// MD5 implementation, enough to avoid collision in practical cases
val md = MessageDigest.getInstance("MD5")
return BigInteger(1, md.digest(bytes)).toString(16).padStart(32, '0')
}
fun hash(): String {
// MD5 implementation, enough to avoid collision in practical cases
val md = MessageDigest.getInstance("MD5")
return BigInteger(1, md.digest(bytes)).toString(16).padStart(32, '0')
}

/** Readable version */
override fun toString() = mapOf(
"bytes" to getJson().replace("\n", ""),
"type" to type,
"meta" to meta.mapValues { String(it.value) },
).toString()
/** Readable version */
override fun toString() = mapOf(
"bytes" to toJson().replace("\n", ""),
"type" to type,
"meta" to meta.mapValues { String(it.value) },
).toString()

private fun getDataClass(): Class<out Any> = getDataClassName().getClass().getOrThrow()
private fun getDataClass(): Class<out Any> = getDataClassName().getClass().getOrThrow()

private fun getDataClassName(): String = when (val klass = getDataClassString()) {
WORKFLOW_TASK_PARAMETERS -> WorkflowTaskParameters::class.java.name
WORKFLOW_TASK_RETURN_VALUE -> WorkflowTaskReturnValue::class.java.name
null -> throw MissingMetaJavaClassException
else -> klass
}
private fun getDataClassName(): String = when (val klass = getDataClassString()) {
WORKFLOW_TASK_PARAMETERS -> WorkflowTaskParameters::class.java.name
WORKFLOW_TASK_RETURN_VALUE -> WorkflowTaskReturnValue::class.java.name
null -> throw MissingMetaJavaClassException
else -> klass
}

private fun getDataClassString(): String? =
meta[META_JAVA_CLASS]?.let { String(it, charset = Charsets.UTF_8) }
private fun getDataClassString(): String? =
meta[META_JAVA_CLASS]?.let { String(it, charset = Charsets.UTF_8) }

override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false

other as SerializedData
other as SerializedData

if (!bytes.contentEquals(other.bytes)) return false
if (type != other.type) return false
if (!bytes.contentEquals(other.bytes)) return false
if (type != other.type) return false

if (meta.keys != other.meta.keys) return false
if (meta.map { it.value.contentEquals(other.meta[it.key]!!) }.any { !it }) return false
if (meta.keys != other.meta.keys) return false
if (meta.map { it.value.contentEquals(other.meta[it.key]!!) }.any { !it }) return false

return true
}
return true
}

override fun hashCode(): Int = bytes.contentHashCode()
override fun hashCode(): Int = bytes.contentHashCode()
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.kotlin.KotlinFeature
import com.fasterxml.jackson.module.kotlin.KotlinModule
import com.fasterxml.jackson.module.kotlin.jsonMapper
import java.io.IOException
import org.apache.avro.specific.SpecificRecordBase
import java.io.IOException

object Json {
private val mapper = jsonMapper {
Expand All @@ -47,6 +47,8 @@ object Json {
configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
}

fun bytesify(msg: Any?): ByteArray = mapper.writeValueAsBytes(msg)

fun stringify(msg: Any?, pretty: Boolean = false): String =
when (pretty) {
true -> mapper.writerWithDefaultPrettyPrinter().writeValueAsString(msg)
Expand All @@ -59,9 +61,11 @@ object Json {
* Cause should not be included to the json, as it triggers a circular reference when cause = this
*/
private abstract class ExceptionMixIn {
@JsonIgnore abstract fun getCause(): Throwable
@JsonIgnore
abstract fun getCause(): Throwable

@JsonIgnore abstract fun getMessage(): String
@JsonIgnore
abstract fun getMessage(): String
}

/**
Expand All @@ -71,9 +75,11 @@ object Json {
* IMPORTANT: properties of generated Avro classes MUST have public visibility
*/
private abstract class AvroMixIn {
@JsonIgnore abstract fun getSchema(): org.apache.avro.Schema
@JsonIgnore
abstract fun getSchema(): org.apache.avro.Schema

@JsonIgnore abstract fun getSpecificData(): org.apache.avro.specific.SpecificData
@JsonIgnore
abstract fun getSpecificData(): org.apache.avro.specific.SpecificData

@JsonSerialize(using = AvroListStringSerializer::class)
abstract fun getListOfString(): List<String>
Expand All @@ -83,9 +89,9 @@ object Json {
private class AvroListStringSerializer : JsonSerializer<List<String>>() {
@Throws(IOException::class)
override fun serialize(
value: List<String>,
gen: JsonGenerator,
serializers: SerializerProvider
value: List<String>,
gen: JsonGenerator,
serializers: SerializerProvider
) {
gen.writeStartArray()
for (o in value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ data class TaskRetriedEvent(
override val taskTags: Set<TaskTag>,
override val taskMeta: TaskMeta,
val taskRetryDelay: MillisDuration,
val lastError: ExecutionError?,
val lastError: ExecutionError,
) : ServiceEventMessage() {

companion object {
Expand Down
Loading

0 comments on commit 7e36b5e

Please sign in to comment.