forked from pawelkaczor/akka-ddd
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathJson4sEsSerializer.scala
140 lines (115 loc) · 4.9 KB
/
Json4sEsSerializer.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package pl.newicom.eventstore
import java.nio.ByteBuffer
import java.nio.charset.Charset
import akka.actor.{ActorRef, ExtendedActorSystem}
import akka.persistence.eventstore.EventStoreSerializer
import akka.persistence.eventstore.snapshot.EventStoreSnapshotStore.SnapshotEvent
import akka.persistence.eventstore.snapshot.EventStoreSnapshotStore.SnapshotEvent.Snapshot
import akka.persistence.{PersistentRepr, SnapshotMetadata}
import akka.serialization.{Serialization, SerializationExtension}
import akka.util.ByteString
import eventstore.{Content, ContentType, Event, EventData}
import org.joda.time.DateTime
import org.json4s.Extraction.decompose
import org.json4s.JsonAST.{JField, JObject, JString}
import org.json4s._
import org.json4s.ext.{JodaTimeSerializers, UUIDSerializer}
import org.json4s.native.Serialization.{read, write}
import org.json4s.reflect.TypeInfo
import pl.newicom.dddd.aggregate.EntityId
import pl.newicom.dddd.delivery.protocol.Processed
import pl.newicom.dddd.delivery.protocol.alod.{Processed => AlodProcessed}
import pl.newicom.dddd.messaging.MetaData
import pl.newicom.dddd.messaging.event.EventMessage
import pl.newicom.eventstore.Json4sEsSerializer._
class Json4sEsSerializer(system: ExtendedActorSystem) extends EventStoreSerializer {
def identifier = Identifier
val defaultFormats: Formats = DefaultFormats + ActorRefSerializer + new SnapshotSerializer(system) ++ JodaTimeSerializers.all + UUIDSerializer +
new FullTypeHints(List(
classOf[MetaData],
classOf[Processed],
classOf[AlodProcessed],
classOf[PersistentRepr]))
implicit val formats: Formats = defaultFormats
def includeManifest = true
override def fromBinary(bytes: Array[Byte], manifestOpt: Option[Class[_]]): AnyRef = {
implicit val manifest = manifestOpt match {
case Some(x) => Manifest.classType(x)
case None => Manifest.AnyRef
}
try {
read(new String(bytes, UTF8))
} catch {
case th: Throwable =>
th.printStackTrace()
throw th;
}
}
override def toBinary(o: AnyRef) = {
write(o).getBytes(UTF8)
}
override def toPayloadAndMetadata(e: AnyRef) = e match {
case em: EventMessage => (em.event,
em.withMetaData(Map("id" -> em.id, "timestamp" -> em.timestamp)).metadata)
case _ => super.toPayloadAndMetadata(e)
}
override def fromPayloadAndMetadata(payload: AnyRef, maybeMetadata: Option[AnyRef]): AnyRef = {
if (maybeMetadata.isDefined) {
val metadata = maybeMetadata.get.asInstanceOf[MetaData]
val id: EntityId = metadata.get("id")
val timestamp = DateTime.parse(metadata.get("timestamp"))
new EventMessage(payload, id, timestamp).withMetaData(Some(metadata))
} else {
new EventMessage(payload)
}
}
override def toEvent(x: AnyRef) = x match {
case x: SnapshotEvent => EventData(
eventType = x.getClass.getName,
data = Content(ByteString(toBinary(x)), ContentType.Json))
case _ => sys.error(s"Cannot serialize $x, SnapshotEvent expected")
}
override def fromEvent(event: Event, manifest: Class[_]) = {
val clazz = Class.forName(event.data.eventType)
val result = fromBinary(event.data.data.value.toArray, clazz)
if (manifest.isInstance(result)) result
else sys.error(s"Cannot deserialize event as $manifest, event: $event")
}
def contentType = ContentType.Json
object ActorRefSerializer extends CustomSerializer[ActorRef](format => (
{
case JString(s) => system.provider.resolveActorRef(s)
},
{
case x: ActorRef => JString(Serialization.serializedActorPath(x))
}
))
}
object Json4sEsSerializer {
val UTF8 = Charset.forName("UTF-8")
val Identifier: Int = ByteBuffer.wrap("json4s".getBytes(UTF8)).getInt
case class SnapshotSerializer(sys: ExtendedActorSystem) extends Serializer[Snapshot] {
val Clazz = classOf[Snapshot]
import akka.serialization.{Serialization => SysSerialization}
lazy val serialization: SysSerialization = SerializationExtension(sys)
def deserialize(implicit format: Formats) = {
case (TypeInfo(Clazz, _), JObject(List(JField("dataClass", JString(dataClass)), JField("data", JString(x)), JField("metadata", metadata)))) =>
import Base64._
val data = serialization.deserialize(x.toByteArray, Class.forName(dataClass)).get
val metaData = metadata.extract[SnapshotMetadata]
Snapshot(data, metaData)
}
def serializeAnyRef(data: AnyRef)(implicit format: Formats): String = {
import Base64._
serialization.serialize(data).get.toBase64
}
def serialize(implicit format: Formats) = {
case Snapshot(data, metadata) =>
val dataSerialized: String = data match {
case data: AnyRef => serializeAnyRef(data)
case _ => data.toString
}
JObject("jsonClass" -> JString(Clazz.getName), "dataClass" -> JString(data.getClass.getName), "data" -> JString(dataSerialized), "metadata" -> decompose(metadata))
}
}
}