From 65348f5f34d70b6fda82a21e53a55eb13b5024c7 Mon Sep 17 00:00:00 2001 From: Alex Dubov Date: Mon, 13 Nov 2023 17:50:15 +1100 Subject: [PATCH 1/4] CompressingReader: compressed data reader stream (#210) In Go, `http.Request` requires an `io.ReadCloser` for an upload body. To facilitate compression of data being uploaded we want to wrap a source reader (such an `io.File`) with a proposed `lz4.CompressingReader`, whereupon the said reader can be set directly into `http.Request` Body field. This benefits not only the plain `http.Request`, but also the various network file storage APIs which build on top of Go's http client. Signed-off-by: Alex Dubov --- compressing_reader.go | 210 +++++++++++++++++++++++++++++++++++++ compressing_reader_test.go | 79 ++++++++++++++ options.go | 28 +++++ 3 files changed, 317 insertions(+) create mode 100644 compressing_reader.go create mode 100644 compressing_reader_test.go diff --git a/compressing_reader.go b/compressing_reader.go new file mode 100644 index 00000000..9d1d81a6 --- /dev/null +++ b/compressing_reader.go @@ -0,0 +1,210 @@ +package lz4 + +import ( + "errors" + "io" + + "github.com/pierrec/lz4/v4/internal/lz4block" + "github.com/pierrec/lz4/v4/internal/lz4errors" + "github.com/pierrec/lz4/v4/internal/lz4stream" +) + +type crState int + +const ( + crStateInitial crState = iota + crStateReading + crStateFlushing + crStateDone +) + +type CompressingReader struct { + state crState + src io.ReadCloser // source reader + level lz4block.CompressionLevel // how hard to try + frame *lz4stream.Frame // frame being built + in []byte + out ovWriter + handler func(int) +} + +// NewCompressingReader creates a reader which reads compressed data from +// raw stream. This makes it a logical opposite of a normal lz4.Reader. +// We require an io.ReadCloser as an underlying source for compatibility +// with Go's http.Request. +func NewCompressingReader(src io.ReadCloser) *CompressingReader { + zrd := &CompressingReader { + frame: lz4stream.NewFrame(), + } + + _ = zrd.Apply(DefaultBlockSizeOption, DefaultChecksumOption, defaultOnBlockDone) + zrd.Reset(src) + + return zrd +} + +// Source exposes the underlying source stream for introspection and control. +func (zrd *CompressingReader) Source() io.ReadCloser { + return zrd.src +} + +// Close simply invokes the underlying stream Close method. This method is +// provided for the benefit of Go http client/server, which relies on Close +// for goroutine termination. +func (zrd *CompressingReader) Close() error { + return zrd.src.Close() +} + +// Apply applies useful options to the lz4 encoder. +func (zrd *CompressingReader) Apply(options ...Option) (err error) { + if zrd.state != crStateInitial { + return lz4errors.ErrOptionClosedOrError + } + + zrd.Reset(zrd.src) + + for _, o := range options { + if err = o(zrd); err != nil { + return + } + } + return +} + +func (*CompressingReader) private() {} + +func (zrd *CompressingReader) init() error { + zrd.frame.InitW(&zrd.out, 1, false) + size := zrd.frame.Descriptor.Flags.BlockSizeIndex() + zrd.in = size.Get() + return zrd.frame.Descriptor.Write(zrd.frame, &zrd.out) +} + +// Read allows reading of lz4 compressed data +func (zrd *CompressingReader) Read(p []byte) (n int, err error) { + defer func() { + if err != nil { + zrd.state = crStateDone + } + }() + + if !zrd.out.reset(p) { + return len(p), nil + } + + switch zrd.state { + case crStateInitial: + err = zrd.init() + if err != nil { + return + } + zrd.state = crStateReading + case crStateDone: + return 0, errors.New("This reader is done") + case crStateFlushing: + if zrd.out.dataPos > 0 { + n = zrd.out.dataPos + zrd.out.data = nil + zrd.out.dataPos = 0 + return + } else { + zrd.state = crStateDone + return 0, io.EOF + } + } + + for zrd.state == crStateReading { + block := zrd.frame.Blocks.Block + + var rCount int + rCount, err = io.ReadFull(zrd.src, zrd.in) + switch err { + case nil: + err = block.Compress( + zrd.frame, zrd.in[ : rCount], zrd.level, + ).Write(zrd.frame, &zrd.out) + zrd.handler(len(block.Data)) + if err != nil { + return + } + + if zrd.out.dataPos == len(zrd.out.data) { + n = zrd.out.dataPos + zrd.out.dataPos = 0 + zrd.out.data = nil + return + } + case io.EOF, io.ErrUnexpectedEOF: // read may be partial + if rCount > 0 { + err = block.Compress( + zrd.frame, zrd.in[ : rCount], zrd.level, + ).Write(zrd.frame, &zrd.out) + zrd.handler(len(block.Data)) + if err != nil { + return + } + } + + err = zrd.frame.CloseW(&zrd.out, 1) + if err != nil { + return + } + zrd.state = crStateFlushing + + n = zrd.out.dataPos + zrd.out.dataPos = 0 + zrd.out.data = nil + return + default: + return + } + } + + err = lz4errors.ErrInternalUnhandledState + return +} + +// Reset makes the stream usable again; mostly handy to reuse lz4 encoder +// instances. +func (zrd *CompressingReader) Reset(src io.ReadCloser) { + zrd.frame.Reset(1) + zrd.state = crStateInitial + zrd.src = src +} + +type ovWriter struct { + data []byte + ov []byte + dataPos int + ovPos int +} + +func (wr *ovWriter) Write(p []byte) (n int, err error) { + count := copy(wr.data[wr.dataPos : ], p) + wr.dataPos += count + + if count < len(p) { + wr.ov = append(wr.ov, p[count : ]...) + } + + return len(p), nil +} + +func (wr *ovWriter) reset(out []byte) bool { + ovRem := len(wr.ov) - wr.ovPos + + if ovRem >= len(out) { + wr.ovPos += copy(out, wr.ov[wr.ovPos : ]) + return false + } + + if ovRem > 0 { + copy(out, wr.ov[wr.ovPos : ]) + wr.ov = wr.ov[ : 0] + wr.ovPos = 0 + wr.dataPos = ovRem + } + + wr.data = out + return true +} diff --git a/compressing_reader_test.go b/compressing_reader_test.go new file mode 100644 index 00000000..bc34da5b --- /dev/null +++ b/compressing_reader_test.go @@ -0,0 +1,79 @@ +package lz4_test + +import ( + "bytes" + "fmt" + "io" + "os" + "strings" + "testing" + + "github.com/pierrec/lz4/v4" +) + +func TestCompressingReader(t *testing.T) { + goldenFiles := []string{ + "testdata/e.txt", + "testdata/gettysburg.txt", + "testdata/Mark.Twain-Tom.Sawyer.txt", + "testdata/Mark.Twain-Tom.Sawyer_long.txt", + "testdata/pg1661.txt", + "testdata/pi.txt", + "testdata/random.data", + "testdata/repeat.txt", + "testdata/issue102.data", + } + + for _, fname := range goldenFiles { + for _, option := range []lz4.Option{ + lz4.BlockChecksumOption(true), + lz4.SizeOption(123), + } { + label := fmt.Sprintf("%s/%s", fname, option) + t.Run(label, func(t *testing.T) { + fname := fname + option := option + t.Parallel() + + raw, err := os.ReadFile(fname) + if err != nil { + t.Fatal(err) + } + r := io.NopCloser(bytes.NewReader(raw)) + + // Compress. + zcomp := lz4.NewCompressingReader(r) + if err := zcomp.Apply(option, lz4.CompressionLevelOption(lz4.Level1)); err != nil { + t.Fatal(err) + } + + zout, err := io.ReadAll(zcomp) + if err != nil { + t.Fatal(err) + } + + // Uncompress. + zr := lz4.NewReader(bytes.NewReader(zout)) + out, err := io.ReadAll(zr) + if err != nil { + t.Fatal(err) + } + + // The uncompressed data must be the same as the initial input. + if got, want := len(out), len(raw); got != want { + t.Errorf("invalid sizes: got %d; want %d", got, want) + } + + if !bytes.Equal(out, raw) { + t.Fatal("uncompressed data does not match original") + } + + if strings.Contains(option.String(), "SizeOption") { + if got, want := zr.Size(), 123; got != want { + t.Errorf("invalid sizes: got %d; want %d", got, want) + } + } + }) + } + } +} diff --git a/options.go b/options.go index 46a87380..57a44e76 100644 --- a/options.go +++ b/options.go @@ -57,6 +57,13 @@ func BlockSizeOption(size BlockSize) Option { } w.frame.Descriptor.Flags.BlockSizeIndexSet(lz4block.Index(size)) return nil + case *CompressingReader: + size := uint32(size) + if !lz4block.IsValid(size) { + return fmt.Errorf("%w: %d", lz4errors.ErrOptionInvalidBlockSize, size) + } + w.frame.Descriptor.Flags.BlockSizeIndexSet(lz4block.Index(size)) + return nil } return lz4errors.ErrOptionNotApplicable } @@ -72,6 +79,9 @@ func BlockChecksumOption(flag bool) Option { case *Writer: w.frame.Descriptor.Flags.BlockChecksumSet(flag) return nil + case *CompressingReader: + w.frame.Descriptor.Flags.BlockChecksumSet(flag) + return nil } return lz4errors.ErrOptionNotApplicable } @@ -87,6 +97,9 @@ func ChecksumOption(flag bool) Option { case *Writer: w.frame.Descriptor.Flags.ContentChecksumSet(flag) return nil + case *CompressingReader: + w.frame.Descriptor.Flags.ContentChecksumSet(flag) + return nil } return lz4errors.ErrOptionNotApplicable } @@ -104,6 +117,10 @@ func SizeOption(size uint64) Option { w.frame.Descriptor.Flags.SizeSet(size > 0) w.frame.Descriptor.ContentSize = size return nil + case *CompressingReader: + w.frame.Descriptor.Flags.SizeSet(size > 0) + w.frame.Descriptor.ContentSize = size + return nil } return lz4errors.ErrOptionNotApplicable } @@ -162,6 +179,14 @@ func CompressionLevelOption(level CompressionLevel) Option { } w.level = lz4block.CompressionLevel(level) return nil + case *CompressingReader: + switch level { + case Fast, Level1, Level2, Level3, Level4, Level5, Level6, Level7, Level8, Level9: + default: + return fmt.Errorf("%w: %d", lz4errors.ErrOptionInvalidCompressionLevel, level) + } + w.level = lz4block.CompressionLevel(level) + return nil } return lz4errors.ErrOptionNotApplicable } @@ -186,6 +211,9 @@ func OnBlockDoneOption(handler func(size int)) Option { case *Reader: rw.handler = handler return nil + case *CompressingReader: + rw.handler = handler + return nil } return lz4errors.ErrOptionNotApplicable } From f2ece5b1696566fc0d204f3ceab7c9aa41b8c7ba Mon Sep 17 00:00:00 2001 From: Alex Dubov Date: Mon, 13 Nov 2023 18:01:49 +1100 Subject: [PATCH 2/4] CompressingReader: make sure to clear out buffer Out buffer may have some stale overflow data on Reset. Signed-off-by: Alex Dubov --- compressing_reader.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/compressing_reader.go b/compressing_reader.go index 9d1d81a6..78125dd1 100644 --- a/compressing_reader.go +++ b/compressing_reader.go @@ -170,6 +170,7 @@ func (zrd *CompressingReader) Reset(src io.ReadCloser) { zrd.frame.Reset(1) zrd.state = crStateInitial zrd.src = src + zrd.out.clear() } type ovWriter struct { @@ -208,3 +209,10 @@ func (wr *ovWriter) reset(out []byte) bool { wr.data = out return true } + +func (wr *ovWriter) clear() { + wr.data = nil + wr.dataPos = 0 + wr.ov = wr.ov[ : 0] + wr.ovPos = 0 +} From 4a80a2f8726c7b603b98707757eef952377a16a9 Mon Sep 17 00:00:00 2001 From: Alex Dubov Date: Mon, 13 Nov 2023 18:16:21 +1100 Subject: [PATCH 3/4] CompressingReader: account for possible out buffer state It is possible for out buffer to have no overflow data and yet the read offset will be not zero (if byte slices passed by the reader just happened to align perfectly). Signed-off-by: Alex Dubov --- compressing_reader.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/compressing_reader.go b/compressing_reader.go index 78125dd1..9f44aa52 100644 --- a/compressing_reader.go +++ b/compressing_reader.go @@ -206,6 +206,11 @@ func (wr *ovWriter) reset(out []byte) bool { wr.dataPos = ovRem } + if wr.ovPos > 0 { + wr.ov = wr.ov[ : 0] + wr.ovPos = 0 + } + wr.data = out return true } From 76139896ff3591a8f18047bc676faefe45ec647c Mon Sep 17 00:00:00 2001 From: Alex Dubov Date: Fri, 8 Dec 2023 16:34:03 +1100 Subject: [PATCH 4/4] CompressingReader: support older Go versions Signed-off-by: Alex Dubov --- compressing_reader.go | 5 ++--- compressing_reader_test.go | 11 +++++------ 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/compressing_reader.go b/compressing_reader.go index 9f44aa52..8df0dc76 100644 --- a/compressing_reader.go +++ b/compressing_reader.go @@ -204,11 +204,10 @@ func (wr *ovWriter) reset(out []byte) bool { wr.ov = wr.ov[ : 0] wr.ovPos = 0 wr.dataPos = ovRem - } - - if wr.ovPos > 0 { + } else if wr.ovPos > 0 { wr.ov = wr.ov[ : 0] wr.ovPos = 0 + wr.dataPos = 0 } wr.data = out diff --git a/compressing_reader_test.go b/compressing_reader_test.go index bc34da5b..705ff4d0 100644 --- a/compressing_reader_test.go +++ b/compressing_reader_test.go @@ -3,8 +3,7 @@ package lz4_test import ( "bytes" "fmt" - "io" - "os" + "io/ioutil" "strings" "testing" @@ -35,11 +34,11 @@ func TestCompressingReader(t *testing.T) { option := option t.Parallel() - raw, err := os.ReadFile(fname) + raw, err := ioutil.ReadFile(fname) if err != nil { t.Fatal(err) } - r := io.NopCloser(bytes.NewReader(raw)) + r := ioutil.NopCloser(bytes.NewReader(raw)) // Compress. zcomp := lz4.NewCompressingReader(r) @@ -47,14 +46,14 @@ func TestCompressingReader(t *testing.T) { t.Fatal(err) } - zout, err := io.ReadAll(zcomp) + zout, err := ioutil.ReadAll(zcomp) if err != nil { t.Fatal(err) } // Uncompress. zr := lz4.NewReader(bytes.NewReader(zout)) - out, err := io.ReadAll(zr) + out, err := ioutil.ReadAll(zr) if err != nil { t.Fatal(err) }