From 99b9072ad12329153202313256acc451a21cb928 Mon Sep 17 00:00:00 2001 From: Michael L Heuer Date: Wed, 3 Oct 2018 10:41:18 -0500 Subject: [PATCH 1/2] Demonstrate errors reported in issue #27. --- src/test/java/org/disq_bio/disq/HtsjdkReadsRddTest.java | 3 +++ src/test/java/org/disq_bio/disq/HtsjdkVariantsRddTest.java | 3 +++ 2 files changed, 6 insertions(+) diff --git a/src/test/java/org/disq_bio/disq/HtsjdkReadsRddTest.java b/src/test/java/org/disq_bio/disq/HtsjdkReadsRddTest.java index 9c15fc0..f518e6e 100644 --- a/src/test/java/org/disq_bio/disq/HtsjdkReadsRddTest.java +++ b/src/test/java/org/disq_bio/disq/HtsjdkReadsRddTest.java @@ -108,6 +108,9 @@ public void testReadAndWrite( HtsjdkReadsRdd htsjdkReadsRdd = htsjdkReadsRddStorage.read(inputPath); + // invoke an action (first) to force evaluation of the RDD of reads + Assert.assertNotNull(htsjdkReadsRdd.getReads().first()); + // read the file using htsjdk to get expected number of reads, then count the number in the RDD int expectedCount = AnySamTestUtil.countReads(inputPath, refPath); Assert.assertEquals(expectedCount, htsjdkReadsRdd.getReads().count()); diff --git a/src/test/java/org/disq_bio/disq/HtsjdkVariantsRddTest.java b/src/test/java/org/disq_bio/disq/HtsjdkVariantsRddTest.java index 61dc871..3ef0e0c 100644 --- a/src/test/java/org/disq_bio/disq/HtsjdkVariantsRddTest.java +++ b/src/test/java/org/disq_bio/disq/HtsjdkVariantsRddTest.java @@ -71,6 +71,9 @@ public void testReadAndWrite( HtsjdkVariantsRdd htsjdkVariantsRdd = htsjdkVariantsRddStorage.read(inputPath); + // invoke an action (first) to force evaluation of the RDD of variants + Assert.assertNotNull(htsjdkVariantsRdd.getVariants().first()); + // read the file using htsjdk to get expected number of reads, then count the number in the RDD int expectedCount = countVariants(inputPath); Assert.assertEquals(expectedCount, htsjdkVariantsRdd.getVariants().count()); From 6adb60efda74a17c36050225a41bf1a7a3389af3 Mon Sep 17 00:00:00 2001 From: Tom White Date: Thu, 18 Oct 2018 08:31:41 +0100 Subject: [PATCH 2/2] More 
kryo registrations --- .../org/disq_bio/disq/serializer/DisqKryoRegistrator.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/disq_bio/disq/serializer/DisqKryoRegistrator.java b/src/main/java/org/disq_bio/disq/serializer/DisqKryoRegistrator.java index fdfd3b5..ebd6de1 100644 --- a/src/main/java/org/disq_bio/disq/serializer/DisqKryoRegistrator.java +++ b/src/main/java/org/disq_bio/disq/serializer/DisqKryoRegistrator.java @@ -28,8 +28,10 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.serializers.JavaSerializer; import de.javakaffee.kryoserializers.CollectionsEmptyListSerializer; import de.javakaffee.kryoserializers.CollectionsSingletonListSerializer; +import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer; import org.apache.spark.serializer.KryoRegistrator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +48,7 @@ public void registerClasses(final Kryo kryo) { // htsjdk.samtools kryo.register(htsjdk.samtools.AlignmentBlock.class); + kryo.register(htsjdk.samtools.BAMRecord.class); kryo.register(htsjdk.samtools.Chunk.class); kryo.register(htsjdk.samtools.Cigar.class); kryo.register(htsjdk.samtools.CigarElement.class); @@ -79,7 +82,8 @@ public void registerClasses(final Kryo kryo) { kryo.register(htsjdk.variant.variantcontext.CommonInfo.class); kryo.register(htsjdk.variant.variantcontext.FastGenotype.class); kryo.register(htsjdk.variant.variantcontext.GenotypeType.class); - kryo.register(htsjdk.variant.variantcontext.LazyGenotypesContext.class); + // Use JavaSerializer for LazyGenotypesContext to handle transient fields correctly + kryo.register(htsjdk.variant.variantcontext.LazyGenotypesContext.class, new JavaSerializer()); kryo.register(htsjdk.variant.variantcontext.VariantContext.class); kryo.register(htsjdk.variant.variantcontext.VariantContext.Type.class); @@ -116,6 +120,8 @@ public void 
registerClasses(final Kryo kryo) { kryo.register(java.util.LinkedHashSet.class); kryo.register(java.util.TreeSet.class); + UnmodifiableCollectionsSerializer.registerSerializers(kryo); + // org.apache.spark.internal.io kryo.register(org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage.class);