Skip to content

Commit

Permalink
Merge pull request #230 from magro/master
Browse files Browse the repository at this point in the history
Add simple, queue based kryo pool
  • Loading branch information
magro committed Jul 25, 2014
2 parents e1e3b0b + 05fed9c commit 23db0d8
Show file tree
Hide file tree
Showing 6 changed files with 438 additions and 0 deletions.
54 changes: 54 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ If you are planning to use Kryo for network communication, the [KryoNet](https:/
- [Interoperability](#interoperability)
- [Stack size](#stack-size)
- [Threading](#threading)
- [Pooling Kryo instances](#pooling-kryo-instances)
- [Logging](#logging)
- [Integration with Maven](#integration-with-maven)
- [Using Kryo without Maven](#using-kryo-without-maven)
Expand Down Expand Up @@ -531,6 +532,59 @@ The serializers Kryo provides use the call stack when serializing nested objects

**Kryo is not thread safe. Each thread should have its own Kryo, Input, and Output instances. Also, the byte[] Input uses may be modified and then returned to its original state during deserialization, so the same byte[] "should not be used concurrently in separate threads**.

## Pooling Kryo instances

Because the creation/initialization of `Kryo` instances is rather expensive, in a multithreaded scenario you should pool `Kryo` instances.
A very simple solution is to bind `Kryo` instances to Threads using `ThreadLocal`, like this:

```java
// Setup ThreadLocal of Kryo instances
private ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() {
protected Kryo initialValue() {
Kryo kryo = new Kryo();
// configure kryo instance, customize settings
return kryo;
};
};

// Somewhere else, use Kryo
Kryo k = kryos.get();
...
```

Alternatively you may want to use the `KryoPool` provided by kryo. The `KryoPool` keeps references to `Kryo` instances
using `SoftReference`s, so that `Kryo` instances will be GC'ed when the JVM starts to run out of memory
(of course you could use `ThreadLocal` with `SoftReference`s as well).

Here's an example that shows how to use the `KryoPool`:

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.pool.KryoCallback;
import com.esotericsoftware.kryo.pool.KryoFactory;
import com.esotericsoftware.kryo.pool.KryoPool;

KryoFactory factory = new KryoFactory() {
public Kryo create () {
Kryo kryo = new Kryo();
// configure kryo instance, customize settings
return kryo;
}
};
KryoPool pool = new KryoPool(factory);
Kryo kryo = pool.borrow();
// do s.th. with kryo here, and afterwards release it
pool.release(kryo);

// or use a callback to work with kryo - no need to borrow/release,
// that's done by `run`.
String value = pool.run(new KryoCallback() {
public String execute(Kryo kryo) {
return kryo.readObject(input, String.class);
}
});
```

## Logging

Kryo makes use of the low overhead, lightweight [MinLog logging library](http://code.google.com/p/minlog/). The logging level can be set by one of the following methods:
Expand Down
14 changes: 14 additions & 0 deletions src/com/esotericsoftware/kryo/pool/KryoCallback.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.esotericsoftware.kryo.pool;

import com.esotericsoftware.kryo.Kryo;

/**
* Callback to run with a provided kryo instance.
*
* @author Martin Grotzke
*
* @param <T> The type of the result of the interaction with kryo.
*/
public interface KryoCallback<T> {
T execute(Kryo kryo);
}
12 changes: 12 additions & 0 deletions src/com/esotericsoftware/kryo/pool/KryoFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.esotericsoftware.kryo.pool;

import com.esotericsoftware.kryo.Kryo;

/**
* Factory to create new configured instances of {@link Kryo}.
*
* @author Martin Grotzke
*/
public interface KryoFactory {
Kryo create();
}
87 changes: 87 additions & 0 deletions src/com/esotericsoftware/kryo/pool/KryoPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package com.esotericsoftware.kryo.pool;

import java.lang.ref.SoftReference;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;

import com.esotericsoftware.kryo.Kryo;

/**
* A simple, queue based pool for {@link Kryo} instances. Kryo instances
* are cached using {@link SoftReference}s and therefore should not cause
* OOMEs.
*
* Usage:
* <pre>
* import com.esotericsoftware.kryo.Kryo;
* import com.esotericsoftware.kryo.pool.KryoCallback;
* import com.esotericsoftware.kryo.pool.KryoFactory;
* import com.esotericsoftware.kryo.pool.KryoPool;
*
* KryoFactory factory = new KryoFactory() {
* public Kryo create () {
* Kryo kryo = new Kryo();
* // configure kryo instance, customize settings
* return kryo;
* }
* };
* KryoPool pool = new KryoPool(factory);
* Kryo kryo = pool.borrow();
* // do s.th. with kryo here, and afterwards release it
* pool.release(kryo);
*
* // or use a callback to work with kryo (pool.run borrows+releases for you)
* String value = pool.run(new KryoCallback<String>() {
* public String execute(Kryo kryo) {
* return kryo.readObject(input, String.class);
* }
* });
*
* </pre>
*
* @author Martin Grotzke
*/
public class KryoPool {

private final Queue<SoftReference<Kryo>> queue;
private final KryoFactory factory;

public KryoPool(KryoFactory factory) {
this(factory, new ConcurrentLinkedQueue<SoftReference<Kryo>>());
}

public KryoPool(KryoFactory factory, Queue<SoftReference<Kryo>> queue) {
this.factory = factory;
this.queue = queue;
}

public int size () {
return queue.size();
}

public Kryo borrow () {
Kryo res;
SoftReference<Kryo> ref;
while((ref = queue.poll()) != null) {
if((res = ref.get()) != null) {
return res;
}
}
return factory.create();
}

public void release (Kryo kryo) {
queue.offer(new SoftReference(kryo));
}

public <T> T run(KryoCallback<T> callback) {
Kryo kryo = borrow();
try {
return callback.execute(kryo);
} finally {
release(kryo);
}
}

}
159 changes: 159 additions & 0 deletions test/com/esotericsoftware/kryo/pool/KryoPoolBenchmarkTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package com.esotericsoftware.kryo.pool;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import org.junit.Before;
import org.junit.Test;

import com.esotericsoftware.kryo.FieldSerializerTest.DefaultTypes;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoSerializable;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class KryoPoolBenchmarkTest {

private static final int WARMUP_ITERATIONS = 10000;

/** Number of runs. */
private static final int RUN_CNT = 10;

/** Number of iterations. Set it to something rather big for obtaining meaningful results */
// private static final int ITER_CNT = 200000;
private static final int ITER_CNT = 10000;
private static final int SLEEP_BETWEEN_RUNS = 100;

KryoFactory factory;
KryoPool pool;

@Before
public void beforeMethod() {
factory = new KryoFactory() {
@Override
public Kryo create () {
Kryo kryo = new Kryo();
kryo.register(DefaultTypes.class);
kryo.register(SampleObject.class);
return kryo;
}
};
pool = new KryoPool(factory);
}

@Test
public void testWithoutPool() throws Exception {
// Warm-up phase: Perform 100000 iterations
runWithoutPool(1, WARMUP_ITERATIONS, false);
runWithoutPool(RUN_CNT, ITER_CNT, true);
}

@Test
public void testWithPool() throws Exception {
// Warm-up phase: Perform 100000 iterations
runWithPool(1, WARMUP_ITERATIONS, false);
runWithPool(RUN_CNT, ITER_CNT, true);
}

private void run (String description, Runnable runnable, final int runCount, final int iterCount, boolean outputResults) throws Exception {
long avgDur = 0;
long bestTime = Long.MAX_VALUE;

for (int i = 0; i < runCount; i++) {
long start = System.nanoTime();

for (int j = 0; j < iterCount; j++) {
runnable.run();
}

long dur = System.nanoTime() - start;
dur = TimeUnit.NANOSECONDS.toMillis(dur);

if (outputResults) System.out.format(">>> %s (run %d): %,d ms\n", description, i + 1, dur);
avgDur += dur;
bestTime = Math.min(bestTime, dur);
systemCleanupAfterRun();
}

avgDur /= runCount;

if (outputResults) {
System.out.format("\n>>> %s (average): %,d ms", description, avgDur);
System.out.format("\n>>> %s (best time): %,d ms\n\n", description, bestTime);
}

}

private void runWithoutPool (final int runCount, final int iterCount, boolean outputResults) throws Exception {
run("Without pool", new Runnable() {
@Override
public void run () {
factory.create();
}
}, runCount, iterCount, outputResults);
}

private void runWithPool (final int runCount, final int iterCount, boolean outputResults) throws Exception {
run("With pool", new Runnable() {
@Override
public void run () {
Kryo kryo = pool.borrow();
pool.release(kryo);
}
}, runCount, iterCount, outputResults);
}

private void systemCleanupAfterRun () throws InterruptedException {
System.gc();
Thread.sleep(SLEEP_BETWEEN_RUNS);
System.gc();
}

private static class SampleObject {
private int intVal;
private float floatVal;
private Short shortVal;
private long[] longArr;
private double[] dblArr;
private String str;

public SampleObject () {
}

SampleObject (int intVal, float floatVal, Short shortVal, long[] longArr, double[] dblArr, String str) {
this.intVal = intVal;
this.floatVal = floatVal;
this.shortVal = shortVal;
this.longArr = longArr;
this.dblArr = dblArr;
this.str = str;
}

/** {@inheritDoc} */
@Override
public boolean equals (Object other) {
if (this == other) return true;

if (other == null || getClass() != other.getClass()) return false;

SampleObject obj = (SampleObject)other;

return intVal == obj.intVal && floatVal == obj.floatVal && shortVal.equals(obj.shortVal)
&& Arrays.equals(dblArr, obj.dblArr) && Arrays.equals(longArr, obj.longArr) && (str == null ? obj.str == null : str.equals(obj.str));
}
}


}
Loading

0 comments on commit 23db0d8

Please sign in to comment.