diff --git a/core/src/main/scalajvm/scalapb/zio_grpc/server/ZServerCallHandler.scala b/core/src/main/scalajvm/scalapb/zio_grpc/server/ZServerCallHandler.scala index 2b70299a5..d11a656ca 100644 --- a/core/src/main/scalajvm/scalapb/zio_grpc/server/ZServerCallHandler.scala +++ b/core/src/main/scalajvm/scalapb/zio_grpc/server/ZServerCallHandler.scala @@ -14,6 +14,9 @@ import zio.stm.TSemaphore import zio.Exit.Failure import zio.Exit.Success import scala.annotation.tailrec +import zio.stream.ZSink +import zio.stream.ZChannel +import scalapb.zio_grpc.GIO class ZServerCallHandler[Req, Res]( runtime: Runtime[Any], @@ -107,56 +110,22 @@ object ZServerCallHandler { call: ZServerCall[Res], stream: ZStream[Any, StatusException, Res] ): ZIO[Any, StatusException, Unit] = { - def takeFromQueue( - queue: Dequeue[Exit[Option[StatusException], Res]] - ): ZIO[Any, StatusException, Unit] = - queue.takeAll.flatMap(takeFromCache(_, queue)) + val backpressureSink = { + def go: ZChannel[Any, ZNothing, Chunk[Res], Any, StatusException, Chunk[Res], Unit] = + ZChannel.readWithCause( + xs => + ZChannel.fromZIO(GIO.attempt(xs.foreach(call.call.sendMessage))) *> + ZChannel.suspend(if (call.call.isReady()) go else ZChannel.fromZIO(call.awaitReady) *> go), + c => ZChannel.failCause(c), + _ => ZChannel.unit + ) - def takeFromCache( - xs: Chunk[Exit[Option[StatusException], Res]], - queue: Dequeue[Exit[Option[StatusException], Res]] - ): ZIO[Any, StatusException, Unit] = - ZIO.suspendSucceed { - @tailrec def innerLoop(loop: Boolean, i: Int): IO[StatusException, Unit] = - if (i < xs.length && loop) { - xs(i) match { - case Failure(cause) => - cause.failureOrCause match { - case Left(Some(status)) => - ZIO.fail(status) - case Left(None) => - ZIO.unit - case Right(cause) => - ZIO.failCause(cause) - } - case Success(value) => - call.call.sendMessage(value) - // the loop iteration may only continue if the call can - // still accept elements and we have more elements to send - innerLoop(call.call.isReady, i + 1) - } - } else if (loop) - // ^ if we reached the end of the chunk but the call can still - // proceed, we pull from the queue and continue - takeFromQueue(queue) - else - // ^ otherwise, we wait for the call to be ready and then start again - call.awaitReady *> takeFromCache(xs.drop(i), queue) - - if (xs.isEmpty) - takeFromQueue(queue) - else - innerLoop(true, 0) - } + ZSink.fromChannel(go) + } for { queueSize <- backpressureQueueSize - _ <- ZIO - .scoped[Any]( - stream - .toQueueOfElements(queueSize) - .flatMap(queue => call.awaitReady *> takeFromQueue(queue)) - ) + _ <- stream.buffer(queueSize).run(backpressureSink) } yield () } }