diff --git a/.gitignore b/.gitignore index e84b067d..7db1cad2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +bin/ target/ lib_managed/ src_managed/ @@ -9,3 +10,4 @@ build/ *.iml *.ipr *.iws +*.swp diff --git a/README.md b/README.md index 47f1f364..9648ce32 100644 --- a/README.md +++ b/README.md @@ -52,16 +52,18 @@ Norbert provides two ways to interact with the cluster. ### Writing the code - the Scala version - object NorbertClient { - def main(args: Array[String]) { - val cc = ClusterClient("norbert", "localhost:2181", 30000) (1) - cc.awaitConnectionUninterruptibly (2) - cc.nodes (3) - cc.addListener(new MyClusterListener) (4) - cc.markNodeAvailable(1) (5) - cc.shutdown (6) - } - } +```scala +object NorbertClient { + def main(args: Array[String]) { + val cc = ClusterClient("norbert", "localhost:2181", 30000) (1) + cc.awaitConnectionUninterruptibly (2) + cc.nodes (3) + cc.addListener(new MyClusterListener) (4) + cc.markNodeAvailable(1) (5) + cc.shutdown (6) + } +} +``` 1. The `ClusterClient` companion object provides an easy way to instantiate and start a `ClusterClient` instance. 2. Before using the ClusterClient you must wait for it to finish connecting. @@ -72,16 +74,18 @@ Norbert provides two ways to interact with the cluster. ### Writing the code - the Java version - public class NorbertClient { - public static void main(String[] args) { - ClusterClient cc = new ZooKeeperClusterClient("norbert", "localhost:2181", 30000); (1) - cc.awaiteConnectionUninterruptibly(); (2) - cc.getNodes(); (3) - cc.addListener(new MyClusterListener()); (4) - cc.markNodeAvailable(1); (5). - cluster.shutdown(6) - } - } +```java +public class NorbertClient { + public static void main(String[] args) { + ClusterClient cc = new ZooKeeperClusterClient("norbert", "localhost:2181", 30000); (1) + cc.awaiteConnectionUninterruptibly(); (2) + cc.getNodes(); (3) + cc.addListener(new MyClusterListener()); (4) + cc.markNodeAvailable(1); (5). + cluster.shutdown(6) + } +} +``` 1. There are currently two ClusterClient implementations in Norbert, this code is instantiating the one that uses ZooKeeper. 2. Before using the ClusterClient you must wait for it to finish connecting. @@ -110,32 +114,34 @@ Norbert's client/server library uses message passing semantics and, specifically ### Load Balancers Norbert uses a software load balancer mechanism to route a request from a client to a server. Both partitioned and unpartitioned clusters are supported. - -If you are building a service which will use an unpartitioned cluster, you must provide your `NetworkClient` instance with a `LoadBalancerFactory`. The `LoadBalancerFactory` is used to create `LoadBalancer` instance that will be used to route requests. A round robin load balancer factory is provided. + +If you are building a service which will use an unpartitioned cluster, you must provide your `NetworkClient` instance with a `LoadBalancerFactory`. The `LoadBalancerFactory` is used to create `LoadBalancer` instance that will be used to route requests. A round robin load balancer factory is provided. If you are building a partitioned cluster then you will want to use the `PartitionedNetworkClient` and a `PartitionedLoadBalancerFactory`. These are generic classes that have a PartitionedId type parameter. PartitionedId is the type of the id that you use to partition your cluster (e.g. a member id). A consistent hash load balancer factory is provided. ### Writing a network server - the Scala version - object NorbertNetworkServer { - def main(args: Array[String]) { - val config = new NetworkServerConfig (1) - config.serviceName = "norbert" - config.zooKeeperConnectString = "localhost:2181" - config.zooKeeperSessionTimeoutMillis = 30000 - config.requestThreadCorePoolSize = 5 - config.requestThreadMaxPoolSize = 10 - config.requestThreadKeepAliveTimeSecs = 300 - - val server = NetworkServer(config) (2) - server.registerHandler(MyRequestMessage.getDefaultInstance, MyResponseMessage.getDefaultInstance, messageHandler _) (3) - server.bind(nodeId) (4) - } - - private def messageHandler(message: Message): Message = { - // application logic which returns a MyResponseMessage - } - } +```scala +object NorbertNetworkServer { + def main(args: Array[String]) { + val config = new NetworkServerConfig (1) + config.serviceName = "norbert" + config.zooKeeperConnectString = "localhost:2181" + config.zooKeeperSessionTimeoutMillis = 30000 + config.requestThreadCorePoolSize = 5 + config.requestThreadMaxPoolSize = 10 + config.requestThreadKeepAliveTimeSecs = 300 + + val server = NetworkServer(config) (2) + server.registerHandler(MyRequestMessage.getDefaultInstance, MyResponseMessage.getDefaultInstance, messageHandler _) (3) + server.bind(nodeId) (4) + } + + private def messageHandler(message: Message): Message = { + // application logic which returns a MyResponseMessage + } +} +``` 1. A `NetworkServerConfig` contains the configuration data for a `NetworkServer`. 2. The `NetworkServer` companion object provides an easy to instantiate a new `NetworkServer` instance. @@ -144,21 +150,23 @@ If you are building a partitioned cluster then you will want to use the `Partiti ### Writing a network server - the Java version - public class NorbertNetworkServer { - public static void main(String[] args) { - NetworkServerConfig config = new NetworkServerConfig(); - config.setServiceName("norbert"); - config.setZooKeeperConnectString("localhost:2181"); - config.setZooKeeperSessionTimeoutMillis(30000); - config.setRequestThreadCorePoolSize(5); - config.setRequestThreadMaxPoolSize(10); - config.setRequestThreadKeepAliveTimeSecs(300); - - NetworkServer ns = new NettyNetworkServer(config); - ns.registerHandler(MyRequestMessage.getDefaultInstance(), MyResponseMessage.getDefaultInstance(), new MessageHandler()); - ns.bind(nodeId); - } - } +```java +public class NorbertNetworkServer { + public static void main(String[] args) { + NetworkServerConfig config = new NetworkServerConfig(); + config.setServiceName("norbert"); + config.setZooKeeperConnectString("localhost:2181"); + config.setZooKeeperSessionTimeoutMillis(30000); + config.setRequestThreadCorePoolSize(5); + config.setRequestThreadMaxPoolSize(10); + config.setRequestThreadKeepAliveTimeSecs(300); + + NetworkServer ns = new NettyNetworkServer(config); + ns.registerHandler(MyRequestMessage.getDefaultInstance(), MyResponseMessage.getDefaultInstance(), new MessageHandler()); + ns.bind(nodeId); + } +} +``` 1. A `NetworkServerConfig` contains the configuration data for a `NetworkServer`. 2. `NettyNetworkServer` is currently the only implementation of `NetworkServer`. @@ -177,37 +185,39 @@ If you are building a partitioned cluster then you will want to use the `Partiti ### Writing the client code - the Scala version - object NorbertNetworkClient { - def main(args: Array[String]) { - val config = new NetworkClientConfig (1) - config.serviceName = "norbert" - config.zooKeeperConnectString = "localhost:2181" - config.zooKeeperSessionTimeoutMillis = 30000 - config.connectTimeoutMillis = 1000 - config.writeTimeoutMillis = 150 - config.maxConnectionsPerNode = 5 - config.staleRequestTimeoutMins = 10 - config.staleRequestCleanupFrequenceMins = 10 - - val nc = NetworkClient(config, new RoundRobinLoadBalancerFactory) (2) - OR - val nc = PartitionedNetworkClient(config, new IntegerConsistentHashPartitionedLoadBalancerFactory) - - nc.registerRequest(MyRequestMessage.getDefaultInstance(), MyResponseMessage.getDefaultInstance()) (3) - - val f = nc.sendMessage(myRequestMessageInstance) (4) - OR - val f = nc.sendMessage(1210, myRequestMessageInstance) - - try { - val response = f.get(500, TimeUnit.MILLISECONDS).asInstanceOf[MyResponseMessage] (5) - // do something with the response - } catch { - case ex: TimeoutException => println("Timed out") - case ex: ExecutionException => println("Error: %s".format(ex.getCause)) - } - } +```scala +object NorbertNetworkClient { + def main(args: Array[String]) { + val config = new NetworkClientConfig (1) + config.serviceName = "norbert" + config.zooKeeperConnectString = "localhost:2181" + config.zooKeeperSessionTimeoutMillis = 30000 + config.connectTimeoutMillis = 1000 + config.writeTimeoutMillis = 150 + config.maxConnectionsPerNode = 5 + config.staleRequestTimeoutMins = 10 + config.staleRequestCleanupFrequenceMins = 10 + + val nc = NetworkClient(config, new RoundRobinLoadBalancerFactory) (2) + OR + val nc = PartitionedNetworkClient(config, new IntegerConsistentHashPartitionedLoadBalancerFactory) + + nc.registerRequest(MyRequestMessage.getDefaultInstance(), MyResponseMessage.getDefaultInstance()) (3) + + val f = nc.sendMessage(myRequestMessageInstance) (4) + OR + val f = nc.sendMessage(1210, myRequestMessageInstance) + + try { + val response = f.get(500, TimeUnit.MILLISECONDS).asInstanceOf[MyResponseMessage] (5) + // do something with the response + } catch { + case ex: TimeoutException => println("Timed out") + case ex: ExecutionException => println("Error: %s".format(ex.getCause)) } + } +} +``` 1. A `NetworkClientConfig` contains the configuration data for a `NetworkClient`. 2. The `NetworkClient` companion object provides an easy to instantiate a new `NetworkClient` instance. Alternatively the `PartitionedNetworkClient` companion object provides the same functionality for `PartitionedNetworkClient`s. @@ -217,40 +227,42 @@ If you are building a partitioned cluster then you will want to use the `Partiti ### Writing the client code - the Java version - public class NorbertNetworkClient { - public static void main(String[] args) { - NetworkClientConfig config = new NetworkClientConfig(); (1) - config.setServiceName("norbert"); - config.setZooKeeperConnectString("localhost:2181"); - config.setZooKeeperSessionTimeoutMillis(30000); - config.setConnectTimeoutMillis(1000); - config.setWriteTimeoutMillis(150); - config.setConnectionsPerNode(5); - config.setStaleRequestTimeoutMins(10); - config.setStaleRequestCleanupFrequenceMins10); - - NetworkClient nc = new NettyNetworkClient(config, new RoundRobinLoadBalancerFactory()); (2) - OR - PartitionedNetworkClient nc = new NettyPartitionedNetworkClient(config, new IntegerConsistentHashPartitionedLoadBalancerFactory()); - - nc.registerRequest(MyRequestMessage.getDefaultInstance(), MyResponseMessage.getDefaultInstance()); (3) - - Future f = nc.sendMessage(myRequestMessageInstance); (4) - OR - Future f = nc.sendMessage(1210, myRequestMessageInstance); - - try { - MyResponseMessage response = (MyResponseMessage) f.get(500, TimeUnit.MILLISECONDS); (5) - // do something with the response - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (ExecutionException e) { - e.printStackTrace(); - } catch (TimeoutException e) { - e.printStackTrace(); - } - } +```java +public class NorbertNetworkClient { + public static void main(String[] args) { + NetworkClientConfig config = new NetworkClientConfig(); (1) + config.setServiceName("norbert"); + config.setZooKeeperConnectString("localhost:2181"); + config.setZooKeeperSessionTimeoutMillis(30000); + config.setConnectTimeoutMillis(1000); + config.setWriteTimeoutMillis(150); + config.setConnectionsPerNode(5); + config.setStaleRequestTimeoutMins(10); + config.setStaleRequestCleanupFrequenceMins10); + + NetworkClient nc = new NettyNetworkClient(config, new RoundRobinLoadBalancerFactory()); (2) + OR + PartitionedNetworkClient nc = new NettyPartitionedNetworkClient(config, new IntegerConsistentHashPartitionedLoadBalancerFactory()); + + nc.registerRequest(MyRequestMessage.getDefaultInstance(), MyResponseMessage.getDefaultInstance()); (3) + + Future f = nc.sendMessage(myRequestMessageInstance); (4) + OR + Future f = nc.sendMessage(1210, myRequestMessageInstance); + + try { + MyResponseMessage response = (MyResponseMessage) f.get(500, TimeUnit.MILLISECONDS); (5) + // do something with the response + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ExecutionException e) { + e.printStackTrace(); + } catch (TimeoutException e) { + e.printStackTrace(); } + } +} +``` 1. A `NetworkClientConfig` contains the configuration data for a `NetworkClient`. 2. `NettyNetworkClient` and `NettyPartitionedNetworkClient` are currently the only implementations of `NetworkClient` and `PartitionedNetworkClient` respectively. @@ -264,9 +276,8 @@ If you are building a partitioned cluster then you will want to use the `Partiti * zooKeeperConnectString - the connection string passed to ZooKeeper * zooKeeperSessionTimeoutMillis - the session timeout passed to ZooKeeper in milliseconds * clusterClient - as an alternative the the prior configuration parameters, you can create a `ClusterClient` instance yourself and have the `NetworkServer` use that instance by setting this field -* connectTimeoutMillis - the maximum number of milliseconds to allow a connection attempt to take +* connectTimeoutMillis - the maximum number of milliseconds to allow a connection attempt to take * writeTimeoutMillis - the number of milliseconds a request can be queued for write before it is considered stale * maxConnectionsPerNode - the maximum number of open connections to a node. The total number of connections that can be opened by a network client is maxConnectionsPerNode * number of nodes * staleRequestTimeoutMins - the number of minutes to keep a request that is waiting for a response * staleRequestCleanupFrequenceMins - the frequency to clean up stale requests -