diff --git a/src/main/java/org/disq_bio/disq/serializer/DisqKryoRegistrator.java b/src/main/java/org/disq_bio/disq/serializer/DisqKryoRegistrator.java index fdfd3b5..ebd6de1 100644 --- a/src/main/java/org/disq_bio/disq/serializer/DisqKryoRegistrator.java +++ b/src/main/java/org/disq_bio/disq/serializer/DisqKryoRegistrator.java @@ -28,8 +28,10 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.serializers.JavaSerializer; import de.javakaffee.kryoserializers.CollectionsEmptyListSerializer; import de.javakaffee.kryoserializers.CollectionsSingletonListSerializer; +import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer; import org.apache.spark.serializer.KryoRegistrator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +48,7 @@ public void registerClasses(final Kryo kryo) { // htsjdk.samtools kryo.register(htsjdk.samtools.AlignmentBlock.class); + kryo.register(htsjdk.samtools.BAMRecord.class); kryo.register(htsjdk.samtools.Chunk.class); kryo.register(htsjdk.samtools.Cigar.class); kryo.register(htsjdk.samtools.CigarElement.class); @@ -79,7 +82,8 @@ public void registerClasses(final Kryo kryo) { kryo.register(htsjdk.variant.variantcontext.CommonInfo.class); kryo.register(htsjdk.variant.variantcontext.FastGenotype.class); kryo.register(htsjdk.variant.variantcontext.GenotypeType.class); - kryo.register(htsjdk.variant.variantcontext.LazyGenotypesContext.class); + // Use JavaSerializer for LazyGenotypesContext to handle transient fields correctly + kryo.register(htsjdk.variant.variantcontext.LazyGenotypesContext.class, new JavaSerializer()); kryo.register(htsjdk.variant.variantcontext.VariantContext.class); kryo.register(htsjdk.variant.variantcontext.VariantContext.Type.class); @@ -116,6 +120,8 @@ public void registerClasses(final Kryo kryo) { kryo.register(java.util.LinkedHashSet.class); kryo.register(java.util.TreeSet.class); + UnmodifiableCollectionsSerializer.registerSerializers(kryo); + // org.apache.spark.internal.io kryo.register(org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage.class); diff --git a/src/test/java/org/disq_bio/disq/HtsjdkReadsRddTest.java b/src/test/java/org/disq_bio/disq/HtsjdkReadsRddTest.java index 9c15fc0..f518e6e 100644 --- a/src/test/java/org/disq_bio/disq/HtsjdkReadsRddTest.java +++ b/src/test/java/org/disq_bio/disq/HtsjdkReadsRddTest.java @@ -108,6 +108,9 @@ public void testReadAndWrite( HtsjdkReadsRdd htsjdkReadsRdd = htsjdkReadsRddStorage.read(inputPath); + // reduce to realize the RDD of reads + Assert.assertNotNull(htsjdkReadsRdd.getReads().first()); + // read the file using htsjdk to get expected number of reads, then count the number in the RDD int expectedCount = AnySamTestUtil.countReads(inputPath, refPath); Assert.assertEquals(expectedCount, htsjdkReadsRdd.getReads().count()); diff --git a/src/test/java/org/disq_bio/disq/HtsjdkVariantsRddTest.java b/src/test/java/org/disq_bio/disq/HtsjdkVariantsRddTest.java index 61dc871..3ef0e0c 100644 --- a/src/test/java/org/disq_bio/disq/HtsjdkVariantsRddTest.java +++ b/src/test/java/org/disq_bio/disq/HtsjdkVariantsRddTest.java @@ -71,6 +71,9 @@ public void testReadAndWrite( HtsjdkVariantsRdd htsjdkVariantsRdd = htsjdkVariantsRddStorage.read(inputPath); + // reduce to realize the RDD of variants + Assert.assertNotNull(htsjdkVariantsRdd.getVariants().first()); + // read the file using htsjdk to get expected number of reads, then count the number in the RDD int expectedCount = countVariants(inputPath); Assert.assertEquals(expectedCount, htsjdkVariantsRdd.getVariants().count());