diff --git a/README.md b/README.md
index 5842dac..11e6711 100644
--- a/README.md
+++ b/README.md
@@ -23,6 +23,7 @@ libraryDependencies += "de.lhns" %% "fs2-compress-zstd" % "2.2.1"
 libraryDependencies += "de.lhns" %% "fs2-compress-brotli" % "2.2.1"
 libraryDependencies += "de.lhns" %% "fs2-compress-brotli4j" % "2.2.1"
 libraryDependencies += "de.lhns" %% "fs2-compress-lz4" % "2.2.1"
+libraryDependencies += "de.lhns" %% "fs2-compress-snappy" % "2.2.1"
 ```
 
 ## Concepts
diff --git a/build.sbt b/build.sbt
index efbad2e..8136e72 100644
--- a/build.sbt
+++ b/build.sbt
@@ -15,6 +15,7 @@ val V = new {
   val logbackClassic = "1.5.16"
   val lz4 = "1.8.0"
   val munitCatsEffect = "2.0.0"
+  val snappy = "1.1.10.7"
   val zip4j = "2.11.5"
   val zstdJni = "1.5.6-9"
 }
@@ -86,6 +87,7 @@ lazy val root: Project =
     .aggregate(brotli.projectRefs: _*)
     .aggregate(brotli4j.projectRefs: _*)
     .aggregate(lz4.projectRefs: _*)
+    .aggregate(snappy.projectRefs: _*)
 
 lazy val core = projectMatrix
   .in(file("core"))
@@ -215,3 +217,16 @@ lazy val lz4 = projectMatrix
     )
   )
   .jvmPlatform(scalaVersions)
+
+lazy val snappy = projectMatrix
+  .in(file("snappy"))
+  .dependsOn(core % "compile->compile;test->test")
+  .settings(commonSettings)
+  .settings(
+    name := "fs2-compress-snappy",
+    libraryDependencies ++= Seq(
+      "co.fs2" %%% "fs2-io" % V.fs2,
+      "org.xerial.snappy" % "snappy-java" % V.snappy
+    )
+  )
+  .jvmPlatform(scalaVersions)
diff --git a/snappy/src/main/scala/de/lhns/fs2/compress/SnappyCompressor.scala b/snappy/src/main/scala/de/lhns/fs2/compress/SnappyCompressor.scala
new file mode 100644
index 0000000..5a7d1b4
--- /dev/null
+++ b/snappy/src/main/scala/de/lhns/fs2/compress/SnappyCompressor.scala
@@ -0,0 +1,114 @@
+package de.lhns.fs2.compress
+
+import cats.effect.Async
+import fs2.Pipe
+import fs2.io._
+import org.xerial.snappy.{
+  SnappyFramedInputStream,
+  SnappyFramedOutputStream,
+  SnappyHadoopCompatibleOutputStream,
+  SnappyInputStream,
+  SnappyOutputStream
+}
+
+import java.io.{BufferedInputStream, InputStream, OutputStream}
+
+class SnappyCompressor[F[_]: Async] private (chunkSize: Int, mode: SnappyCompressor.WriteMode) extends Compressor[F] {
+  override def compress: Pipe[F, Byte, Byte] = { stream =>
+    readOutputStream[F](chunkSize) { outputStream =>
+      stream
+        .through(writeOutputStream(Async[F].blocking[OutputStream] {
+          mode.fromOutputStream(outputStream)
+        }))
+        .compile
+        .drain
+    }
+  }
+}
+
+object SnappyCompressor {
+  sealed trait WriteMode
+  object WriteMode {
+    // https://github.com/xerial/snappy-java/blob/ec23d7c611563bedce536ca4d02ebdb9a690ea91/src/main/java/org/xerial/snappy/SnappyOutputStream.java#L64
+    private val DefaultBasicBlockSize = 32 * 1024
+    private val DefaultHadoopBlockSize = DefaultBasicBlockSize
+
+    /** See
+      * [[https://github.com/xerial/snappy-java/blob/ec23d7c611563bedce536ca4d02ebdb9a690ea91/src/main/java/org/xerial/snappy/SnappyOutputStream.java#L59]]
+      */
+    final case class Basic(blockSize: Int = DefaultBasicBlockSize) extends WriteMode
+
+    /** See
+      * [[https://github.com/xerial/snappy-java/blob/ec23d7c611563bedce536ca4d02ebdb9a690ea91/src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java#L34]]
+      */
+    final case class Framed(
+        blockSize: Int = SnappyFramedOutputStream.DEFAULT_BLOCK_SIZE,
+        minCompressionRatio: Double = SnappyFramedOutputStream.DEFAULT_MIN_COMPRESSION_RATIO
+    ) extends WriteMode
+
+    /** Compression for use with Hadoop libraries: it does not emit a file header but writes out the current block size
+      * as a preamble to each block.
+      */
+    final case class HadoopCompatible(blockSize: Int = DefaultHadoopBlockSize) extends WriteMode
+  }
+
+  private implicit class WriteModeOps(mode: WriteMode) {
+    def fromOutputStream(o: OutputStream): OutputStream = mode match {
+      case r: WriteMode.Basic => new SnappyOutputStream(o, r.blockSize)
+      case f: WriteMode.Framed => new SnappyFramedOutputStream(o, f.blockSize, f.minCompressionRatio)
+      case h: WriteMode.HadoopCompatible => new SnappyHadoopCompatibleOutputStream(o, h.blockSize)
+    }
+  }
+
+  def apply[F[_]](implicit instance: SnappyCompressor[F]): SnappyCompressor[F] = instance
+
+  def make[F[_]: Async](
+      chunkSize: Int = Defaults.defaultChunkSize,
+      mode: WriteMode
+  ): SnappyCompressor[F] = new SnappyCompressor(chunkSize, mode)
+}
+
+class SnappyDecompressor[F[_]: Async] private (chunkSize: Int, decompressionType: SnappyDecompressor.ReadMode)
+    extends Decompressor[F] {
+  override def decompress: Pipe[F, Byte, Byte] = { stream =>
+    stream
+      .through(toInputStream[F])
+      .map(new BufferedInputStream(_, chunkSize))
+      .flatMap { inputStream =>
+        readInputStream(
+          Async[F].blocking(
+            decompressionType.fromInputStream(inputStream)
+          ),
+          chunkSize
+        )
+      }
+  }
+}
+
+object SnappyDecompressor {
+  sealed trait ReadMode
+  object ReadMode {
+
+    /** See
+      * [[https://github.com/xerial/snappy-java/blob/ec23d7c611563bedce536ca4d02ebdb9a690ea91/src/main/java/org/xerial/snappy/SnappyInputStream.java#L36]]
+      */
+    final case class Basic(maxChunkSize: Int = SnappyInputStream.MAX_CHUNK_SIZE) extends ReadMode
+
+    /** See
+      * [[https://github.com/xerial/snappy-java/blob/ec23d7c611563bedce536ca4d02ebdb9a690ea91/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java#L39]]
+      */
+    final case class Framed(verifyChecksums: Boolean = true) extends ReadMode
+  }
+
+  private implicit class ReadModeOps(mode: ReadMode) {
+    def fromInputStream(i: InputStream): InputStream = mode match {
+      case r: ReadMode.Basic => new SnappyInputStream(i, r.maxChunkSize)
+      case f: ReadMode.Framed => new SnappyFramedInputStream(i, f.verifyChecksums)
+    }
+  }
+
+  def apply[F[_]](implicit instance: SnappyDecompressor[F]): SnappyDecompressor[F] = instance
+
+  def make[F[_]: Async](chunkSize: Int = Defaults.defaultChunkSize, mode: ReadMode): SnappyDecompressor[F] =
+    new SnappyDecompressor(chunkSize, mode)
+}
diff --git a/snappy/src/test/scala/de/lhns/fs2/compress/SnappyRoundTripSuite.scala b/snappy/src/test/scala/de/lhns/fs2/compress/SnappyRoundTripSuite.scala
new file mode 100644
index 0000000..41e0e83
--- /dev/null
+++ b/snappy/src/test/scala/de/lhns/fs2/compress/SnappyRoundTripSuite.scala
@@ -0,0 +1,43 @@
+package de.lhns.fs2.compress
+
+import cats.effect.IO
+import cats.effect.std.Random
+import fs2.{Chunk, Stream}
+import munit.CatsEffectSuite
+
+import java.util
+
+class SnappyRoundTripSuite extends CatsEffectSuite {
+
+  test("snappy unframed round trip") {
+    val readMode = SnappyDecompressor.ReadMode.Basic()
+    val writeMode = SnappyCompressor.WriteMode.Basic()
+
+    testRoundTrip(readMode, writeMode)
+  }
+
+  test("snappy framed round trip") {
+    val readMode = SnappyDecompressor.ReadMode.Framed()
+    val writeMode = SnappyCompressor.WriteMode.Framed()
+
+    testRoundTrip(readMode, writeMode)
+  }
+
+  private def testRoundTrip(readMode: SnappyDecompressor.ReadMode, writeMode: SnappyCompressor.WriteMode): IO[Unit] = {
+    implicit val snappyCompressor: SnappyCompressor[IO] = SnappyCompressor.make(mode = writeMode)
+    implicit val snappyDecompressor: SnappyDecompressor[IO] = SnappyDecompressor.make(mode = readMode)
+    for {
+      random <- Random.scalaUtilRandom[IO]
+      expected <- random.nextBytes(1024 * 1024)
+      obtained <- Stream
+        .chunk(Chunk.array(expected))
+        .through(SnappyCompressor[IO].compress)
+        .through(SnappyDecompressor[IO].decompress)
+        .chunkAll
+        .compile
+        .lastOrError
+        .map(_.toArray)
+      _ = assert(util.Arrays.equals(expected, obtained))
+    } yield ()
+  }
+}
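
Usage sketch (not part of the change set above): how the new Snappy pipes plug into a plain fs2 stream, mirroring the round-trip test. Only the API introduced in `SnappyCompressor.scala` is used; `SnappyExample` and the sample input are illustrative.

```scala
import cats.effect.{IO, IOApp}
import de.lhns.fs2.compress.{SnappyCompressor, SnappyDecompressor}
import fs2.{Chunk, Stream}

object SnappyExample extends IOApp.Simple {
  // Framed mode wraps the stream in snappy-java's SnappyFramedOutputStream /
  // SnappyFramedInputStream (the Snappy framing format); Basic or
  // HadoopCompatible could be plugged in here instead.
  implicit val compressor: SnappyCompressor[IO] =
    SnappyCompressor.make(mode = SnappyCompressor.WriteMode.Framed())
  implicit val decompressor: SnappyDecompressor[IO] =
    SnappyDecompressor.make(mode = SnappyDecompressor.ReadMode.Framed())

  def run: IO[Unit] =
    Stream
      .chunk(Chunk.array("hello, snappy".getBytes("UTF-8")))
      .covary[IO]
      .through(SnappyCompressor[IO].compress)     // bytes -> Snappy-framed bytes
      .through(SnappyDecompressor[IO].decompress) // and back again
      .through(fs2.text.utf8.decode)
      .compile
      .string
      .flatMap(IO.println)
}
```

The default `chunkSize` (`Defaults.defaultChunkSize`) only controls the fs2 chunking and buffering around the underlying streams; the Snappy block size is configured via the `WriteMode` parameters.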