Skip to content

Commit

Permalink
handle ConnectException for failover
Browse files Browse the repository at this point in the history
  • Loading branch information
jpzk committed Nov 20, 2019
1 parent 56cb6cb commit 91f04a0
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 14 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# 2.2.0

* handle ConnectException with failover

# 2.1.0

* changed env flag for hosts BITCOIN_RPC_HOST -> BITCOIN_RPC_HOSTS
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
lazy val commonSettings = Seq(
organization := "io.tokenanalyst",
version := "2.1.0",
version := "2.2.0",
scalaVersion := "2.12.10",
description := "bitcoin-rpc")

Expand Down
31 changes: 19 additions & 12 deletions src/main/scala/RPCClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.http4s.dsl.io._
import org.http4s.headers.{Authorization, _}
import org.http4s.{BasicCredentials, MediaType, Request, Uri}

import java.net.SocketTimeoutException
import java.net.{ConnectException, SocketTimeoutException}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

Expand Down Expand Up @@ -86,7 +86,10 @@ object RPCClient {
.withConnectTimeout(5.seconds)
.withRequestTimeout(2.minutes)
.resource
socket <- ZeroMQ.socket(config.hosts.head, config.zmqPort.getOrElse(28332))
socket <- ZeroMQ.socket(
config.hosts.head,
config.zmqPort.getOrElse(28332)
)
} yield new RPCClient(client, socket, config)
}
}
Expand Down Expand Up @@ -119,8 +122,8 @@ class RPCClient(client: Client[IO], zmq: ZeroMQ.Socket, config: Config)
request: A
): IO[Request[IO]] = {
val uri = Uri
.fromString(s"http://${host}:${config.port.getOrElse(8332)}")
.getOrElse(throw new Exception("Could not parse URL"))
.fromString(s"http://${host}:${config.port.getOrElse(8332)}")
.getOrElse(throw new Exception("Could not parse URL"))
(config.username, config.password) match {
case (Some(user), Some(pass)) =>
POST(
Expand All @@ -138,15 +141,19 @@ class RPCClient(client: Client[IO], zmq: ZeroMQ.Socket, config: Config)
}
}

def retry[A](fallbacks: Seq[String], current: Int = 0, maxRetries: Int = 10)(
def retry[A](fallbacks: Seq[String], current: Int = 0, max: Int = 10)(
f: String => IO[A]
): IO[A] =
): IO[A] = {
val handle = (e: Exception) => {
if (current <= max)
retry(fallbacks, current + 1, max)(f)
else
IO.raiseError(new Exception(s"Running out of retries for: ${e}"))
}
f(fallbacks(current % fallbacks.size)).handleErrorWith {
case e: SocketTimeoutException =>
if (current <= maxRetries)
retry(fallbacks, current + 1, maxRetries)(f)
else
IO.raiseError(new Exception(s"Running out of retries for: ${e}"))
case e => IO.raiseError(e)
case e: ConnectException => handle(e)
case e: SocketTimeoutException => handle(e)
case e => IO.raiseError(e)
}
}
}
2 changes: 1 addition & 1 deletion src/main/scala/ZeroMQ.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ object ZeroMQ {
class Socket(host: String, port: Int) extends Closeable {
val context = new ZContext()
val socket: ZMQ.Socket = context.createSocket(SocketType.SUB)

//http://api.zeromq.org/2-1:zmq-setsockopt
socket.setHWM(0)
socket.subscribe("hashblock".map(_.toByte).toArray)
Expand Down

0 comments on commit 91f04a0

Please sign in to comment.