Skip to content

Commit

Permalink
Merge pull request #225 from alldroll/byte_input
Browse files Browse the repository at this point in the history
Unification of bitmap unserialization by using auxiliary byteInput approach
  • Loading branch information
lemire authored Aug 31, 2019
2 parents d017a98 + 65f42a3 commit b5aa429
Show file tree
Hide file tree
Showing 12 changed files with 440 additions and 342 deletions.
161 changes: 161 additions & 0 deletions byte_input.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package roaring

import (
"encoding/binary"
"io"
)

// byteInput is the sequential byte source used when reading serialized data.
// It is implemented by byteBuffer (backed by a byte slice) and by
// byteInputAdapter (backed by an io.Reader).
type byteInput interface {
// next returns a slice containing the next n bytes of the input and
// advances past them. An error is returned when n bytes cannot be supplied.
next(n int) ([]byte, error)
// readUInt32 reads the next 4 bytes as a little-endian uint32.
readUInt32() (uint32, error)
// readUInt16 reads the next 2 bytes as a little-endian uint16.
readUInt16() (uint16, error)
// getReadBytes returns the number of bytes read from the input so far.
getReadBytes() int64
// skipBytes advances the input by exactly n bytes without returning them,
// or returns an error when n bytes are not available.
skipBytes(n int) error
}

// newByteInputFromReader wraps reader in a byteInputAdapter whose read-byte
// counter starts at zero.
func newByteInputFromReader(reader io.Reader) byteInput {
	adapter := new(byteInputAdapter)
	adapter.r = reader

	return adapter
}

// newByteInput returns a byteInput that reads from buf, starting at its first
// byte. The slice is stored as given; it is not copied.
func newByteInput(buf []byte) byteInput {
	in := new(byteBuffer)
	in.buf = buf

	return in
}

// byteBuffer is a byteInput backed by an in-memory byte slice. Reads hand out
// sub-slices of buf rather than copies, and off records how many bytes have
// been consumed so far.
type byteBuffer struct {
	buf []byte
	off int
}

// next returns a slice containing the next n bytes of the buffer and advances
// past them. The returned slice aliases the underlying buffer.
//
// io.ErrUnexpectedEOF is returned, and nothing is consumed, when fewer than n
// bytes remain. A negative n is reported the same way: it cannot be satisfied
// and would otherwise cause an out-of-range slice panic.
func (b *byteBuffer) next(n int) ([]byte, error) {
	if n < 0 || n > len(b.buf)-b.off {
		return nil, io.ErrUnexpectedEOF
	}

	data := b.buf[b.off : b.off+n]
	b.off += n

	return data, nil
}

// readUInt32 reads the next 4 bytes as a little-endian uint32. It returns
// io.ErrUnexpectedEOF without consuming anything when fewer than 4 bytes remain.
func (b *byteBuffer) readUInt32() (uint32, error) {
	if len(b.buf)-b.off < 4 {
		return 0, io.ErrUnexpectedEOF
	}

	v := binary.LittleEndian.Uint32(b.buf[b.off:])
	b.off += 4

	return v, nil
}

// readUInt16 reads the next 2 bytes as a little-endian uint16. It returns
// io.ErrUnexpectedEOF without consuming anything when fewer than 2 bytes remain.
func (b *byteBuffer) readUInt16() (uint16, error) {
	if len(b.buf)-b.off < 2 {
		return 0, io.ErrUnexpectedEOF
	}

	v := binary.LittleEndian.Uint16(b.buf[b.off:])
	b.off += 2

	return v, nil
}

// getReadBytes returns the number of bytes consumed so far, i.e. the current
// offset into the buffer.
func (b *byteBuffer) getReadBytes() int64 {
	return int64(b.off)
}

// skipBytes advances the offset by exactly n bytes.
//
// io.ErrUnexpectedEOF is returned, and the offset is left untouched, when fewer
// than n bytes remain or when n is negative; a negative n would otherwise move
// the offset backwards and could push it below zero.
func (b *byteBuffer) skipBytes(n int) error {
	if n < 0 || n > len(b.buf)-b.off {
		return io.ErrUnexpectedEOF
	}

	b.off += n

	return nil
}

// reset points the buffer at a new byte slice and rewinds the offset to zero.
func (b *byteBuffer) reset(buf []byte) {
	b.buf = buf
	b.off = 0
}

// byteInputAdapter is a byteInput backed by an io.Reader. readBytes counts
// every byte pulled from r, including bytes from a read that ultimately failed.
type byteInputAdapter struct {
	r         io.Reader
	readBytes int
}

// next reads exactly n bytes from the underlying reader into a freshly
// allocated slice and returns it.
//
// When the reader cannot supply n bytes the error from io.ReadFull is returned
// (io.EOF if nothing was read, io.ErrUnexpectedEOF after a partial read) and
// the partially read bytes are still added to the read counter. A negative n
// yields io.ErrUnexpectedEOF without touching the reader; it would otherwise
// panic when allocating the slice.
func (b *byteInputAdapter) next(n int) ([]byte, error) {
	if n < 0 {
		return nil, io.ErrUnexpectedEOF
	}

	buf := make([]byte, n)
	m, err := io.ReadFull(b.r, buf)
	b.readBytes += m

	if err != nil {
		return nil, err
	}

	return buf, nil
}

// readUInt32 reads the next 4 bytes as a little-endian uint32.
func (b *byteInputAdapter) readUInt32() (uint32, error) {
	buf, err := b.next(4)

	if err != nil {
		return 0, err
	}

	return binary.LittleEndian.Uint32(buf), nil
}

// readUInt16 reads the next 2 bytes as a little-endian uint16.
func (b *byteInputAdapter) readUInt16() (uint16, error) {
	buf, err := b.next(2)

	if err != nil {
		return 0, err
	}

	return binary.LittleEndian.Uint16(buf), nil
}

// getReadBytes returns the number of bytes read from the underlying reader
// since construction or the last reset.
func (b *byteInputAdapter) getReadBytes() int64 {
	return int64(b.readBytes)
}

// skipBytes consumes exactly n bytes from the reader and discards them. It
// goes through next, so it fails in the same cases and with the same errors.
func (b *byteInputAdapter) skipBytes(n int) error {
	_, err := b.next(n)

	return err
}

// reset points the adapter at a new stream and zeroes the read counter.
func (b *byteInputAdapter) reset(stream io.Reader) {
	b.r = stream
	b.readBytes = 0
}
66 changes: 66 additions & 0 deletions byte_input_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package roaring

import (
"bytes"
"testing"

. "github.com/smartystreets/goconvey/convey"
)

// TestByteInputFlow runs the same sequence of reads against both byteInput
// implementations (slice-backed and reader-backed) and checks that they agree,
// first on empty input and then on a small little-endian uint16 payload.
func TestByteInputFlow(t *testing.T) {
	Convey("Test should be an error on empty data", t, func() {
		buf := bytes.NewBuffer([]byte{})

		instances := []byteInput{
			newByteInput(buf.Bytes()),
			newByteInputFromReader(buf),
		}

		// Every operation must fail: there is nothing to read.
		for _, input := range instances {
			n, err := input.readUInt16()
			So(n, ShouldEqual, 0)
			So(err, ShouldBeError)

			p, err := input.readUInt32()
			So(p, ShouldEqual, 0)
			So(err, ShouldBeError)

			b, err := input.next(10)
			So(b, ShouldBeNil)
			So(err, ShouldBeError)

			err = input.skipBytes(10)
			So(err, ShouldBeError)
		}
	})

	Convey("Test not empty data", t, func() {
		buf := bytes.NewBuffer(uint16SliceAsByteSlice([]uint16{1, 10, 32, 66, 23}))

		// buf.Bytes() is taken before the reader-backed input drains buf, so
		// both inputs see the same ten bytes.
		instances := []byteInput{
			newByteInput(buf.Bytes()),
			newByteInputFromReader(buf),
		}

		for _, input := range instances {
			n, err := input.readUInt16()
			So(n, ShouldEqual, 1)
			So(err, ShouldBeNil)

			p, err := input.readUInt32()
			So(p, ShouldEqual, 2097162) // 32 << 16 | 10
			So(err, ShouldBeNil)

			// So takes the actual value first and the expected value last.
			b, err := input.next(2)
			So(b, ShouldResemble, []byte{66, 0})
			So(err, ShouldBeNil)

			err = input.skipBytes(2)
			So(err, ShouldBeNil)

			// All ten bytes are consumed; one more byte must fail.
			b, err = input.next(1)
			So(b, ShouldBeNil)
			So(err, ShouldBeError)
		}
	})
}
35 changes: 31 additions & 4 deletions roaring.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"fmt"
"io"
"strconv"
"sync"
)

// Bitmap represents a compressed bitmap where you can add integers.
Expand Down Expand Up @@ -66,8 +67,14 @@ func (rb *Bitmap) WriteToMsgpack(stream io.Writer) (int64, error) {
// The format is compatible with other RoaringBitmap
// implementations (Java, C) and is documented here:
// https://github.com/RoaringBitmap/RoaringFormatSpec
func (rb *Bitmap) ReadFrom(stream io.Reader) (int64, error) {
return rb.highlowcontainer.readFrom(stream)
// ReadFrom populates the bitmap from reader through a pooled byteInputAdapter
// and returns whatever highlowcontainer.readFrom reports.
func (rb *Bitmap) ReadFrom(reader io.Reader) (p int64, err error) {
	stream := byteInputAdapterPool.Get().(*byteInputAdapter)
	stream.reset(reader)

	defer func() {
		// Clear the adapter before pooling it so an idle pooled object does
		// not keep the caller's reader reachable.
		stream.reset(nil)
		byteInputAdapterPool.Put(stream)
	}()

	return rb.highlowcontainer.readFrom(stream)
}

// FromBuffer creates a bitmap from its serialized version stored in buffer
Expand All @@ -92,10 +99,30 @@ func (rb *Bitmap) ReadFrom(stream io.Reader) (int64, error) {
// also be broken. Thus, before making buf unavailable, you should
// call CloneCopyOnWriteContainers on all such bitmaps.
//
func (rb *Bitmap) FromBuffer(buf []byte) (int64, error) {
return rb.highlowcontainer.fromBuffer(buf)
// FromBuffer populates the bitmap from buf through a pooled byteBuffer and
// returns whatever highlowcontainer.readFrom reports.
func (rb *Bitmap) FromBuffer(buf []byte) (p int64, err error) {
	stream := byteBufferPool.Get().(*byteBuffer)
	stream.reset(buf)

	defer func() {
		// Clear the wrapper before pooling it so an idle pooled object does
		// not keep the caller's slice reachable.
		stream.reset(nil)
		byteBufferPool.Put(stream)
	}()

	return rb.highlowcontainer.readFrom(stream)
}

// byteBufferPool recycles the byteBuffer wrappers used by FromBuffer.
var byteBufferPool = sync.Pool{
	New: func() interface{} { return new(byteBuffer) },
}

// byteInputAdapterPool recycles the byteInputAdapter wrappers used by ReadFrom.
var byteInputAdapterPool = sync.Pool{
	New: func() interface{} { return new(byteInputAdapter) },
}

// RunOptimize attempts to further compress the runs of consecutive values found in the bitmap
func (rb *Bitmap) RunOptimize() {
rb.highlowcontainer.runOptimize()
Expand Down
2 changes: 1 addition & 1 deletion roaring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestReverseIteratorCount(t *testing.T) {
count := 0
for it.HasNext() {
it.Next()
count += 1
count++
}
if count != testSize {
t.FailNow()
Expand Down
Loading

0 comments on commit b5aa429

Please sign in to comment.