From 1b76d54b488e22ad3ab4aa0ea7f80a86cc128a81 Mon Sep 17 00:00:00 2001 From: Pierre Ricadat Date: Tue, 13 Feb 2024 11:10:36 +0900 Subject: [PATCH] Add storage client using Redisson (#108) --- build.sbt | 14 ++++ .../devsisters/shardcake/RedisConfig.scala | 12 ++++ .../devsisters/shardcake/StorageRedis.scala | 72 +++++++++++++++++++ .../shardcake/StorageRedisSpec.scala | 68 ++++++++++++++++++ vuepress/docs/docs/customization.md | 2 +- 5 files changed, 167 insertions(+), 1 deletion(-) create mode 100644 storage-redisson/src/main/scala/com/devsisters/shardcake/RedisConfig.scala create mode 100644 storage-redisson/src/main/scala/com/devsisters/shardcake/StorageRedis.scala create mode 100644 storage-redisson/src/test/scala/com/devsisters/shardcake/StorageRedisSpec.scala diff --git a/build.sbt b/build.sbt index 30cd5a9..373d135 100644 --- a/build.sbt +++ b/build.sbt @@ -10,6 +10,7 @@ val zioCatsInteropVersion = "23.1.0.0" val sttpVersion = "3.9.1" val calibanVersion = "2.4.3" val redis4catsVersion = "1.5.2" +val redissonVersion = "3.23.0" val scalaKryoVersion = "1.0.2" val testContainersVersion = "0.40.9" @@ -53,6 +54,7 @@ lazy val root = project entities, healthK8s, storageRedis, + storageRedisson, serializationKryo, grpcProtocol, examples @@ -126,6 +128,18 @@ lazy val storageRedis = project ) ) +lazy val storageRedisson = project + .in(file("storage-redisson")) + .settings(name := "shardcake-storage-redisson") + .settings(commonSettings) + .dependsOn(core) + .settings( + libraryDependencies ++= + Seq( + "org.redisson" % "redisson" % redissonVersion + ) + ) + lazy val serializationKryo = project .in(file("serialization-kryo")) .settings(name := "shardcake-serialization-kryo") diff --git a/storage-redisson/src/main/scala/com/devsisters/shardcake/RedisConfig.scala b/storage-redisson/src/main/scala/com/devsisters/shardcake/RedisConfig.scala new file mode 100644 index 0000000..3e64c3b --- /dev/null +++ b/storage-redisson/src/main/scala/com/devsisters/shardcake/RedisConfig.scala @@ -0,0 +1,12 @@ +package com.devsisters.shardcake + +/** + * The configuration for the Redis storage implementation. + * @param assignmentsKey the key to store shard assignments + * @param podsKey the key to store registered pods + */ +case class RedisConfig(assignmentsKey: String, podsKey: String) + +object RedisConfig { + val default: RedisConfig = RedisConfig(assignmentsKey = "shard_assignments", podsKey = "pods") +} diff --git a/storage-redisson/src/main/scala/com/devsisters/shardcake/StorageRedis.scala b/storage-redisson/src/main/scala/com/devsisters/shardcake/StorageRedis.scala new file mode 100644 index 0000000..3b2ae47 --- /dev/null +++ b/storage-redisson/src/main/scala/com/devsisters/shardcake/StorageRedis.scala @@ -0,0 +1,72 @@ +package com.devsisters.shardcake + +import scala.jdk.CollectionConverters._ + +import com.devsisters.shardcake.interfaces.Storage +import org.redisson.api.RedissonClient +import org.redisson.api.listener.MessageListener +import org.redisson.client.codec.StringCodec +import zio.stream.ZStream +import zio.{ Queue, Task, Unsafe, ZIO, ZLayer } + +object StorageRedis { + + /** + * A layer that returns a Storage implementation using Redis + */ + val live: ZLayer[RedissonClient with RedisConfig, Nothing, Storage] = + ZLayer { + for { + config <- ZIO.service[RedisConfig] + redisClient <- ZIO.service[RedissonClient] + assignmentsMap = redisClient.getMap[String, String](config.assignmentsKey) + podsMap = redisClient.getMap[String, String](config.podsKey) + assignmentsTopic = redisClient.getTopic(config.assignmentsKey, StringCodec.INSTANCE) + } yield new Storage { + def getAssignments: Task[Map[ShardId, Option[PodAddress]]] = + ZIO + .fromCompletionStage(assignmentsMap.readAllEntrySetAsync()) + .map( + _.asScala + .flatMap(entry => + entry.getKey.toIntOption.map( + _ -> (if (entry.getValue.isEmpty) None + else PodAddress(entry.getValue)) + ) + ) + .toMap + ) + def saveAssignments(assignments: Map[ShardId, Option[PodAddress]]): Task[Unit] = + ZIO.fromCompletionStage(assignmentsMap.putAllAsync(assignments.map { case (k, v) => + k.toString -> v.fold("")(_.toString) + }.asJava)) *> + ZIO.fromCompletionStage(assignmentsTopic.publishAsync("ping")).unit + def assignmentsStream: ZStream[Any, Throwable, Map[ShardId, Option[PodAddress]]] = + ZStream.unwrap { + for { + queue <- Queue.unbounded[String] + runtime <- ZIO.runtime[Any] + _ <- ZIO.fromCompletionStage( + assignmentsTopic.addListenerAsync( + classOf[String], + new MessageListener[String] { + def onMessage(channel: CharSequence, msg: String): Unit = + Unsafe.unsafe(implicit unsafe => runtime.unsafe.run(queue.offer(msg))) + } + ) + ) + } yield ZStream.fromQueueWithShutdown(queue).mapZIO(_ => getAssignments) + } + def getPods: Task[Map[PodAddress, Pod]] = + ZIO + .fromCompletionStage(podsMap.readAllEntrySetAsync()) + .map( + _.asScala + .flatMap(entry => PodAddress(entry.getKey).map(address => address -> Pod(address, entry.getValue))) + .toMap + ) + def savePods(pods: Map[PodAddress, Pod]): Task[Unit] = + ZIO.fromCompletionStage(podsMap.putAllAsync(pods.map { case (k, v) => k.toString -> v.version }.asJava)).unit + } + } +} diff --git a/storage-redisson/src/test/scala/com/devsisters/shardcake/StorageRedisSpec.scala b/storage-redisson/src/test/scala/com/devsisters/shardcake/StorageRedisSpec.scala new file mode 100644 index 0000000..9a71b2f --- /dev/null +++ b/storage-redisson/src/test/scala/com/devsisters/shardcake/StorageRedisSpec.scala @@ -0,0 +1,68 @@ +package com.devsisters.shardcake + +import com.devsisters.shardcake.interfaces.Storage +import com.dimafeng.testcontainers.GenericContainer +import org.redisson.Redisson +import org.redisson.config.{ Config => RedissonConfig } +import org.redisson.api.RedissonClient +import zio.Clock.ClockLive +import zio._ +import zio.stream.ZStream +import zio.test.TestAspect.sequential +import zio.test._ + +object StorageRedisSpec extends ZIOSpecDefault { + val container: ZLayer[Any, Nothing, GenericContainer] = + ZLayer.scoped { + ZIO.acquireRelease { + ZIO.attemptBlocking { + val container = new GenericContainer(dockerImage = "redis:6.2.5", exposedPorts = Seq(6379)) + container.start() + container + }.orDie + }(container => ZIO.attemptBlocking(container.stop()).orDie) + } + + val redis: ZLayer[GenericContainer, Throwable, RedissonClient] = + ZLayer { + for { + container <- ZIO.service[GenericContainer] + uri = s"redis://foobared@${container.host}:${container.mappedPort(container.exposedPorts.head)}" + redissonConfig = new RedissonConfig() + _ = redissonConfig.useSingleServer().setAddress(uri) + client = Redisson.create(redissonConfig) + } yield client + } + + def spec: Spec[TestEnvironment with Scope, Any] = + suite("StorageRedisSpec")( + test("save and get pods") { + val expected = List(Pod(PodAddress("host1", 1), "1.0.0"), Pod(PodAddress("host2", 2), "2.0.0")) + .map(p => p.address -> p) + .toMap + for { + _ <- ZIO.serviceWithZIO[Storage](_.savePods(expected)) + actual <- ZIO.serviceWithZIO[Storage](_.getPods) + } yield assertTrue(expected == actual) + }, + test("save and get assignments") { + val expected = Map(1 -> Some(PodAddress("host1", 1)), 2 -> None) + for { + _ <- ZIO.serviceWithZIO[Storage](_.saveAssignments(expected)) + actual <- ZIO.serviceWithZIO[Storage](_.getAssignments) + } yield assertTrue(expected == actual) + }, + test("assignments stream") { + val expected = Map(1 -> Some(PodAddress("host1", 1)), 2 -> None) + for { + p <- Promise.make[Nothing, Map[Int, Option[PodAddress]]] + _ <- ZStream.serviceWithStream[Storage](_.assignmentsStream).runForeach(p.succeed(_)).fork + _ <- ClockLive.sleep(1 second) + _ <- ZIO.serviceWithZIO[Storage](_.saveAssignments(expected)) + actual <- p.await + } yield assertTrue(expected == actual) + } + ).provideLayerShared( + container >>> redis ++ ZLayer.succeed(RedisConfig.default) >>> StorageRedis.live + ) @@ sequential +} diff --git a/vuepress/docs/docs/customization.md b/vuepress/docs/docs/customization.md index 4b5fbd8..11f5339 100644 --- a/vuepress/docs/docs/customization.md +++ b/vuepress/docs/docs/customization.md @@ -26,7 +26,7 @@ trait Storage { For testing, you can use the `Storage.memory` layer that keeps data in memory. -Shardcake provides an implementation of `Storage` using Redis. To use it, add the following dependency: +Shardcake provides an implementation of `Storage` using Redis with the Redis4cats library (there's also an alternative using Redisson). To use it, add the following dependency: ```scala libraryDependencies += "com.devsisters" %% "shardcake-storage-redis" % "2.1.0" ```