diff --git a/README.md b/README.md index 8a9e36022..1d4157ae8 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,7 @@ If you are planning to use Kryo for network communication, the [KryoNet](https:/ - [Interoperability](#interoperability) - [Stack size](#stack-size) - [Threading](#threading) +- [Pooling Kryo instances](#pooling-kryo-instances) - [Logging](#logging) - [Integration with Maven](#integration-with-maven) - [Using Kryo without Maven](#using-kryo-without-maven) @@ -531,6 +532,59 @@ The serializers Kryo provides use the call stack when serializing nested objects **Kryo is not thread safe. Each thread should have its own Kryo, Input, and Output instances. Also, the byte[] Input uses may be modified and then returned to its original state during deserialization, so the same byte[] "should not be used concurrently in separate threads**. +## Pooling Kryo instances + +Because the creation/initialization of `Kryo` instances is rather expensive, in a multithreaded scenario you should pool `Kryo` instances. +A very simple solution is to bind `Kryo` instances to Threads using `ThreadLocal`, like this: + +```java +// Setup ThreadLocal of Kryo instances +private ThreadLocal kryos = new ThreadLocal() { + protected Kryo initialValue() { + Kryo kryo = new Kryo(); + // configure kryo instance, customize settings + return kryo; + }; +}; + +// Somewhere else, use Kryo +Kryo k = kryos.get(); +... +``` + +Alternatively you may want to use the `KryoPool` provided by kryo. The `KryoPool` keeps references to `Kryo` instances +using `SoftReference`s, so that `Kryo` instances will be GC'ed when the JVM starts to run out of memory +(of course you could use `ThreadLocal` with `SoftReference`s as well). + +Here's an example that shows how to use the `KryoPool`: + +```java +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.pool.KryoCallback; +import com.esotericsoftware.kryo.pool.KryoFactory; +import com.esotericsoftware.kryo.pool.KryoPool; + +KryoFactory factory = new KryoFactory() { + public Kryo create () { + Kryo kryo = new Kryo(); + // configure kryo instance, customize settings + return kryo; + } +}; +KryoPool pool = new KryoPool(factory); +Kryo kryo = pool.borrow(); +// do s.th. with kryo here, and afterwards release it +pool.release(kryo); + +// or use a callback to work with kryo - no need to borrow/release, +// that's done by `run`. +String value = pool.run(new KryoCallback() { + public String execute(Kryo kryo) { + return kryo.readObject(input, String.class); + } +}); +``` + ## Logging Kryo makes use of the low overhead, lightweight [MinLog logging library](http://code.google.com/p/minlog/). The logging level can be set by one of the following methods: diff --git a/src/com/esotericsoftware/kryo/pool/KryoCallback.java b/src/com/esotericsoftware/kryo/pool/KryoCallback.java new file mode 100644 index 000000000..b784d3813 --- /dev/null +++ b/src/com/esotericsoftware/kryo/pool/KryoCallback.java @@ -0,0 +1,14 @@ +package com.esotericsoftware.kryo.pool; + +import com.esotericsoftware.kryo.Kryo; + +/** + * Callback to run with a provided kryo instance. + * + * @author Martin Grotzke + * + * @param The type of the result of the interaction with kryo. + */ +public interface KryoCallback { + T execute(Kryo kryo); +} \ No newline at end of file diff --git a/src/com/esotericsoftware/kryo/pool/KryoFactory.java b/src/com/esotericsoftware/kryo/pool/KryoFactory.java new file mode 100644 index 000000000..c3620245b --- /dev/null +++ b/src/com/esotericsoftware/kryo/pool/KryoFactory.java @@ -0,0 +1,12 @@ +package com.esotericsoftware.kryo.pool; + +import com.esotericsoftware.kryo.Kryo; + +/** + * Factory to create new configured instances of {@link Kryo}. + * + * @author Martin Grotzke + */ +public interface KryoFactory { + Kryo create(); +} \ No newline at end of file diff --git a/src/com/esotericsoftware/kryo/pool/KryoPool.java b/src/com/esotericsoftware/kryo/pool/KryoPool.java new file mode 100644 index 000000000..6f28bb90c --- /dev/null +++ b/src/com/esotericsoftware/kryo/pool/KryoPool.java @@ -0,0 +1,87 @@ +package com.esotericsoftware.kryo.pool; + +import java.lang.ref.SoftReference; +import java.util.Queue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; + +import com.esotericsoftware.kryo.Kryo; + +/** + * A simple, queue based pool for {@link Kryo} instances. Kryo instances + * are cached using {@link SoftReference}s and therefore should not cause + * OOMEs. + * + * Usage: + *
+ * import com.esotericsoftware.kryo.Kryo;
+ * import com.esotericsoftware.kryo.pool.KryoCallback;
+ * import com.esotericsoftware.kryo.pool.KryoFactory;
+ * import com.esotericsoftware.kryo.pool.KryoPool;
+ * 
+ * KryoFactory factory = new KryoFactory() {
+ *   public Kryo create () {
+ *     Kryo kryo = new Kryo();
+ *     // configure kryo instance, customize settings
+ *     return kryo;
+ *   }
+ * };
+ * KryoPool pool = new KryoPool(factory);
+ * Kryo kryo = pool.borrow();
+ * // do s.th. with kryo here, and afterwards release it
+ * pool.release(kryo);
+ * 
+ * // or use a callback to work with kryo (pool.run borrows+releases for you)
+ * String value = pool.run(new KryoCallback() {
+ *   public String execute(Kryo kryo) {
+ *     return kryo.readObject(input, String.class);
+ *   }
+ * });
+ *
+ * 
+ * + * @author Martin Grotzke + */ +public class KryoPool { + + private final Queue> queue; + private final KryoFactory factory; + + public KryoPool(KryoFactory factory) { + this(factory, new ConcurrentLinkedQueue>()); + } + + public KryoPool(KryoFactory factory, Queue> queue) { + this.factory = factory; + this.queue = queue; + } + + public int size () { + return queue.size(); + } + + public Kryo borrow () { + Kryo res; + SoftReference ref; + while((ref = queue.poll()) != null) { + if((res = ref.get()) != null) { + return res; + } + } + return factory.create(); + } + + public void release (Kryo kryo) { + queue.offer(new SoftReference(kryo)); + } + + public T run(KryoCallback callback) { + Kryo kryo = borrow(); + try { + return callback.execute(kryo); + } finally { + release(kryo); + } + } + +} diff --git a/test/com/esotericsoftware/kryo/pool/KryoPoolBenchmarkTest.java b/test/com/esotericsoftware/kryo/pool/KryoPoolBenchmarkTest.java new file mode 100644 index 000000000..c67993d44 --- /dev/null +++ b/test/com/esotericsoftware/kryo/pool/KryoPoolBenchmarkTest.java @@ -0,0 +1,159 @@ +package com.esotericsoftware.kryo.pool; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectInputStream; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.junit.Before; +import org.junit.Test; + +import com.esotericsoftware.kryo.FieldSerializerTest.DefaultTypes; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.KryoSerializable; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +public class KryoPoolBenchmarkTest { + + private static final int WARMUP_ITERATIONS = 10000; + + /** Number of runs. */ + private static final int RUN_CNT = 10; + + /** Number of iterations. Set it to something rather big for obtaining meaningful results */ +// private static final int ITER_CNT = 200000; + private static final int ITER_CNT = 10000; + private static final int SLEEP_BETWEEN_RUNS = 100; + + KryoFactory factory; + KryoPool pool; + + @Before + public void beforeMethod() { + factory = new KryoFactory() { + @Override + public Kryo create () { + Kryo kryo = new Kryo(); + kryo.register(DefaultTypes.class); + kryo.register(SampleObject.class); + return kryo; + } + }; + pool = new KryoPool(factory); + } + + @Test + public void testWithoutPool() throws Exception { + // Warm-up phase: Perform 100000 iterations + runWithoutPool(1, WARMUP_ITERATIONS, false); + runWithoutPool(RUN_CNT, ITER_CNT, true); + } + + @Test + public void testWithPool() throws Exception { + // Warm-up phase: Perform 100000 iterations + runWithPool(1, WARMUP_ITERATIONS, false); + runWithPool(RUN_CNT, ITER_CNT, true); + } + + private void run (String description, Runnable runnable, final int runCount, final int iterCount, boolean outputResults) throws Exception { + long avgDur = 0; + long bestTime = Long.MAX_VALUE; + + for (int i = 0; i < runCount; i++) { + long start = System.nanoTime(); + + for (int j = 0; j < iterCount; j++) { + runnable.run(); + } + + long dur = System.nanoTime() - start; + dur = TimeUnit.NANOSECONDS.toMillis(dur); + + if (outputResults) System.out.format(">>> %s (run %d): %,d ms\n", description, i + 1, dur); + avgDur += dur; + bestTime = Math.min(bestTime, dur); + systemCleanupAfterRun(); + } + + avgDur /= runCount; + + if (outputResults) { + System.out.format("\n>>> %s (average): %,d ms", description, avgDur); + System.out.format("\n>>> %s (best time): %,d ms\n\n", description, bestTime); + } + + } + + private void runWithoutPool (final int runCount, final int iterCount, boolean outputResults) throws Exception { + run("Without pool", new Runnable() { + @Override + public void run () { + factory.create(); + } + }, runCount, iterCount, outputResults); + } + + private void runWithPool (final int runCount, final int iterCount, boolean outputResults) throws Exception { + run("With pool", new Runnable() { + @Override + public void run () { + Kryo kryo = pool.borrow(); + pool.release(kryo); + } + }, runCount, iterCount, outputResults); + } + + private void systemCleanupAfterRun () throws InterruptedException { + System.gc(); + Thread.sleep(SLEEP_BETWEEN_RUNS); + System.gc(); + } + + private static class SampleObject { + private int intVal; + private float floatVal; + private Short shortVal; + private long[] longArr; + private double[] dblArr; + private String str; + + public SampleObject () { + } + + SampleObject (int intVal, float floatVal, Short shortVal, long[] longArr, double[] dblArr, String str) { + this.intVal = intVal; + this.floatVal = floatVal; + this.shortVal = shortVal; + this.longArr = longArr; + this.dblArr = dblArr; + this.str = str; + } + + /** {@inheritDoc} */ + @Override + public boolean equals (Object other) { + if (this == other) return true; + + if (other == null || getClass() != other.getClass()) return false; + + SampleObject obj = (SampleObject)other; + + return intVal == obj.intVal && floatVal == obj.floatVal && shortVal.equals(obj.shortVal) + && Arrays.equals(dblArr, obj.dblArr) && Arrays.equals(longArr, obj.longArr) && (str == null ? obj.str == null : str.equals(obj.str)); + } + } + + +} diff --git a/test/com/esotericsoftware/kryo/pool/KryoPoolTest.java b/test/com/esotericsoftware/kryo/pool/KryoPoolTest.java new file mode 100644 index 000000000..607d117e1 --- /dev/null +++ b/test/com/esotericsoftware/kryo/pool/KryoPoolTest.java @@ -0,0 +1,112 @@ +package com.esotericsoftware.kryo.pool; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.Arrays; + +import org.junit.Before; +import org.junit.Test; + +import com.esotericsoftware.kryo.Kryo; + +public class KryoPoolTest { + + private KryoFactory factory; + private KryoPool pool; + + @Before + public void beforeMethod() { + factory = new KryoFactory() { + @Override + public Kryo create () { + Kryo kryo = new Kryo(); + // configure kryo + return kryo; + } + }; + pool = new KryoPool(factory); + } + + @Test + public void getShouldReturnAvailableInstance() { + Kryo kryo = pool.borrow(); + pool.release(kryo); + assertTrue(kryo == pool.borrow()); + } + + @Test + public void releaseShouldAddKryoToPool() { + assertEquals(0, pool.size()); + Kryo kryo = pool.borrow(); + pool.release(kryo); + assertEquals(1, pool.size()); + } + + @Test + public void testSize() { + assertEquals(0, pool.size()); + Kryo kryo1 = pool.borrow(); + assertEquals(0, pool.size()); + Kryo kryo2 = pool.borrow(); + assertFalse(kryo1 == kryo2); + pool.release(kryo1); + assertEquals(1, pool.size()); + pool.release(kryo2); + assertEquals(2, pool.size()); + } + + @Test + public void runWithKryoShouldReturnResult() { + String value = pool.run(new KryoCallback() { + @Override + public String execute(Kryo kryo) { + return "foo"; + } + }); + assertEquals("foo", value); + } + + @Test + public void runWithKryoShouldAddKryoToPool() { + assertEquals(0, pool.size()); + pool.run(new KryoCallback() { + @Override + public String execute(Kryo kryo) { + return null; + } + }); + assertEquals(1, pool.size()); + } + + @Test + public void runWithKryoShouldAddKryoToPoolOnException() { + assertEquals(0, pool.size()); + try { + pool.run(new KryoCallback() { + @Override + public String execute(Kryo kryo) { + throw new RuntimeException(); + } + }); + fail("Exception should be rethrown."); + } catch(RuntimeException e) { + // expected + } + assertEquals(1, pool.size()); + } + + @Test(expected = IllegalArgumentException.class) + public void runWithKryoShouldRethrowException() { + String value = pool.run(new KryoCallback() { + @Override + public String execute(Kryo kryo) { + throw new IllegalArgumentException(); + } + }); + } + +}