Skip to content

Commit

Permalink
Merge pull request #191 from rodrigo-molina/support-snappy
Browse files Browse the repository at this point in the history
Support snappy
  • Loading branch information
lhns authored Jan 25, 2025
2 parents d7c4085 + 8c44ac2 commit 62af9e4
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ libraryDependencies += "de.lhns" %% "fs2-compress-zstd" % "2.2.1"
libraryDependencies += "de.lhns" %% "fs2-compress-brotli" % "2.2.1"
libraryDependencies += "de.lhns" %% "fs2-compress-brotli4j" % "2.2.1"
libraryDependencies += "de.lhns" %% "fs2-compress-lz4" % "2.2.1"
libraryDependencies += "de.lhns" %% "fs2-compress-snappy" % "2.2.1"
```

## Concepts
Expand Down
15 changes: 15 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ val V = new {
val logbackClassic = "1.5.16"
val lz4 = "1.8.0"
val munitCatsEffect = "2.0.0"
val snappy = "1.1.10.7"
val zip4j = "2.11.5"
val zstdJni = "1.5.6-9"
}
Expand Down Expand Up @@ -86,6 +87,7 @@ lazy val root: Project =
.aggregate(brotli.projectRefs: _*)
.aggregate(brotli4j.projectRefs: _*)
.aggregate(lz4.projectRefs: _*)
.aggregate(snappy.projectRefs: _*)

lazy val core = projectMatrix
.in(file("core"))
Expand Down Expand Up @@ -215,3 +217,16 @@ lazy val lz4 = projectMatrix
)
)
.jvmPlatform(scalaVersions)

// Snappy module: published as "fs2-compress-snappy" and built from the `snappy` directory.
lazy val snappy = projectMatrix
.in(file("snappy"))
// Reuse core's main classes at compile time and core's test classes in this module's tests.
.dependsOn(core % "compile->compile;test->test")
.settings(commonSettings)
.settings(
name := "fs2-compress-snappy",
libraryDependencies ++= Seq(
// fs2-io supplies the InputStream/OutputStream <-> Stream bridges used by the implementation.
"co.fs2" %%% "fs2-io" % V.fs2,
// Java library (single `%`, no Scala version suffix); version pinned in `V.snappy`.
"org.xerial.snappy" % "snappy-java" % V.snappy
)
)
// Only a JVM platform is configured for this module.
.jvmPlatform(scalaVersions)
114 changes: 114 additions & 0 deletions snappy/src/main/scala/de/lhns/fs2/compress/SnappyCompressor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package de.lhns.fs2.compress

import cats.effect.Async
import fs2.Pipe
import fs2.io._
import org.xerial.snappy.{
SnappyFramedInputStream,
SnappyFramedOutputStream,
SnappyHadoopCompatibleOutputStream,
SnappyInputStream,
SnappyOutputStream
}

import java.io.{BufferedInputStream, InputStream, OutputStream}

/** [[Compressor]] that pushes the incoming bytes through a snappy-java `OutputStream`.
  *
  * Instances are created through the companion object; the constructor is private.
  *
  * @param chunkSize
  *   chunk size handed to `readOutputStream` for the compressed output
  * @param mode
  *   selects which Snappy output stream wraps the destination stream
  */
class SnappyCompressor[F[_]: Async] private (chunkSize: Int, mode: SnappyCompressor.WriteMode) extends Compressor[F] {

  /** Pipe turning uncompressed bytes into Snappy-compressed bytes. */
  override def compress: Pipe[F, Byte, Byte] = { uncompressed =>
    readOutputStream[F](chunkSize) { destination =>
      // Building the Snappy wrapper is suspended in `blocking`, so it only happens when the sink is run.
      val snappySink: F[OutputStream] =
        Async[F].blocking[OutputStream](mode.fromOutputStream(destination))

      uncompressed.through(writeOutputStream(snappySink)).compile.drain
    }
  }
}

object SnappyCompressor {

  /** Describes which snappy-java output stream is used for compression and how it is configured. */
  sealed trait WriteMode

  object WriteMode {
    // Block size referenced from snappy-java's SnappyOutputStream (presumably its default; see link):
    // https://github.com/xerial/snappy-java/blob/ec23d7c611563bedce536ca4d02ebdb9a690ea91/src/main/java/org/xerial/snappy/SnappyOutputStream.java#L64
    private val DefaultBasicBlockSize = 32 * 1024
    private val DefaultHadoopBlockSize = DefaultBasicBlockSize

    /** Compression through `SnappyOutputStream`. See
      * [[https://github.com/xerial/snappy-java/blob/ec23d7c611563bedce536ca4d02ebdb9a690ea91/src/main/java/org/xerial/snappy/SnappyOutputStream.java#L59]]
      *
      * @param blockSize
      *   block size passed to the `SnappyOutputStream` constructor
      */
    final case class Basic(blockSize: Int = DefaultBasicBlockSize) extends WriteMode

    /** Compression through `SnappyFramedOutputStream`. See
      * [[https://github.com/xerial/snappy-java/blob/ec23d7c611563bedce536ca4d02ebdb9a690ea91/src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java#L34]]
      *
      * @param blockSize
      *   block size passed to the `SnappyFramedOutputStream` constructor
      * @param minCompressionRatio
      *   minimum compression ratio passed to the `SnappyFramedOutputStream` constructor
      */
    final case class Framed(
        blockSize: Int = SnappyFramedOutputStream.DEFAULT_BLOCK_SIZE,
        minCompressionRatio: Double = SnappyFramedOutputStream.DEFAULT_MIN_COMPRESSION_RATIO
    ) extends WriteMode

    /** Compression for use with Hadoop libraries: it does not emit a file header but writes out the current block
      * size as a preamble to each block.
      *
      * @param blockSize
      *   block size passed to the `SnappyHadoopCompatibleOutputStream` constructor
      */
    final case class HadoopCompatible(blockSize: Int = DefaultHadoopBlockSize) extends WriteMode
  }

  /** Adds the stream factory used by [[SnappyCompressor]] to every [[WriteMode]]. */
  private implicit class WriteModeOps(mode: WriteMode) {

    /** Wraps `underlying` in the Snappy output stream that corresponds to `mode`. */
    def fromOutputStream(underlying: OutputStream): OutputStream = mode match {
      case WriteMode.Basic(blockSize) =>
        new SnappyOutputStream(underlying, blockSize)
      case WriteMode.Framed(blockSize, minCompressionRatio) =>
        new SnappyFramedOutputStream(underlying, blockSize, minCompressionRatio)
      case WriteMode.HadoopCompatible(blockSize) =>
        new SnappyHadoopCompatibleOutputStream(underlying, blockSize)
    }
  }

  /** Summons the implicit [[SnappyCompressor]] instance that is in scope. */
  def apply[F[_]](implicit instance: SnappyCompressor[F]): SnappyCompressor[F] = instance

  /** Creates a [[SnappyCompressor]].
    *
    * @param chunkSize
    *   chunk size of the compressed output stream
    * @param mode
    *   Snappy stream flavour to write
    */
  def make[F[_]: Async](
      chunkSize: Int = Defaults.defaultChunkSize,
      mode: WriteMode
  ): SnappyCompressor[F] =
    new SnappyCompressor[F](chunkSize, mode)
}

/** [[Decompressor]] that reads the incoming bytes through a snappy-java `InputStream`.
  *
  * Instances are created through the companion object; the constructor is private.
  *
  * @param chunkSize
  *   used both as the `BufferedInputStream` buffer size and as the chunk size for `readInputStream`
  * @param decompressionType
  *   selects which Snappy input stream wraps the compressed data
  */
class SnappyDecompressor[F[_]: Async] private (chunkSize: Int, decompressionType: SnappyDecompressor.ReadMode)
    extends Decompressor[F] {

  /** Suspends (as a blocking effect) the creation of the Snappy stream reading from `source`. */
  private def openSnappy(source: InputStream): F[InputStream] =
    Async[F].blocking(decompressionType.fromInputStream(source))

  /** Pipe turning Snappy-compressed bytes into the original bytes. */
  override def decompress: Pipe[F, Byte, Byte] = { compressed =>
    // Expose the compressed stream as a java.io.InputStream and put a buffer in front of it.
    val rawInput = compressed.through(toInputStream[F])
    val bufferedInput = rawInput.map(raw => new BufferedInputStream(raw, chunkSize))

    bufferedInput.flatMap { source =>
      readInputStream(openSnappy(source), chunkSize)
    }
  }
}

object SnappyDecompressor {

  /** Describes which snappy-java input stream is used for decompression and how it is configured. */
  sealed trait ReadMode

  object ReadMode {

    /** Decompression through `SnappyInputStream`. See
      * [[https://github.com/xerial/snappy-java/blob/ec23d7c611563bedce536ca4d02ebdb9a690ea91/src/main/java/org/xerial/snappy/SnappyInputStream.java#L36]]
      *
      * @param maxChunkSize
      *   maximum chunk size passed to the `SnappyInputStream` constructor
      */
    final case class Basic(maxChunkSize: Int = SnappyInputStream.MAX_CHUNK_SIZE) extends ReadMode

    /** Decompression through `SnappyFramedInputStream`. See
      * [[https://github.com/xerial/snappy-java/blob/ec23d7c611563bedce536ca4d02ebdb9a690ea91/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java#L39]]
      *
      * @param verifyChecksums
      *   flag passed to the `SnappyFramedInputStream` constructor
      */
    final case class Framed(verifyChecksums: Boolean = true) extends ReadMode
  }

  /** Adds the stream factory used by [[SnappyDecompressor]] to every [[ReadMode]]. */
  private implicit class ReadModeOps(mode: ReadMode) {

    /** Wraps `underlying` in the Snappy input stream that corresponds to `mode`. */
    def fromInputStream(underlying: InputStream): InputStream = mode match {
      case ReadMode.Basic(maxChunkSize) =>
        new SnappyInputStream(underlying, maxChunkSize)
      case ReadMode.Framed(verifyChecksums) =>
        new SnappyFramedInputStream(underlying, verifyChecksums)
    }
  }

  /** Summons the implicit [[SnappyDecompressor]] instance that is in scope. */
  def apply[F[_]](implicit instance: SnappyDecompressor[F]): SnappyDecompressor[F] = instance

  /** Creates a [[SnappyDecompressor]].
    *
    * @param chunkSize
    *   buffer and chunk size used while reading the decompressed data
    * @param mode
    *   Snappy stream flavour to read
    */
  def make[F[_]: Async](
      chunkSize: Int = Defaults.defaultChunkSize,
      mode: ReadMode
  ): SnappyDecompressor[F] =
    new SnappyDecompressor[F](chunkSize, mode)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package de.lhns.fs2.compress

import cats.effect.IO
import cats.effect.std.Random
import fs2.{Chunk, Stream}
import munit.CatsEffectSuite

import java.util

/** Checks that data compressed by [[SnappyCompressor]] is restored unchanged by [[SnappyDecompressor]]. */
class SnappyRoundTripSuite extends CatsEffectSuite {

  test("snappy unframed round trip") {
    testRoundTrip(
      SnappyDecompressor.ReadMode.Basic(),
      SnappyCompressor.WriteMode.Basic()
    )
  }

  test("snappy framed round trip") {
    testRoundTrip(
      SnappyDecompressor.ReadMode.Framed(),
      SnappyCompressor.WriteMode.Framed()
    )
  }

  /** Compresses 1 MiB of random bytes with `writeMode`, decompresses them with `readMode` and asserts that the
    * result equals the input.
    */
  private def testRoundTrip(readMode: SnappyDecompressor.ReadMode, writeMode: SnappyCompressor.WriteMode): IO[Unit] = {
    // Declared implicit so that the `apply` summoners of both companions are exercised as well.
    implicit val snappyCompressor: SnappyCompressor[IO] = SnappyCompressor.make(mode = writeMode)
    implicit val snappyDecompressor: SnappyDecompressor[IO] = SnappyDecompressor.make(mode = readMode)

    def roundTrip(input: Array[Byte]): IO[Array[Byte]] =
      Stream
        .chunk(Chunk.array(input))
        .through(SnappyCompressor[IO].compress)
        .through(SnappyDecompressor[IO].decompress)
        .chunkAll
        .compile
        .lastOrError
        .map(_.toArray)

    Random
      .scalaUtilRandom[IO]
      .flatMap(_.nextBytes(1024 * 1024))
      .flatMap { expected =>
        roundTrip(expected).map { obtained =>
          // Arrays need element-wise comparison, hence java.util.Arrays.equals.
          assert(util.Arrays.equals(expected, obtained))
        }
      }
  }
}

0 comments on commit 62af9e4

Please sign in to comment.