Skip to content

Commit

Permalink
added retry() to RPCClient
Browse files Browse the repository at this point in the history
  • Loading branch information
jpzk committed Nov 19, 2019
1 parent 1b918ff commit cf8edcb
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 22 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# 1.4.0

* added feature to add multiple hosts as fallback (not getNextBlockHash yet)
* interface changed for instantiating Bitcoin etc. provide sequence of fallbacks

# 1.22

* added getNextBlockHash for Omni
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
lazy val commonSettings = Seq(
organization := "io.tokenanalyst",
version := "1.22.0",
version := "1.4.0",
scalaVersion := "2.12.10",
description := "bitcoin-rpc")

Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/Protocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ trait RPCDecoder[A] {
}

case class Config(
host: String,
hosts: Seq[String],
port: Option[Int],
username: Option[String],
password: Option[String],
Expand All @@ -53,7 +53,7 @@ object EnvConfig {
case Seq(None, _, _, _, _) =>
throw new Exception("Pass at least BITCOIN_RPC_HOST.")
case Seq(Some(h), port, user, pass, zmqPort) =>
Config(h, port.map(_.toInt), user, pass, zmqPort.map(_.toInt))
Config(h.split(","), port.map(_.toInt), user, pass, zmqPort.map(_.toInt))
}
}
}
Expand Down
52 changes: 36 additions & 16 deletions src/main/scala/RPCClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ import org.http4s.dsl.io._
import org.http4s.headers.{Authorization, _}
import org.http4s.{BasicCredentials, MediaType, Request, Uri}

import java.net.SocketTimeoutException
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object RPCClient {

def bitcoin(
host: String,
hosts: Seq[String],
port: Option[Int] = None,
username: Option[String] = None,
password: Option[String] = None,
Expand All @@ -42,7 +43,7 @@ object RPCClient {
implicit ec: ExecutionContext,
cs: ContextShift[IO]
): Resource[IO, Bitcoin] = {
val config = Config(host, port, username, password, zmqPort)
val config = Config(hosts, port, username, password, zmqPort)
for (client <- make(config)) yield Bitcoin(client)
}

Expand All @@ -55,7 +56,7 @@ object RPCClient {
}

def omni(
host: String,
hosts: Seq[String],
port: Option[Int] = None,
username: Option[String] = None,
password: Option[String] = None,
Expand All @@ -64,7 +65,7 @@ object RPCClient {
implicit ec: ExecutionContext,
cs: ContextShift[IO]
): Resource[IO, Omni] = {
val config = Config(host, port, username, password, zmqPort)
val config = Config(hosts, port, username, password, zmqPort)
for (client <- make(config)) yield Omni(client)
}

Expand All @@ -82,40 +83,42 @@ object RPCClient {
): Resource[IO, RPCClient] = {
for {
client <- BlazeClientBuilder[IO](ec)
.withConnectTimeout(2.minutes)
.withConnectTimeout(5.seconds)
.withRequestTimeout(2.minutes)
.resource
socket <- ZeroMQ.socket(config.host, config.zmqPort.getOrElse(28332))
socket <- ZeroMQ.socket(config.hosts.head, config.zmqPort.getOrElse(28332))
} yield new RPCClient(client, socket, config)
}
}

class RPCClient(client: Client[IO], zmq: ZeroMQ.Socket, config: Config)
extends Http4sClientDsl[IO] {

val uri = Uri
.fromString(s"http://${config.host}:${config.port.getOrElse(8332)}")
.getOrElse(throw new Exception("Could not parse URL"))

// is blocking
def nextBlockHash(): IO[String] = zmq.nextBlock()

def request[A <: RPCRequest: Encoder, B <: RPCResponse: Decoder](
request: A
): IO[B] =
): IO[B] = retry(config.hosts) { host =>
for {
req <- post(request)
req <- post(host, request)
res <- client.expect[B](req)
} yield res
}

def requestJson[A <: RPCRequest: Encoder](request: A): IO[Json] =
for {
req <- post(request)
res <- client.expect[Json](req)
} yield res
retry(config.hosts) { host =>
for {
req <- post(host, request)
res <- client.expect[Json](req)
} yield res
}

private def post[A <: RPCRequest: Encoder](
host: String,
request: A
): IO[Request[IO]] = {
val uri = getUriForHost(host)
(config.username, config.password) match {
case (Some(user), Some(pass)) =>
POST(
Expand All @@ -132,4 +135,21 @@ class RPCClient(client: Client[IO], zmq: ZeroMQ.Socket, config: Config)
)
}
}

def getUriForHost(host: String) =
Uri
.fromString(s"http://${host}:${config.port.getOrElse(8332)}")
.getOrElse(throw new Exception("Could not parse URL"))

def retry[A](fallbacks: Seq[String], current: Int = 0, maxRetries: Int = 10)(
f: String => IO[A]
): IO[A] =
f(fallbacks(current % fallbacks.size)).handleErrorWith {
case e: SocketTimeoutException =>
if (current <= maxRetries)
retry(fallbacks, current + 1, maxRetries)(f)
else
IO.raiseError(new Exception(s"Running out of retries for: ${e}"))
case e => IO.raiseError(e)
}
}
1 change: 0 additions & 1 deletion src/main/scala/ZeroMQ.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ object ZeroMQ {
val topic = zMsg.popString()
val body = zMsg.popString()
val seq = ByteBuffer.wrap(zMsg.pop().getData.reverse).getInt

message(topic, body, seq)
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/examples/GetBlockHash.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ object GetBlockHash extends IOApp {
implicit val ec = global
RPCClient
.bitcoin(
"127.0.0.1",
Seq("127.0.0.1"),
username = Some("user"),
password = Some("password")
)
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/examples/SubscribeToBlockUpdates.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ object SubscribeToBlockUpdates extends IOApp {
implicit val ec = global
RPCClient
.bitcoin(
host = "127.0.0.1",
hosts = Seq("127.0.0.1"),
username = Some("user"),
password = Some("password")
)
Expand Down

0 comments on commit cf8edcb

Please sign in to comment.