Skip to content

Commit

Permalink
feat: Add ReadBatch() API
Browse files Browse the repository at this point in the history
Closes: #5
Signed-off-by: Michael Gasch <[email protected]>
  • Loading branch information
Michael Gasch committed Jan 17, 2022
1 parent 2217871 commit 7520232
Show file tree
Hide file tree
Showing 6 changed files with 469 additions and 72 deletions.
89 changes: 89 additions & 0 deletions example_batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package memlog_test

import (
"context"
"errors"
"fmt"
"os"

"github.com/embano1/memlog"
)

func Example_batch() {
const batchSize = 10

ctx := context.Background()
l, err := memlog.New(ctx)
if err != nil {
fmt.Printf("create log: %v", err)
os.Exit(1)
}

// seed log with data
for i := 0; i < 15; i++ {
d := fmt.Sprintf(`{"id":%d,"message","hello world"}`, i)
_, err = l.Write(ctx, []byte(d))
if err != nil {
fmt.Printf("write: %v", err)
os.Exit(1)
}
}

startOffset := memlog.Offset(0)
batch := make([]memlog.Record, batchSize)

fmt.Printf("reading batch starting at offset %d\n", startOffset)
count, err := l.ReadBatch(ctx, startOffset, batch)
if err != nil {
fmt.Printf("read batch: %v", err)
os.Exit(1)
}
fmt.Printf("records received in batch: %d\n", count)

// print valid batch entries up to "count"
for i := 0; i < count; i++ {
r := batch[i]
fmt.Printf("batch item: %d\toffset:%d\tdata: %s\n", i, r.Metadata.Offset, r.Data)
}

// read next batch and check if end of log reached
startOffset += memlog.Offset(count)
fmt.Printf("reading batch starting at offset %d\n", startOffset)
count, err = l.ReadBatch(ctx, startOffset, batch)
if err != nil {
if errors.Is(err, memlog.ErrFutureOffset) {
fmt.Println("reached end of log")
} else {
fmt.Printf("read batch: %v", err)
os.Exit(1)
}
}
fmt.Printf("records received in batch: %d\n", count)

// print valid batch entries up to "count"
for i := 0; i < count; i++ {
r := batch[i]
fmt.Printf("batch item: %d\toffset:%d\tdata: %s\n", i, r.Metadata.Offset, r.Data)
}

// Output: reading batch starting at offset 0
// records received in batch: 10
// batch item: 0 offset:0 data: {"id":0,"message","hello world"}
// batch item: 1 offset:1 data: {"id":1,"message","hello world"}
// batch item: 2 offset:2 data: {"id":2,"message","hello world"}
// batch item: 3 offset:3 data: {"id":3,"message","hello world"}
// batch item: 4 offset:4 data: {"id":4,"message","hello world"}
// batch item: 5 offset:5 data: {"id":5,"message","hello world"}
// batch item: 6 offset:6 data: {"id":6,"message","hello world"}
// batch item: 7 offset:7 data: {"id":7,"message","hello world"}
// batch item: 8 offset:8 data: {"id":8,"message","hello world"}
// batch item: 9 offset:9 data: {"id":9,"message","hello world"}
// reading batch starting at offset 10
// reached end of log
// records received in batch: 5
// batch item: 0 offset:10 data: {"id":10,"message","hello world"}
// batch item: 1 offset:11 data: {"id":11,"message","hello world"}
// batch item: 2 offset:12 data: {"id":12,"message","hello world"}
// batch item: 3 offset:13 data: {"id":13,"message","hello world"}
// batch item: 4 offset:14 data: {"id":14,"message","hello world"}
}
2 changes: 1 addition & 1 deletion example_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func Example_stream() {
// write some records (offsets 10-14)
for i := 0; i < writeRecords/2; i++ {
d := fmt.Sprintf(`{"id":%d,"message","hello world"}`, i+logStart)
_, err := l.Write(ctx, []byte(d))
_, err = l.Write(ctx, []byte(d))
if err != nil {
fmt.Printf("write: %v", err)
os.Exit(1)
Expand Down
File renamed without changes.
46 changes: 44 additions & 2 deletions memlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func New(_ context.Context, options ...Option) (*Log, error) {
return &l, nil
}

// Write creates a new record in the log with the given data. The write offset
// Write creates a new record in the log with the provided data. The write offset
// of the new record is returned. If an error occurs, an invalid offset (-1) and
// the error is returned.
//
Expand Down Expand Up @@ -176,7 +176,7 @@ func (l *Log) write(ctx context.Context, data []byte) (Offset, error) {
return r.Metadata.Offset, nil
}

// Read reads a record from the log at the given offset. If an error occurs, an
// Read reads a record from the log at the specified offset. If an error occurs, an
// invalid (empty) record and the error is returned.
//
// Safe for concurrent use.
Expand All @@ -187,6 +187,48 @@ func (l *Log) Read(ctx context.Context, offset Offset) (Record, error) {
return l.read(ctx, offset)
}

// ReadBatch reads multiple records into batch starting at the specified offset.
// The number of records read into batch and the error, if any, is returned.
//
// ReadBatch will read at most len(batch) records, always starting at batch
// index 0. ReadBatch stops reading at the end of the log, indicated by
// ErrFutureOffset.
//
// The caller must expect partial batch results and must not read more records
// from batch than indicated by the returned number of records. See the example
// for how to use this API.
//
// Safe for concurrent use.
func (l *Log) ReadBatch(ctx context.Context, offset Offset, batch []Record) (int, error) {
l.mu.RLock()
defer l.mu.RUnlock()

for i := 0; i < len(batch); i++ {
r, err := l.read(ctx, offset)
if err != nil {
// invalid start offset or empty log
if errors.Is(err, ErrOutOfRange) {
return 0, err
}

// return what we have
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return i, err
}

// end of log
if errors.Is(err, ErrFutureOffset) {
return i, ErrFutureOffset
}

}
batch[i] = r
offset++
}

return len(batch), nil
}

func (l *Log) read(ctx context.Context, offset Offset) (Record, error) {
if ctx.Err() != nil {
return Record{}, ctx.Err()
Expand Down
Loading

0 comments on commit 7520232

Please sign in to comment.