From 752023294a3a166b09533cd6f9f6f53cd934548f Mon Sep 17 00:00:00 2001 From: Michael Gasch Date: Mon, 17 Jan 2022 14:47:16 +0100 Subject: [PATCH] feat: Add ReadBatch() API Closes: #5 Signed-off-by: Michael Gasch --- example_batch_test.go | 89 +++++++ example_stream_test.go | 2 +- example_simple_test.go => example_test.go | 0 memlog.go | 46 +++- memlog_internal_test.go | 136 +++++------ memlog_test.go | 268 +++++++++++++++++++++- 6 files changed, 469 insertions(+), 72 deletions(-) create mode 100644 example_batch_test.go rename example_simple_test.go => example_test.go (100%) diff --git a/example_batch_test.go b/example_batch_test.go new file mode 100644 index 0000000..dfbe1e3 --- /dev/null +++ b/example_batch_test.go @@ -0,0 +1,89 @@ +package memlog_test + +import ( + "context" + "errors" + "fmt" + "os" + + "github.com/embano1/memlog" +) + +func Example_batch() { + const batchSize = 10 + + ctx := context.Background() + l, err := memlog.New(ctx) + if err != nil { + fmt.Printf("create log: %v", err) + os.Exit(1) + } + + // seed log with data + for i := 0; i < 15; i++ { + d := fmt.Sprintf(`{"id":%d,"message","hello world"}`, i) + _, err = l.Write(ctx, []byte(d)) + if err != nil { + fmt.Printf("write: %v", err) + os.Exit(1) + } + } + + startOffset := memlog.Offset(0) + batch := make([]memlog.Record, batchSize) + + fmt.Printf("reading batch starting at offset %d\n", startOffset) + count, err := l.ReadBatch(ctx, startOffset, batch) + if err != nil { + fmt.Printf("read batch: %v", err) + os.Exit(1) + } + fmt.Printf("records received in batch: %d\n", count) + + // print valid batch entries up to "count" + for i := 0; i < count; i++ { + r := batch[i] + fmt.Printf("batch item: %d\toffset:%d\tdata: %s\n", i, r.Metadata.Offset, r.Data) + } + + // read next batch and check if end of log reached + startOffset += memlog.Offset(count) + fmt.Printf("reading batch starting at offset %d\n", startOffset) + count, err = l.ReadBatch(ctx, startOffset, batch) + if err != nil { + if errors.Is(err, memlog.ErrFutureOffset) { + fmt.Println("reached end of log") + } else { + fmt.Printf("read batch: %v", err) + os.Exit(1) + } + } + fmt.Printf("records received in batch: %d\n", count) + + // print valid batch entries up to "count" + for i := 0; i < count; i++ { + r := batch[i] + fmt.Printf("batch item: %d\toffset:%d\tdata: %s\n", i, r.Metadata.Offset, r.Data) + } + + // Output: reading batch starting at offset 0 + // records received in batch: 10 + // batch item: 0 offset:0 data: {"id":0,"message","hello world"} + // batch item: 1 offset:1 data: {"id":1,"message","hello world"} + // batch item: 2 offset:2 data: {"id":2,"message","hello world"} + // batch item: 3 offset:3 data: {"id":3,"message","hello world"} + // batch item: 4 offset:4 data: {"id":4,"message","hello world"} + // batch item: 5 offset:5 data: {"id":5,"message","hello world"} + // batch item: 6 offset:6 data: {"id":6,"message","hello world"} + // batch item: 7 offset:7 data: {"id":7,"message","hello world"} + // batch item: 8 offset:8 data: {"id":8,"message","hello world"} + // batch item: 9 offset:9 data: {"id":9,"message","hello world"} + // reading batch starting at offset 10 + // reached end of log + // records received in batch: 5 + // batch item: 0 offset:10 data: {"id":10,"message","hello world"} + // batch item: 1 offset:11 data: {"id":11,"message","hello world"} + // batch item: 2 offset:12 data: {"id":12,"message","hello world"} + // batch item: 3 offset:13 data: {"id":13,"message","hello world"} + // batch item: 4 offset:14 data: 
{"id":14,"message","hello world"} +} diff --git a/example_stream_test.go b/example_stream_test.go index 413e994..4f30748 100644 --- a/example_stream_test.go +++ b/example_stream_test.go @@ -36,7 +36,7 @@ func Example_stream() { // write some records (offsets 10-14) for i := 0; i < writeRecords/2; i++ { d := fmt.Sprintf(`{"id":%d,"message","hello world"}`, i+logStart) - _, err := l.Write(ctx, []byte(d)) + _, err = l.Write(ctx, []byte(d)) if err != nil { fmt.Printf("write: %v", err) os.Exit(1) diff --git a/example_simple_test.go b/example_test.go similarity index 100% rename from example_simple_test.go rename to example_test.go diff --git a/memlog.go b/memlog.go index d1ea746..15a728b 100644 --- a/memlog.go +++ b/memlog.go @@ -115,7 +115,7 @@ func New(_ context.Context, options ...Option) (*Log, error) { return &l, nil } -// Write creates a new record in the log with the given data. The write offset +// Write creates a new record in the log with the provided data. The write offset // of the new record is returned. If an error occurs, an invalid offset (-1) and // the error is returned. // @@ -176,7 +176,7 @@ func (l *Log) write(ctx context.Context, data []byte) (Offset, error) { return r.Metadata.Offset, nil } -// Read reads a record from the log at the given offset. If an error occurs, an +// Read reads a record from the log at the specified offset. If an error occurs, an // invalid (empty) record and the error is returned. // // Safe for concurrent use. @@ -187,6 +187,48 @@ func (l *Log) Read(ctx context.Context, offset Offset) (Record, error) { return l.read(ctx, offset) } +// ReadBatch reads multiple records into batch starting at the specified offset. +// The number of records read into batch and the error, if any, is returned. +// +// ReadBatch will read at most len(batch) records, always starting at batch +// index 0. ReadBatch stops reading at the end of the log, indicated by +// ErrFutureOffset. +// +// The caller must expect partial batch results and must not read more records +// from batch than indicated by the returned number of records. See the example +// for how to use this API. +// +// Safe for concurrent use. 
+func (l *Log) ReadBatch(ctx context.Context, offset Offset, batch []Record) (int, error) { + l.mu.RLock() + defer l.mu.RUnlock() + + for i := 0; i < len(batch); i++ { + r, err := l.read(ctx, offset) + if err != nil { + // invalid start offset or empty log + if errors.Is(err, ErrOutOfRange) { + return 0, err + } + + // return what we have + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return i, err + } + + // end of log + if errors.Is(err, ErrFutureOffset) { + return i, ErrFutureOffset + } + + } + batch[i] = r + offset++ + } + + return len(batch), nil +} + func (l *Log) read(ctx context.Context, offset Offset) (Record, error) { if ctx.Err() != nil { return Record{}, ctx.Err() diff --git a/memlog_internal_test.go b/memlog_internal_test.go index 9d376d7..c3efd9b 100644 --- a/memlog_internal_test.go +++ b/memlog_internal_test.go @@ -230,15 +230,15 @@ func TestLog_read(t *testing.T) { testCases := []struct { name string start Offset - record Offset + read Offset wantErr error }{ - {name: "start offset 0, read -5", start: 0, record: -5, wantErr: ErrOutOfRange}, - {name: "start offset 10, read 0", start: 10, record: 0, wantErr: ErrOutOfRange}, - {name: "start offset 10, read 9", start: 10, record: 9, wantErr: ErrOutOfRange}, - {name: "start offset 0, read 0", start: 0, record: 0, wantErr: ErrFutureOffset}, - {name: "start offset 10, read 100", start: 10, record: 100, wantErr: ErrFutureOffset}, - {name: "start offset 100, read 100", start: 100, record: 100, wantErr: ErrFutureOffset}, + {name: "start offset 0, read 0", start: 0, read: 0, wantErr: ErrFutureOffset}, + {name: "start offset 0, read -5", start: 0, read: -5, wantErr: ErrOutOfRange}, + {name: "start offset 10, read 0", start: 10, read: 0, wantErr: ErrOutOfRange}, + {name: "start offset 10, read 9", start: 10, read: 9, wantErr: ErrOutOfRange}, + {name: "start offset 10, read 100", start: 10, read: 100, wantErr: ErrFutureOffset}, + {name: "start offset 100, read 100", start: 100, read: 100, wantErr: ErrFutureOffset}, } for _, tc := range testCases { @@ -247,7 +247,7 @@ func TestLog_read(t *testing.T) { l, err := New(ctx, WithStartOffset(tc.start)) assert.NilError(t, err) - r, err := l.read(ctx, tc.record) + r, err := l.read(ctx, tc.read) assert.Assert(t, errors.Is(err, tc.wantErr)) assert.Assert(t, r.Metadata.Created.IsZero()) }) @@ -256,28 +256,28 @@ func TestLog_read(t *testing.T) { t.Run("read fails when record is purged", func(t *testing.T) { testCases := []struct { - name string - start Offset - segSize int - writeRecords [][]byte - record Offset - wantErr error + name string + start Offset + segSize int + records [][]byte + read Offset + wantErr error }{ { - name: "start offset 0, segment size 5, write 20, read offset 0", - start: 0, - segSize: 5, - writeRecords: NewTestDataSlice(t, 20), - record: 0, - wantErr: ErrOutOfRange, + name: "start offset 0, segment size 5, write 20, read offset 0", + start: 0, + segSize: 5, + records: NewTestDataSlice(t, 20), + read: 0, + wantErr: ErrOutOfRange, }, { - name: "start offset 10, segment size 2, write 5, read offset 10", - start: 10, - segSize: 2, - writeRecords: NewTestDataSlice(t, 5), - record: 10, - wantErr: ErrOutOfRange, + name: "start offset 10, segment size 2, write 5, read offset 10", + start: 10, + segSize: 2, + records: NewTestDataSlice(t, 5), + read: 10, + wantErr: ErrOutOfRange, }, } @@ -292,13 +292,13 @@ func TestLog_read(t *testing.T) { l, err := New(ctx, opts...) 
assert.NilError(t, err) - for i, d := range tc.writeRecords { + for i, d := range tc.records { offset, writeErr := l.write(ctx, d) assert.NilError(t, writeErr) assert.Equal(t, offset, tc.start+Offset(i)) } - r, err := l.read(ctx, tc.record) + r, err := l.read(ctx, tc.read) assert.Assert(t, errors.Is(err, tc.wantErr)) assert.Assert(t, r.Metadata.Created.IsZero()) }) @@ -307,22 +307,22 @@ func TestLog_read(t *testing.T) { t.Run("read from log succeeds", func(t *testing.T) { testCases := []struct { - name string - start Offset - segSize int - writeRecords [][]byte + name string + start Offset + segSize int + records [][]byte }{ { - name: "start offset 0, segment size 5, write and read 3", - start: 0, - segSize: 5, - writeRecords: NewTestDataSlice(t, 3), + name: "start offset 0, segment size 5, write and read 3", + start: 0, + segSize: 5, + records: NewTestDataSlice(t, 3), }, { - name: "start offset 10, segment size 10, write and read 10", - start: 10, - segSize: 10, - writeRecords: NewTestDataSlice(t, 10), + name: "start offset 10, segment size 10, write and read 10", + start: 10, + segSize: 10, + records: NewTestDataSlice(t, 10), }, } @@ -343,7 +343,7 @@ func TestLog_read(t *testing.T) { l, err := New(ctx, opts...) assert.NilError(t, err) - for i, d := range tc.writeRecords { + for i, d := range tc.records { offset, writeErr := l.write(ctx, d) assert.NilError(t, writeErr) assert.Equal(t, offset, tc.start+Offset(i)) @@ -354,7 +354,7 @@ func TestLog_read(t *testing.T) { Offset: Offset(i) + tc.start, Created: now, }, - Data: tc.writeRecords[i], + Data: tc.records[i], } assert.NilError(t, writeErr) @@ -372,57 +372,57 @@ func Test_offsetRange(t *testing.T) { } testCases := []struct { - name string - start Offset - segSize int - writeRecords [][]byte - want wantOffsets + name string + start Offset + segSize int + records [][]byte + want wantOffsets }{ { - name: "empty log, starts at 0", - start: 0, - segSize: 10, - writeRecords: nil, + name: "empty log, starts at 0", + start: 0, + segSize: 10, + records: nil, want: wantOffsets{ earliest: -1, latest: -1, }, }, { - name: "empty log, starts at 100", - start: 100, - segSize: 10, - writeRecords: nil, + name: "empty log, starts at 100", + start: 100, + segSize: 10, + records: nil, want: wantOffsets{ earliest: -1, latest: -1, }, }, { - name: "log with 10 records, starts at 0, no purge", - start: 0, - segSize: 20, - writeRecords: NewTestDataSlice(t, 10), + name: "log with 10 records, starts at 0, no purge", + start: 0, + segSize: 20, + records: NewTestDataSlice(t, 10), want: wantOffsets{ earliest: 0, latest: 9, }, }, { - name: "log with 10 records, starts at 60, no purge", - start: 60, - segSize: 20, - writeRecords: NewTestDataSlice(t, 10), + name: "log with 10 records, starts at 60, no purge", + start: 60, + segSize: 20, + records: NewTestDataSlice(t, 10), want: wantOffsets{ earliest: 60, latest: 69, }, }, { - name: "log with 30 records, starts at 10, segment size 10, purged history", - start: 10, - segSize: 10, - writeRecords: NewTestDataSlice(t, 30), + name: "log with 30 records, starts at 10, segment size 10, purged history", + start: 10, + segSize: 10, + records: NewTestDataSlice(t, 30), want: wantOffsets{ earliest: 20, latest: 39, @@ -442,7 +442,7 @@ func Test_offsetRange(t *testing.T) { l, err := New(ctx, opts...) 
assert.NilError(t, err) - for i, r := range tc.writeRecords { + for i, r := range tc.records { offset, writeErr := l.write(ctx, r) assert.NilError(t, writeErr) assert.Equal(t, offset, tc.start+Offset(i)) diff --git a/memlog_test.go b/memlog_test.go index 9872e56..cdd9c45 100644 --- a/memlog_test.go +++ b/memlog_test.go @@ -14,6 +14,272 @@ import ( "github.com/embano1/memlog" ) +func TestLog_ReadBatch(t *testing.T) { + t.Run("fails to read batch", func(t *testing.T) { + testCases := []struct { + name string + start memlog.Offset // log start + segSize int + records [][]byte + offset memlog.Offset // read offset + batchSize int + wantRecords int // total number records read + wantErr error + }{ + { + name: "fails on empty log", + start: 0, + segSize: 10, + records: nil, + offset: 0, + batchSize: 10, + wantRecords: 0, + wantErr: memlog.ErrFutureOffset, + }, + { + name: "fails on invalid start offset", + start: 10, + segSize: 10, + records: memlog.NewTestDataSlice(t, 10), + offset: 0, + batchSize: 10, + wantRecords: 0, + wantErr: memlog.ErrOutOfRange, + }, + { + name: "fails on invalid read offset", + start: 10, + segSize: 10, + records: memlog.NewTestDataSlice(t, 10), + offset: 20, + batchSize: 10, + wantRecords: 0, + wantErr: memlog.ErrFutureOffset, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + + opts := []memlog.Option{ + memlog.WithClock(clock.NewMock()), + memlog.WithStartOffset(tc.start), + memlog.WithMaxSegmentSize(tc.segSize), + } + + l, err := memlog.New(ctx, opts...) + assert.NilError(t, err) + + records := make([]memlog.Record, tc.batchSize) + count, err := l.ReadBatch(ctx, tc.offset, records) + assert.Assert(t, errors.Is(err, tc.wantErr)) + assert.Equal(t, count, tc.wantRecords) + }) + } + }) + + t.Run("fails on cancelled context", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + l, err := memlog.New(ctx) + assert.NilError(t, err) + + records := make([]memlog.Record, 10) + cancel() + count, err := l.ReadBatch(ctx, 0, records) + assert.Assert(t, errors.Is(err, context.Canceled)) + assert.Equal(t, count, 0) + }) + + t.Run("fails when deadline exceeded", func(t *testing.T) { + ctx := context.Background() + l, err := memlog.New(ctx) + assert.NilError(t, err) + + ctx, cancel := context.WithTimeout(ctx, 0) + defer cancel() + + records := make([]memlog.Record, 10) + count, err := l.ReadBatch(ctx, 0, records) + assert.Assert(t, errors.Is(err, context.DeadlineExceeded)) + assert.Equal(t, count, 0) + }) + + t.Run("reads one batch", func(t *testing.T) { + testCases := []struct { + name string + start memlog.Offset // log start + segSize int + records [][]byte + offset memlog.Offset // read offset + batchSize int + }{ + { + name: "log starts at 0, write 10 records, no purge, batch size 10", + start: 0, + segSize: 10, + records: memlog.NewTestDataSlice(t, 10), + offset: 0, + batchSize: 10, + }, + { + name: "log starts at 0, write 10 records, no purge, batch size 5", + start: 0, + segSize: 10, + records: memlog.NewTestDataSlice(t, 10), + offset: 0, + batchSize: 5, + }, + { + name: "log starts at 10, write 10 records, no purge, batch size 0", + start: 10, + segSize: 10, + records: memlog.NewTestDataSlice(t, 10), + offset: 0, + batchSize: 0, + }, + { + name: "log starts at 10, write 5 records, no purge, batch size 5", + start: 10, + segSize: 10, + records: memlog.NewTestDataSlice(t, 5), + offset: 10, + batchSize: 5, + }, + { + name: "log starts at 10, write 30 records, with purge, 
batch size 10", + start: 10, + segSize: 10, + records: memlog.NewTestDataSlice(t, 30), + offset: 30, + batchSize: 10, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + + opts := []memlog.Option{ + memlog.WithClock(clock.NewMock()), + memlog.WithStartOffset(tc.start), + memlog.WithMaxSegmentSize(tc.segSize), + } + + l, err := memlog.New(ctx, opts...) + assert.NilError(t, err) + + for _, d := range tc.records { + _, err = l.Write(ctx, d) + assert.NilError(t, err) + } + + records := make([]memlog.Record, tc.batchSize) + count, err := l.ReadBatch(ctx, tc.offset, records) + assert.NilError(t, err) + assert.Equal(t, count, tc.batchSize) + }) + } + }) + + t.Run("reads multiple batches until end of log", func(t *testing.T) { + testCases := []struct { + name string + start memlog.Offset // log start + segSize int + records [][]byte + offset memlog.Offset // read offset + batchSize int + }{ + { + name: "log starts at 0, write 30 records, no purge, batch size 10", + start: 0, + segSize: 30, + records: memlog.NewTestDataSlice(t, 30), + offset: 0, + batchSize: 10, + }, + { + name: "log starts at 0, write 10 records, no purge, read from 9, batch size 5", + start: 0, + segSize: 30, + records: memlog.NewTestDataSlice(t, 10), + offset: 9, + batchSize: 5, + }, + { + name: "log starts at 0, write 30 records, no purge, read from 10, batch size 5", + start: 0, + segSize: 30, + records: memlog.NewTestDataSlice(t, 30), + offset: 10, + batchSize: 5, + }, + { + name: "log starts at 10, write 40 records, no purge, read from 20, batch size 1", + start: 10, + segSize: 40, + records: memlog.NewTestDataSlice(t, 40), + offset: 20, + batchSize: 1, + }, + { + name: "log starts at 0, write 30 records, with purge, read from 10, batch size 5", + start: 0, + segSize: 10, + records: memlog.NewTestDataSlice(t, 30), + offset: 10, + batchSize: 5, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + + opts := []memlog.Option{ + memlog.WithClock(clock.NewMock()), + memlog.WithStartOffset(tc.start), + memlog.WithMaxSegmentSize(tc.segSize), + } + + l, err := memlog.New(ctx, opts...) + assert.NilError(t, err) + + for _, d := range tc.records { + _, err = l.Write(ctx, d) + assert.NilError(t, err) + } + + records := make([]memlog.Record, tc.batchSize) + + var ( + count int + total int + offset = tc.offset + ) + for err == nil { + count, err = l.ReadBatch(ctx, offset, records) + total += count + offset += memlog.Offset(count) + } + + assert.Assert(t, errors.Is(err, memlog.ErrFutureOffset)) + switch { + // with purge + case len(tc.records) > tc.segSize: + assert.Equal(t, total, len(tc.records)-(2*tc.segSize)+int(tc.offset)) + // no purge + default: + assert.Equal(t, total, len(tc.records)-int(tc.offset-tc.start)) + } + }) + } + }) +} + func TestLog_Checkpoint_Resume(t *testing.T) { const ( sourceDataCount = 50 @@ -95,7 +361,7 @@ func TestLog_Checkpoint_Resume(t *testing.T) { break } - assert.Equal(t, r.Metadata.Offset, memlog.Offset(i)) + assert.Equal(t, r.Metadata.Offset, i) records = append(records, r) } })