Anigam selective retry stuff #6

Closed
wants to merge 73 commits into from
aefb584
Ported to gradle build
Jun 28, 2013
38c3c3d
Merge pull request #24 from divchenko/feature/gradle-build
jhartman Jul 5, 2013
25f4460
FIX: change version back to 0.6.33
Jul 11, 2013
b718e17
Merge pull request #26 from vikstrous/0.6.33
jhartman Jul 11, 2013
11d6c89
Adjusting the norbert tuning parameters affected by the change from m…
Jul 15, 2013
ad21559
Merge remote-tracking branch 'upstream/master'
Jul 15, 2013
37ce05b
Upping the version
Jul 15, 2013
079ebea
Merge pull request #27 from abhinigam/master
jhartman Jul 16, 2013
1977d54
Updating Netty Version in Norbert for USCP 2050
Aug 1, 2013
38bcb5b
Merge pull request #1 from navina/master
jhartman Aug 5, 2013
276fb1f
Upping the version
Aug 5, 2013
92e1a21
Merge pull request #2 from navina/master
navina Aug 5, 2013
44d86ea
Modifying the JMX stats so that they remain unchanged even after mill…
Aug 12, 2013
135ebc2
Adding metrics: active pool size, current pool size, queue time, tota…
Aug 14, 2013
d981f34
Testing push mechanism
Aug 15, 2013
b43b725
Moving queueTime from endRequest to beginRequest, leaving existing la…
Sep 9, 2013
ef742cc
Fixing tests
Sep 9, 2013
f06acf7
Upgrading the version of norbert
Sep 9, 2013
f674813
Part of selective retry commit
Oct 25, 2013
800a3ba
Part of selective retry commit
Oct 25, 2013
cc5af61
Final set of changes for selective retry
Oct 25, 2013
1fe42ec
Version upgrade change
Oct 25, 2013
18efc73
Enhancement to make sure in case the number of errors exceeds thresho…
Oct 25, 2013
c8a4ed3
Enhanced the retry logic to continue beyond the initial timeout to re…
Oct 25, 2013
e082a5c
Removing an extraneous comment
Oct 25, 2013
39d857c
Add support in partitioned factory for specifying selective retry
Oct 25, 2013
c5761ac
Spring/java friendly SelectiveRetry interface
Oct 30, 2013
189cc3e
Bumping up the version due to artifactory issues
Oct 30, 2013
b1ed1e6
Adding logging for selective retry
Oct 30, 2013
dc790a8
Bumping the version
Oct 30, 2013
9da27f9
Upgrading the version of norbert
Nov 1, 2013
4d4f0d6
Fixing requests per second metric
Nov 3, 2013
4d36a5a
Bumping the version
Nov 3, 2013
53824e4
Adding bindByPort method which looks up node configurations in zookeeper
Nov 13, 2013
071bb8a
Merge pull request #3 from linkedin/tworboys-branch
tworboys Nov 14, 2013
a0482ce
Fixing the race condition when we restart after SIGKILL
Nov 16, 2013
a4c343b
Expose duplicatesOk as config and change Future in APIs to FutureList…
Nov 22, 2013
25935a7
Bumping the version up
Nov 22, 2013
e552acd
Hooking up FutureAdapterListener to multiple places
Dec 2, 2013
f254c47
Bumping up the version
Dec 2, 2013
97cea23
Bumped Netty version to fix shutdown hang.
nsabovic Dec 4, 2013
84897ed
Undo protobuf version bump...
nsabovic Dec 4, 2013
0c0f791
Adding hash functions and fixing a load balancer bug
Dec 5, 2013
65f36cb
Merge pull request #4 from linkedin/tworboys-branch
tworboys Dec 6, 2013
df51ea3
Bumping the version
Dec 6, 2013
c7852e5
Adding handling/logging of new zookeeper events. New API for overridi…
Dec 7, 2013
8eb463d
Adding logging/handling of additional zookeeper events. New API which…
Dec 7, 2013
dc57cc1
Fixing the md5 hash function
Dec 10, 2013
601d2f7
Merge pull request #6 from linkedin/tworboys-branch
tworboys Dec 10, 2013
1d3e8c1
Bumping the version
Dec 10, 2013
d82b911
Merge pull request #7 from linkedin/tworboys-branch
tworboys Dec 10, 2013
0c5b147
Allowing bind to work with hostname and port
Dec 19, 2013
7b2d1fa
update gitignore for bin folder and swp files
mavlee Dec 19, 2013
abb0de0
fix documentation to use syntax highlighting
mavlee Dec 19, 2013
1966867
Merge pull request #9 from mavlee/master
mavlee Dec 20, 2013
91ea1c4
Fixing the test case concurrency issue. Test cases share the one sing…
sungjuc Jan 10, 2014
a9f40da
Merge pull request #10 from sungjuc/master
mavlee Jan 11, 2014
d7e5c15
Merge pull request #8 from linkedin/tworboys-branch
tworboys Jan 19, 2014
e90a75a
Fixing inefficiency of default round-robin load balancer for partitio…
sungjuc Jan 23, 2014
87400ea
Merge pull request #11 from sungjuc/master
sungjuc Jan 23, 2014
557805b
Bumping up the version for load balancer fixes.
sungjuc Jan 23, 2014
acd781c
Merge pull request #12 from sungjuc/master
sungjuc Jan 23, 2014
8c2c81b
refactored code to be source compatible with Scala 2.10 and added sup…
cdfreeman Jan 26, 2014
372d7d6
Merge pull request #13 from cdfreeman/cfreeman-branch
Jan 27, 2014
f0f8291
Bumping version number after adding support for scala 2.10.x
Jan 27, 2014
05f596b
Fixing the capability round-robin test cases.
sungjuc Feb 3, 2014
35ab7ed
Merge pull request #14 from sungjuc/master
sungjuc Feb 3, 2014
439e283
Fixing counter incrementing for capability round-robin.
sungjuc Feb 3, 2014
63c50e8
Merge pull request #15 from sungjuc/master
sungjuc Feb 3, 2014
3eb0101
Bumping version after Jo's fixes
Feb 4, 2014
6643f56
bumping version to 0.6.55
Feb 5, 2014
5544436
Merge pull request #16 from jarekr/master
tworboys Feb 5, 2014
db7c50f
Fixing a bug in latest selective retry code
Feb 24, 2014
8 changes: 8 additions & 0 deletions .gitignore
@@ -1,5 +1,13 @@
bin/
target/
lib_managed/
src_managed/
project/boot/
project/plugins/project
out/
build/
.gradle
*.iml
*.ipr
*.iws
*.swp
252 changes: 132 additions & 120 deletions README.md
@@ -52,16 +52,18 @@ Norbert provides two ways to interact with the cluster.

### Writing the code - the Scala version

object NorbertClient {
def main(args: Array[String]) {
val cc = ClusterClient("norbert", "localhost:2181", 30000) (1)
cc.awaitConnectionUninterruptibly (2)
cc.nodes (3)
cc.addListener(new MyClusterListener) (4)
cc.markNodeAvailable(1) (5)
cc.shutdown (6)
}
}
```scala
object NorbertClient {
def main(args: Array[String]) {
val cc = ClusterClient("norbert", "localhost:2181", 30000) (1)
cc.awaitConnectionUninterruptibly (2)
cc.nodes (3)
cc.addListener(new MyClusterListener) (4)
cc.markNodeAvailable(1) (5)
cc.shutdown (6)
}
}
```

1. The `ClusterClient` companion object provides an easy way to instantiate and start a `ClusterClient` instance.
2. Before using the ClusterClient you must wait for it to finish connecting.
@@ -72,16 +74,18 @@ Norbert provides two ways to interact with the cluster.

### Writing the code - the Java version

public class NorbertClient {
public static void main(String[] args) {
ClusterClient cc = new ZooKeeperClusterClient("norbert", "localhost:2181", 30000); (1)
cc.awaitConnectionUninterruptibly(); (2)
cc.getNodes(); (3)
cc.addListener(new MyClusterListener()); (4)
cc.markNodeAvailable(1); (5)
cc.shutdown(); (6)
}
}
```java
public class NorbertClient {
public static void main(String[] args) {
ClusterClient cc = new ZooKeeperClusterClient("norbert", "localhost:2181", 30000); (1)
cc.awaitConnectionUninterruptibly(); (2)
cc.getNodes(); (3)
cc.addListener(new MyClusterListener()); (4)
cc.markNodeAvailable(1); (5)
cc.shutdown(); (6)
}
}
```

1. Norbert currently has two `ClusterClient` implementations; this code instantiates the one that uses ZooKeeper.
2. Before using the ClusterClient you must wait for it to finish connecting.
@@ -110,32 +114,34 @@ Norbert's client/server library uses message passing semantics and, specifically
### Load Balancers

Norbert uses a software load balancer mechanism to route a request from a client to a server. Both partitioned and unpartitioned clusters are supported.
If you are building a service which will use an unpartitioned cluster, you must provide your `NetworkClient` instance with a `LoadBalancerFactory`. The `LoadBalancerFactory` is used to create the `LoadBalancer` instance that will be used to route requests. A round robin load balancer factory is provided.

If you are building a service which will use an unpartitioned cluster, you must provide your `NetworkClient` instance with a `LoadBalancerFactory`. The `LoadBalancerFactory` is used to create the `LoadBalancer` instance that will be used to route requests. A round robin load balancer factory is provided.

If you are building a partitioned cluster then you will want to use the `PartitionedNetworkClient` and a `PartitionedLoadBalancerFactory`. These are generic classes that have a PartitionedId type parameter. PartitionedId is the type of the id that you use to partition your cluster (e.g. a member id). A consistent hash load balancer factory is provided.
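
The two routing strategies named above can be illustrated with a small, self-contained Java sketch. It is not Norbert's implementation: `LoadBalancerSketch`, `RoundRobin`, `ConsistentHash`, the node names, and the MD5-based ring are all assumptions made for illustration, and the provided `RoundRobinLoadBalancerFactory` and `IntegerConsistentHashPartitionedLoadBalancerFactory` may work differently.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: none of these classes are part of Norbert's API.
final class LoadBalancerSketch {

    /** Round robin: hand out nodes in order, wrapping around at the end. */
    static final class RoundRobin {
        private final List<String> nodes;
        private final AtomicInteger counter = new AtomicInteger();

        RoundRobin(List<String> nodes) {
            if (nodes.isEmpty()) {
                throw new IllegalArgumentException("at least one node is required");
            }
            this.nodes = new ArrayList<>(nodes);
        }

        String next() {
            // floorMod keeps the index non-negative even after the counter overflows.
            return nodes.get(Math.floorMod(counter.getAndIncrement(), nodes.size()));
        }
    }

    /** Consistent hash: the same id always maps to the same node on the ring. */
    static final class ConsistentHash {
        private final TreeMap<Long, String> ring = new TreeMap<>();

        ConsistentHash(List<String> nodes, int replicas) {
            if (nodes.isEmpty() || replicas < 1) {
                throw new IllegalArgumentException("need at least one node and one replica");
            }
            // Each node is placed on the ring several times to smooth the distribution.
            for (String node : nodes) {
                for (int i = 0; i < replicas; i++) {
                    ring.put(hash(node + "#" + i), node);
                }
            }
        }

        String nodeFor(int id) {
            // Walk clockwise to the first ring position at or after the id's hash.
            Map.Entry<Long, String> entry = ring.ceilingEntry(hash(Integer.toString(id)));
            if (entry == null) {
                entry = ring.firstEntry(); // wrap around the ring
            }
            return entry.getValue();
        }

        private static long hash(String key) {
            try {
                byte[] digest = MessageDigest.getInstance("MD5")
                        .digest(key.getBytes(StandardCharsets.UTF_8));
                long h = 0;
                for (int i = 0; i < 8; i++) {
                    h = (h << 8) | (digest[i] & 0xff);
                }
                return h;
            } catch (NoSuchAlgorithmException ex) {
                throw new IllegalStateException(ex);
            }
        }
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("node-1", "node-2", "node-3");

        RoundRobin rr = new RoundRobin(nodes);
        // prints "node-1 node-2 node-3 node-1"
        System.out.println(rr.next() + " " + rr.next() + " " + rr.next() + " " + rr.next());

        ConsistentHash ch = new ConsistentHash(nodes, 100);
        // The same partitioned id is always routed to the same node: prints "true"
        System.out.println(ch.nodeFor(1210).equals(ch.nodeFor(1210)));
    }
}
```

Round robin suits an unpartitioned cluster because any node can serve any request. In a partitioned cluster the id determines which node holds the data, so the mapping from id to node has to be stable across calls.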

### Writing a network server - the Scala version

object NorbertNetworkServer {
def main(args: Array[String]) {
val config = new NetworkServerConfig (1)
config.serviceName = "norbert"
config.zooKeeperConnectString = "localhost:2181"
config.zooKeeperSessionTimeoutMillis = 30000
config.requestThreadCorePoolSize = 5
config.requestThreadMaxPoolSize = 10
config.requestThreadKeepAliveTimeSecs = 300

val server = NetworkServer(config) (2)
server.registerHandler(MyRequestMessage.getDefaultInstance, MyResponseMessage.getDefaultInstance, messageHandler _) (3)
server.bind(nodeId) (4)
}

private def messageHandler(message: Message): Message = {
// application logic which returns a MyResponseMessage
}
}
```scala
object NorbertNetworkServer {
def main(args: Array[String]) {
val config = new NetworkServerConfig (1)
config.serviceName = "norbert"
config.zooKeeperConnectString = "localhost:2181"
config.zooKeeperSessionTimeoutMillis = 30000
config.requestThreadCorePoolSize = 5
config.requestThreadMaxPoolSize = 10
config.requestThreadKeepAliveTimeSecs = 300

val server = NetworkServer(config) (2)
server.registerHandler(MyRequestMessage.getDefaultInstance, MyResponseMessage.getDefaultInstance, messageHandler _) (3)
server.bind(nodeId) (4)
}

private def messageHandler(message: Message): Message = {
// application logic which returns a MyResponseMessage
}
}
```

1. A `NetworkServerConfig` contains the configuration data for a `NetworkServer`.
2. The `NetworkServer` companion object provides an easy way to instantiate a new `NetworkServer` instance.
@@ -144,21 +150,23 @@ If you are building a partitioned cluster then you will want to use the `Partiti

### Writing a network server - the Java version

public class NorbertNetworkServer {
public static void main(String[] args) {
NetworkServerConfig config = new NetworkServerConfig();
config.setServiceName("norbert");
config.setZooKeeperConnectString("localhost:2181");
config.setZooKeeperSessionTimeoutMillis(30000);
config.setRequestThreadCorePoolSize(5);
config.setRequestThreadMaxPoolSize(10);
config.setRequestThreadKeepAliveTimeSecs(300);

NetworkServer ns = new NettyNetworkServer(config);
ns.registerHandler(MyRequestMessage.getDefaultInstance(), MyResponseMessage.getDefaultInstance(), new MessageHandler());
ns.bind(nodeId);
}
}
```java
public class NorbertNetworkServer {
public static void main(String[] args) {
NetworkServerConfig config = new NetworkServerConfig();
config.setServiceName("norbert");
config.setZooKeeperConnectString("localhost:2181");
config.setZooKeeperSessionTimeoutMillis(30000);
config.setRequestThreadCorePoolSize(5);
config.setRequestThreadMaxPoolSize(10);
config.setRequestThreadKeepAliveTimeSecs(300);

NetworkServer ns = new NettyNetworkServer(config);
ns.registerHandler(MyRequestMessage.getDefaultInstance(), MyResponseMessage.getDefaultInstance(), new MessageHandler());
ns.bind(nodeId);
}
}
```

1. A `NetworkServerConfig` contains the configuration data for a `NetworkServer`.
2. `NettyNetworkServer` is currently the only implementation of `NetworkServer`.
@@ -177,37 +185,39 @@ If you are building a partitioned cluster then you will want to use the `Partiti

### Writing the client code - the Scala version

object NorbertNetworkClient {
def main(args: Array[String]) {
val config = new NetworkClientConfig (1)
config.serviceName = "norbert"
config.zooKeeperConnectString = "localhost:2181"
config.zooKeeperSessionTimeoutMillis = 30000
config.connectTimeoutMillis = 1000
config.writeTimeoutMillis = 150
config.maxConnectionsPerNode = 5
config.staleRequestTimeoutMins = 10
config.staleRequestCleanupFrequenceMins = 10

val nc = NetworkClient(config, new RoundRobinLoadBalancerFactory) (2)
OR
val nc = PartitionedNetworkClient(config, new IntegerConsistentHashPartitionedLoadBalancerFactory)

nc.registerRequest(MyRequestMessage.getDefaultInstance(), MyResponseMessage.getDefaultInstance()) (3)

val f = nc.sendMessage(myRequestMessageInstance) (4)
OR
val f = nc.sendMessage(1210, myRequestMessageInstance)

try {
val response = f.get(500, TimeUnit.MILLISECONDS).asInstanceOf[MyResponseMessage] (5)
// do something with the response
} catch {
case ex: TimeoutException => println("Timed out")
case ex: ExecutionException => println("Error: %s".format(ex.getCause))
}
}
```scala
object NorbertNetworkClient {
def main(args: Array[String]) {
val config = new NetworkClientConfig (1)
config.serviceName = "norbert"
config.zooKeeperConnectString = "localhost:2181"
config.zooKeeperSessionTimeoutMillis = 30000
config.connectTimeoutMillis = 1000
config.writeTimeoutMillis = 150
config.maxConnectionsPerNode = 5
config.staleRequestTimeoutMins = 10
config.staleRequestCleanupFrequenceMins = 10

val nc = NetworkClient(config, new RoundRobinLoadBalancerFactory) (2)
OR
val nc = PartitionedNetworkClient(config, new IntegerConsistentHashPartitionedLoadBalancerFactory)

nc.registerRequest(MyRequestMessage.getDefaultInstance(), MyResponseMessage.getDefaultInstance()) (3)

val f = nc.sendMessage(myRequestMessageInstance) (4)
OR
val f = nc.sendMessage(1210, myRequestMessageInstance)

try {
val response = f.get(500, TimeUnit.MILLISECONDS).asInstanceOf[MyResponseMessage] (5)
// do something with the response
} catch {
case ex: TimeoutException => println("Timed out")
case ex: ExecutionException => println("Error: %s".format(ex.getCause))
}
}
}
```

1. A `NetworkClientConfig` contains the configuration data for a `NetworkClient`.
2. The `NetworkClient` companion object provides an easy way to instantiate a new `NetworkClient` instance. Alternatively, the `PartitionedNetworkClient` companion object provides the same functionality for `PartitionedNetworkClient`s.
@@ -217,40 +227,42 @@ If you are building a partitioned cluster then you will want to use the `Partiti

### Writing the client code - the Java version

public class NorbertNetworkClient {
public static void main(String[] args) {
NetworkClientConfig config = new NetworkClientConfig(); (1)
config.setServiceName("norbert");
config.setZooKeeperConnectString("localhost:2181");
config.setZooKeeperSessionTimeoutMillis(30000);
config.setConnectTimeoutMillis(1000);
config.setWriteTimeoutMillis(150);
config.setConnectionsPerNode(5);
config.setStaleRequestTimeoutMins(10);
config.setStaleRequestCleanupFrequenceMins(10);

NetworkClient nc = new NettyNetworkClient(config, new RoundRobinLoadBalancerFactory()); (2)
OR
PartitionedNetworkClient<Integer> nc = new NettyPartitionedNetworkClient<Integer>(config, new IntegerConsistentHashPartitionedLoadBalancerFactory());

nc.registerRequest(MyRequestMessage.getDefaultInstance(), MyResponseMessage.getDefaultInstance()); (3)

Future<Message> f = nc.sendMessage(myRequestMessageInstance); (4)
OR
Future<Message> f = nc.sendMessage(1210, myRequestMessageInstance);

try {
MyResponseMessage response = (MyResponseMessage) f.get(500, TimeUnit.MILLISECONDS); (5)
// do something with the response
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
```java
public class NorbertNetworkClient {
public static void main(String[] args) {
NetworkClientConfig config = new NetworkClientConfig(); (1)
config.setServiceName("norbert");
config.setZooKeeperConnectString("localhost:2181");
config.setZooKeeperSessionTimeoutMillis(30000);
config.setConnectTimeoutMillis(1000);
config.setWriteTimeoutMillis(150);
config.setConnectionsPerNode(5);
config.setStaleRequestTimeoutMins(10);
config.setStaleRequestCleanupFrequenceMins(10);

NetworkClient nc = new NettyNetworkClient(config, new RoundRobinLoadBalancerFactory()); (2)
OR
PartitionedNetworkClient<Integer> nc = new NettyPartitionedNetworkClient<Integer>(config, new IntegerConsistentHashPartitionedLoadBalancerFactory());

nc.registerRequest(MyRequestMessage.getDefaultInstance(), MyResponseMessage.getDefaultInstance()); (3)

Future<Message> f = nc.sendMessage(myRequestMessageInstance); (4)
OR
Future<Message> f = nc.sendMessage(1210, myRequestMessageInstance);

try {
MyResponseMessage response = (MyResponseMessage) f.get(500, TimeUnit.MILLISECONDS); (5)
// do something with the response
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
```

1. A `NetworkClientConfig` contains the configuration data for a `NetworkClient`.
2. `NettyNetworkClient` and `NettyPartitionedNetworkClient` are currently the only implementations of `NetworkClient` and `PartitionedNetworkClient` respectively.
@@ -264,7 +276,7 @@ If you are building a partitioned cluster then you will want to use the `Partiti
* zooKeeperConnectString - the connection string passed to ZooKeeper
* zooKeeperSessionTimeoutMillis - the session timeout passed to ZooKeeper in milliseconds
* clusterClient - as an alternative to the prior configuration parameters, you can create a `ClusterClient` instance yourself and have the `NetworkServer` use that instance by setting this field
* connectTimeoutMillis - the maximum number of milliseconds to allow a connection attempt to take
* connectTimeoutMillis - the maximum number of milliseconds to allow a connection attempt to take
* writeTimeoutMillis - the number of milliseconds a request can be queued for write before it is considered stale
* maxConnectionsPerNode - the maximum number of open connections to a node. The total number of connections that can be opened by a network client is maxConnectionsPerNode * number of nodes
* staleRequestTimeoutMins - the number of minutes to keep a request that is waiting for a response
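
The connection bound stated in the `maxConnectionsPerNode` entry above can be worked through with a short Java sketch. `ConnectionBudget` and `maxTotalConnections` are hypothetical names, not part of Norbert, and the cluster size of 8 nodes is an assumed value; only `maxConnectionsPerNode = 5` comes from the client examples.

```java
// Hypothetical helper, not part of Norbert: illustrates the bound described above.
final class ConnectionBudget {

    /** Upper bound on the connections a single network client can open. */
    static long maxTotalConnections(int maxConnectionsPerNode, int numberOfNodes) {
        if (maxConnectionsPerNode < 0 || numberOfNodes < 0) {
            throw new IllegalArgumentException("arguments must be non-negative");
        }
        // Widen to long before multiplying so large values cannot overflow an int.
        return (long) maxConnectionsPerNode * numberOfNodes;
    }

    public static void main(String[] args) {
        // maxConnectionsPerNode = 5 as in the client examples; 8 nodes is an assumption.
        System.out.println(maxTotalConnections(5, 8)); // prints 40
    }
}
```

The bound applies per network client, so a host running several client instances multiplies it again by the number of instances.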
35 changes: 35 additions & 0 deletions build.gradle
@@ -0,0 +1,35 @@
project.ext.isDefaultEnvironment = !project.hasProperty('overrideBuildEnvironment')

File getEnvironmentScript()
{
final File env = file(isDefaultEnvironment ? 'defaultEnvironment.gradle' : project.overrideBuildEnvironment)
assert env.isFile() : "The environment script [$env] does not exist or is not a file."
return env
}

apply from: environmentScript

project.ext.externalDependency = [
'zookeeper':'org.apache.zookeeper:zookeeper:3.3.4',
'protobuf':'com.google.protobuf:protobuf-java:2.4.0a',
'log4j':'log4j:log4j:1.2.17',
'netty':'io.netty:netty:3.7.0.Final',
'slf4jApi':'org.slf4j:slf4j-api:1.7.5',
'slf4jLog4j':'org.slf4j:slf4j-log4j12:1.7.5',
'specs':'org.scala-tools.testing:specs_2.8.1:1.6.8',
'mockitoAll':'org.mockito:mockito-all:1.8.4',
'cglib':'cglib:cglib:2.1_3',
'objenesis':'org.objenesis:objenesis:1.2',
'scalaCompiler': 'org.scala-lang:scala-compiler:2.8.1',
'scalaLibrary': 'org.scala-lang:scala-library:2.8.1',
'scalatest': 'org.scalatest:scalatest:1.2',
'junit':'junit:junit:4.8.1'
];

subprojects {
plugins.withType(JavaPlugin) {
project.dependencies {
testCompile externalDependency.junit
}
}
}
16 changes: 16 additions & 0 deletions cluster/build.gradle
@@ -0,0 +1,16 @@
apply plugin: 'java'
apply plugin: 'scala'

dependencies {
compile externalDependency.scalaLibrary
compile externalDependency.zookeeper
compile externalDependency.protobuf
compile externalDependency.log4j

testCompile externalDependency.specs
testCompile externalDependency.mockitoAll
testCompile externalDependency.cglib
testCompile externalDependency.objenesis
testCompile externalDependency.junit
}
