Skip to content

Commit

Permalink
refactor: channel-based backpressure (#514)
Browse files Browse the repository at this point in the history
refactor: channel-based backpressure

Co-authored-by: Regis Kuckaertz <[email protected]>
  • Loading branch information
regiskuckaertz and regiskuckaertz authored Jun 19, 2023
1 parent 4c20b9b commit 7eb76a1
Showing 1 changed file with 15 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import zio.stm.TSemaphore
import zio.Exit.Failure
import zio.Exit.Success
import scala.annotation.tailrec
import zio.stream.ZSink
import zio.stream.ZChannel
import scalapb.zio_grpc.GIO

class ZServerCallHandler[Req, Res](
runtime: Runtime[Any],
Expand Down Expand Up @@ -107,56 +110,22 @@ object ZServerCallHandler {
call: ZServerCall[Res],
stream: ZStream[Any, StatusException, Res]
): ZIO[Any, StatusException, Unit] = {
def takeFromQueue(
queue: Dequeue[Exit[Option[StatusException], Res]]
): ZIO[Any, StatusException, Unit] =
queue.takeAll.flatMap(takeFromCache(_, queue))
val backpressureSink = {
def go: ZChannel[Any, ZNothing, Chunk[Res], Any, StatusException, Chunk[Res], Unit] =
ZChannel.readWithCause(
xs =>
ZChannel.fromZIO(GIO.attempt(xs.foreach(call.call.sendMessage))) *>
ZChannel.suspend(if (call.call.isReady()) go else ZChannel.fromZIO(call.awaitReady) *> go),
c => ZChannel.failCause(c),
_ => ZChannel.unit
)

def takeFromCache(
xs: Chunk[Exit[Option[StatusException], Res]],
queue: Dequeue[Exit[Option[StatusException], Res]]
): ZIO[Any, StatusException, Unit] =
ZIO.suspendSucceed {
@tailrec def innerLoop(loop: Boolean, i: Int): IO[StatusException, Unit] =
if (i < xs.length && loop) {
xs(i) match {
case Failure(cause) =>
cause.failureOrCause match {
case Left(Some(status)) =>
ZIO.fail(status)
case Left(None) =>
ZIO.unit
case Right(cause) =>
ZIO.failCause(cause)
}
case Success(value) =>
call.call.sendMessage(value)
// the loop iteration may only continue if the call can
// still accept elements and we have more elements to send
innerLoop(call.call.isReady, i + 1)
}
} else if (loop)
// ^ if we reached the end of the chunk but the call can still
// proceed, we pull from the queue and continue
takeFromQueue(queue)
else
// ^ otherwise, we wait for the call to be ready and then start again
call.awaitReady *> takeFromCache(xs.drop(i), queue)

if (xs.isEmpty)
takeFromQueue(queue)
else
innerLoop(true, 0)
}
ZSink.fromChannel(go)
}

for {
queueSize <- backpressureQueueSize
_ <- ZIO
.scoped[Any](
stream
.toQueueOfElements(queueSize)
.flatMap(queue => call.awaitReady *> takeFromQueue(queue))
)
_ <- stream.buffer(queueSize).run(backpressureSink)
} yield ()
}
}

0 comments on commit 7eb76a1

Please sign in to comment.