diff --git a/CHANGELOG.md b/CHANGELOG.md index 85f1f8d..3bd326d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +# 1.4.0 + +* added feature to add multiple hosts as fallback (not getNextBlockHash yet) +* interface changed for instantiating Bitcoin etc. provide sequence of fallbacks + # 1.22 * added getNextBlockHash for Omni diff --git a/build.sbt b/build.sbt index c1b3cfc..28e7a58 100644 --- a/build.sbt +++ b/build.sbt @@ -1,6 +1,6 @@ lazy val commonSettings = Seq( organization := "io.tokenanalyst", - version := "1.22.0", + version := "1.4.0", scalaVersion := "2.12.10", description := "bitcoin-rpc") diff --git a/src/main/scala/Protocol.scala b/src/main/scala/Protocol.scala index 689c3ea..8ee2f20 100644 --- a/src/main/scala/Protocol.scala +++ b/src/main/scala/Protocol.scala @@ -33,7 +33,7 @@ trait RPCDecoder[A] { } case class Config( - host: String, + hosts: Seq[String], port: Option[Int], username: Option[String], password: Option[String], @@ -53,7 +53,7 @@ object EnvConfig { case Seq(None, _, _, _, _) => throw new Exception("Pass at least BITCOIN_RPC_HOST.") case Seq(Some(h), port, user, pass, zmqPort) => - Config(h, port.map(_.toInt), user, pass, zmqPort.map(_.toInt)) + Config(h.split(","), port.map(_.toInt), user, pass, zmqPort.map(_.toInt)) } } } diff --git a/src/main/scala/RPCClient.scala b/src/main/scala/RPCClient.scala index 36cc53b..e2f40aa 100644 --- a/src/main/scala/RPCClient.scala +++ b/src/main/scala/RPCClient.scala @@ -27,13 +27,14 @@ import org.http4s.dsl.io._ import org.http4s.headers.{Authorization, _} import org.http4s.{BasicCredentials, MediaType, Request, Uri} +import java.net.SocketTimeoutException import scala.concurrent.ExecutionContext import scala.concurrent.duration._ object RPCClient { def bitcoin( - host: String, + hosts: Seq[String], port: Option[Int] = None, username: Option[String] = None, password: Option[String] = None, @@ -42,7 +43,7 @@ object RPCClient { implicit ec: ExecutionContext, cs: ContextShift[IO] ): Resource[IO, Bitcoin] = { - val config = Config(host, port, username, password, zmqPort) + val config = Config(hosts, port, username, password, zmqPort) for (client <- make(config)) yield Bitcoin(client) } @@ -55,7 +56,7 @@ object RPCClient { } def omni( - host: String, + hosts: Seq[String], port: Option[Int] = None, username: Option[String] = None, password: Option[String] = None, @@ -64,7 +65,7 @@ object RPCClient { implicit ec: ExecutionContext, cs: ContextShift[IO] ): Resource[IO, Omni] = { - val config = Config(host, port, username, password, zmqPort) + val config = Config(hosts, port, username, password, zmqPort) for (client <- make(config)) yield Omni(client) } @@ -82,9 +83,10 @@ object RPCClient { ): Resource[IO, RPCClient] = { for { client <- BlazeClientBuilder[IO](ec) - .withConnectTimeout(2.minutes) + .withConnectTimeout(5.seconds) + .withRequestTimeout(2.minutes) .resource - socket <- ZeroMQ.socket(config.host, config.zmqPort.getOrElse(28332)) + socket <- ZeroMQ.socket(config.hosts.head, config.zmqPort.getOrElse(28332)) } yield new RPCClient(client, socket, config) } } @@ -92,30 +94,31 @@ object RPCClient { class RPCClient(client: Client[IO], zmq: ZeroMQ.Socket, config: Config) extends Http4sClientDsl[IO] { - val uri = Uri - .fromString(s"http://${config.host}:${config.port.getOrElse(8332)}") - .getOrElse(throw new Exception("Could not parse URL")) - // is blocking def nextBlockHash(): IO[String] = zmq.nextBlock() def request[A <: RPCRequest: Encoder, B <: RPCResponse: Decoder]( request: A - ): IO[B] = + ): IO[B] = retry(config.hosts) { host => for { - req <- post(request) + req <- post(host, request) res <- client.expect[B](req) } yield res + } def requestJson[A <: RPCRequest: Encoder](request: A): IO[Json] = - for { - req <- post(request) - res <- client.expect[Json](req) - } yield res + retry(config.hosts) { host => + for { + req <- post(host, request) + res <- client.expect[Json](req) + } yield res + } private def post[A <: RPCRequest: Encoder]( + host: String, request: A ): IO[Request[IO]] = { + val uri = getUriForHost(host) (config.username, config.password) match { case (Some(user), Some(pass)) => POST( @@ -132,4 +135,21 @@ class RPCClient(client: Client[IO], zmq: ZeroMQ.Socket, config: Config) ) } } + + def getUriForHost(host: String) = + Uri + .fromString(s"http://${host}:${config.port.getOrElse(8332)}") + .getOrElse(throw new Exception("Could not parse URL")) + + def retry[A](fallbacks: Seq[String], current: Int = 0, maxRetries: Int = 10)( + f: String => IO[A] + ): IO[A] = + f(fallbacks(current % fallbacks.size)).handleErrorWith { + case e: SocketTimeoutException => + if (current <= maxRetries) + retry(fallbacks, current + 1, maxRetries)(f) + else + IO.raiseError(new Exception(s"Running out of retries for: ${e}")) + case e => IO.raiseError(e) + } } diff --git a/src/main/scala/ZeroMQ.scala b/src/main/scala/ZeroMQ.scala index 8e1e691..0595bc3 100644 --- a/src/main/scala/ZeroMQ.scala +++ b/src/main/scala/ZeroMQ.scala @@ -13,7 +13,6 @@ object ZeroMQ { val topic = zMsg.popString() val body = zMsg.popString() val seq = ByteBuffer.wrap(zMsg.pop().getData.reverse).getInt - message(topic, body, seq) } diff --git a/src/main/scala/examples/GetBlockHash.scala b/src/main/scala/examples/GetBlockHash.scala index 9470c5b..3f955f7 100644 --- a/src/main/scala/examples/GetBlockHash.scala +++ b/src/main/scala/examples/GetBlockHash.scala @@ -27,7 +27,7 @@ object GetBlockHash extends IOApp { implicit val ec = global RPCClient .bitcoin( - "127.0.0.1", + Seq("127.0.0.1"), username = Some("user"), password = Some("password") ) diff --git a/src/main/scala/examples/SubscribeToBlockUpdates.scala b/src/main/scala/examples/SubscribeToBlockUpdates.scala index f07b545..e1b656b 100644 --- a/src/main/scala/examples/SubscribeToBlockUpdates.scala +++ b/src/main/scala/examples/SubscribeToBlockUpdates.scala @@ -27,7 +27,7 @@ object SubscribeToBlockUpdates extends IOApp { implicit val ec = global RPCClient .bitcoin( - host = "127.0.0.1", + hosts = Seq("127.0.0.1"), username = Some("user"), password = Some("password") )