Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Gasch <[email protected]>
  • Loading branch information
Michael Gasch committed Jan 31, 2022
1 parent 34babf3 commit 6a3361f
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 30 deletions.
2 changes: 1 addition & 1 deletion options.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
const (
// DefaultStartOffset is the start offset of the log
DefaultStartOffset = Offset(0)
// DefaultSegmentSize is the segment size of the log
// DefaultSegmentSize is the segment size, i.e. number of offsets, in the log
DefaultSegmentSize = 1024
// DefaultMaxRecordDataBytes is the maximum data (payload) size of a record
DefaultMaxRecordDataBytes = 1024 << 10 // 1MiB
Expand Down
16 changes: 2 additions & 14 deletions sharded/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ import (
"github.com/embano1/memlog"
)

const (
maxShards = 1000
)

type config struct {
shards uint

Expand All @@ -29,17 +25,8 @@ type Log struct {
logs []*memlog.Log
}

func New(ctx context.Context, shards uint, options ...Option) (*Log, error) {
if shards < 1 {
return nil, fmt.Errorf("shards must be greater than 0")
}

if shards > maxShards {
return nil, fmt.Errorf("shards must not be greater than %d", maxShards)
}

func New(ctx context.Context, options ...Option) (*Log, error) {
var l Log
l.conf.shards = shards

// apply defaults
for _, opt := range defaultOptions {
Expand All @@ -55,6 +42,7 @@ func New(ctx context.Context, shards uint, options ...Option) (*Log, error) {
}
}

shards := l.conf.shards
l.logs = make([]*memlog.Log, shards)
for i := 0; i < int(shards); i++ {
ml, err := memlog.New(ctx)
Expand Down
4 changes: 1 addition & 3 deletions sharded/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@ func TestLog(t *testing.T) {
"group": {"michael", "hannah", "bjarne", "soenke"},
"groups": {"family", "friends", "colleagues"},
"Z": {"ent inc.", "vmware", "google", "microsoft"},
"K": {"ent inc.", "vmware", "google", "microsoft"},
"A": {"ent inc.", "vmware", "google", "microsoft"},
}

l, err := New(ctx, 20)
l, err := New(ctx)
assert.NilError(t, err)

for k, vals := range kvs {
Expand Down
31 changes: 24 additions & 7 deletions sharded/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import (
)

const (
// DefaultStartOffset is the start offset of each shard
// DefaultShards is the number of shards unless specified otherwise
DefaultShards = 1000
// DefaultStartOffset is the start offset in a shard
DefaultStartOffset = memlog.DefaultStartOffset
// DefaultSegmentSize is the segment size of each shard
// DefaultSegmentSize is the segment size, i.e. number of offsets, in a shard
DefaultSegmentSize = memlog.DefaultSegmentSize
// DefaultMaxRecordDataBytes is the maximum data (payload) size of a record
// DefaultMaxRecordDataBytes is the maximum data (payload) size of a record in a shard
DefaultMaxRecordDataBytes = memlog.DefaultMaxRecordDataBytes
)

Expand All @@ -24,6 +26,7 @@ var defaultOptions = []Option{
WithClock(clock.New()),
WithMaxRecordDataSize(DefaultMaxRecordDataBytes),
WithMaxSegmentSize(DefaultSegmentSize),
WithNumShards(DefaultShards),
WithSharder(newDefaultSharder()),
WithStartOffset(DefaultStartOffset),
}
Expand All @@ -40,7 +43,8 @@ func WithClock(c clock.Clock) Option {
}
}

// WithMaxRecordDataSize sets the maximum record data (payload) size in bytes
// WithMaxRecordDataSize sets the maximum record data (payload) size in bytes in
// each shard
func WithMaxRecordDataSize(size int) Option {
return func(log *Log) error {
if size <= 0 {
Expand All @@ -51,8 +55,8 @@ func WithMaxRecordDataSize(size int) Option {
}
}

// WithMaxSegmentSize sets the maximum size, i.e. number of offsets, in a log
// segment. Must be greater than 0.
// WithMaxSegmentSize sets the maximum size, i.e. number of offsets, in each shard.
// Must be greater than 0.
func WithMaxSegmentSize(size int) Option {
return func(log *Log) error {
if size <= 0 {
Expand All @@ -63,6 +67,19 @@ func WithMaxSegmentSize(size int) Option {
}
}

// WithNumShards configures how many shards the log is split into. The value
// must be greater than 1, otherwise the option returns an error.
func WithNumShards(n uint) Option {
	return func(l *Log) error {
		// with zero or one shard there is nothing to shard across, i.e. this
		// would be a standard log
		if n <= 1 {
			return errors.New("number of shards must be greater than 1")
		}

		l.conf.shards = n
		return nil
	}
}

// WithSharder uses the specified sharder for key sharding
func WithSharder(s Sharder) Option {
return func(log *Log) error {
if s == nil {
Expand All @@ -73,7 +90,7 @@ func WithSharder(s Sharder) Option {
}
}

// WithStartOffset sets the start offset of the log. Must be equal or greater
// WithStartOffset sets the start offset of each shard. Must be greater than or
// equal to 0.
func WithStartOffset(offset memlog.Offset) Option {
return func(log *Log) error {
Expand Down
13 changes: 8 additions & 5 deletions sharded/sharder.go → sharded/shard.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sharded

import (
"crypto/sha256"
"fmt"
"hash"
"hash/fnv"
Expand All @@ -13,12 +14,14 @@ type Sharder interface {

type defaultSharder struct {
sync.Mutex
hasher hash.Hash32
hash32 hash.Hash32
hash256 hash.Hash
}

func newDefaultSharder() *defaultSharder {
return &defaultSharder{
hasher: fnv.New32a(),
hash32: fnv.New32a(),
hash256: sha256.New(),
}
}

Expand All @@ -39,11 +42,11 @@ func (d *defaultSharder) hash(key []byte) (uint32, error) {
d.Lock()
defer d.Unlock()

d.hasher.Reset()
_, err := d.hasher.Write(key)
d.hash32.Reset()
_, err := d.hash32.Write(key)
if err != nil {
return 0, err
}

return d.hasher.Sum32(), nil
return d.hash32.Sum32(), nil
}

0 comments on commit 6a3361f

Please sign in to comment.