Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Gasch <[email protected]>
  • Loading branch information
Michael Gasch committed Feb 3, 2022
1 parent 6a3361f commit bcc1a14
Show file tree
Hide file tree
Showing 6 changed files with 414 additions and 56 deletions.
21 changes: 14 additions & 7 deletions sharded/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ type config struct {
}

type Log struct {
Sharder
clock clock.Clock
conf config
logs []*memlog.Log
sharder Sharder
clock clock.Clock
conf config
logs []*memlog.Log
}

func New(ctx context.Context, options ...Option) (*Log, error) {
Expand All @@ -44,8 +44,15 @@ func New(ctx context.Context, options ...Option) (*Log, error) {

shards := l.conf.shards
l.logs = make([]*memlog.Log, shards)
opts := []memlog.Option{
memlog.WithClock(l.clock),
memlog.WithMaxRecordDataSize(l.conf.maxRecordSize),
memlog.WithStartOffset(l.conf.startOffset),
memlog.WithMaxSegmentSize(l.conf.segmentSize),
}

for i := 0; i < int(shards); i++ {
ml, err := memlog.New(ctx)
ml, err := memlog.New(ctx, opts...)
if err != nil {
return nil, fmt.Errorf("create shard: %w", err)
}
Expand All @@ -56,7 +63,7 @@ func New(ctx context.Context, options ...Option) (*Log, error) {
}

func (l *Log) Write(ctx context.Context, key []byte, data []byte) (memlog.Offset, error) {
shard, err := l.Shard(key, l.conf.shards)
shard, err := l.sharder.Shard(key, l.conf.shards)
if err != nil {
return -1, fmt.Errorf("get shard: %w", err)
}
Expand All @@ -70,7 +77,7 @@ func (l *Log) Write(ctx context.Context, key []byte, data []byte) (memlog.Offset
}

func (l *Log) Read(ctx context.Context, key []byte, offset memlog.Offset) (memlog.Record, error) {
shard, err := l.Shard(key, l.conf.shards)
shard, err := l.sharder.Shard(key, l.conf.shards)
if err != nil {
return memlog.Record{}, fmt.Errorf("get shard: %w", err)
}
Expand Down
103 changes: 103 additions & 0 deletions sharded/log_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package sharded

import (
"context"
"testing"

"github.com/benbjohnson/clock"
"gotest.tools/v3/assert"

"github.com/embano1/memlog"
)

func TestNew(t *testing.T) {
t.Run("fails to create new log", func(t *testing.T) {
const (
defaultShards = 10
defaultStart = 0
defaultSegSize = 10
)

testCases := []struct {
name string
shards int
sharder Sharder
clock clock.Clock
start memlog.Offset // log start
segSize int
wantErr string
}{
{
name: "fails with invalid shard count",
shards: 0,
sharder: &defaultSharder{},
clock: clock.NewMock(),
start: defaultStart,
segSize: defaultSegSize,
wantErr: "must be greater than 1",
},
{
name: "fails with invalid segment size",
shards: defaultShards,
sharder: &defaultSharder{},
clock: clock.NewMock(),
start: defaultStart,
segSize: 0,
wantErr: "must be greater than 0",
},
{
name: "fails with invalid start offset",
shards: defaultShards,
sharder: &defaultSharder{},
clock: clock.NewMock(),
start: -10,
segSize: defaultSegSize,
wantErr: "must not be negative",
},
{
name: "fails with invalid clock",
shards: defaultShards,
sharder: &defaultSharder{},
clock: nil,
start: defaultStart,
segSize: defaultSegSize,
wantErr: "must not be nil",
},
{
name: "fails with invalid sharder",
shards: defaultShards,
sharder: nil,
clock: clock.NewMock(),
start: defaultStart,
segSize: defaultSegSize,
wantErr: "must not be nil",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
opts := []Option{
WithNumShards(uint(tc.shards)),
WithClock(tc.clock),
WithStartOffset(tc.start),
WithMaxSegmentSize(tc.segSize),
WithSharder(tc.sharder),
}
l, err := New(ctx, opts...)
assert.ErrorContains(t, err, tc.wantErr)
assert.DeepEqual(t, l, (*Log)(nil))
})
}
})

t.Run("successfully creates new log with defaults", func(t *testing.T) {
l, err := New(context.Background())
assert.NilError(t, err)
assert.Assert(t, l.conf.startOffset == DefaultStartOffset)
assert.Assert(t, l.conf.segmentSize == DefaultSegmentSize)
assert.Assert(t, l.conf.shards == DefaultShards)
assert.Assert(t, l.conf.maxRecordSize == DefaultMaxRecordDataBytes)
assert.Assert(t, len(l.logs) == DefaultShards)
})
}
Loading

0 comments on commit bcc1a14

Please sign in to comment.