diff --git a/options.go b/options.go index 0c1c3c6..5292e32 100644 --- a/options.go +++ b/options.go @@ -9,7 +9,7 @@ import ( const ( // DefaultStartOffset is the start offset of the log DefaultStartOffset = Offset(0) - // DefaultSegmentSize is the segment size of the log + // DefaultSegmentSize is the segment size, i.e. number of offsets, in the log DefaultSegmentSize = 1024 // DefaultMaxRecordDataBytes is the maximum data (payload) size of a record DefaultMaxRecordDataBytes = 1024 << 10 // 1MiB diff --git a/sharded/log.go b/sharded/log.go index d870be7..a266c12 100644 --- a/sharded/log.go +++ b/sharded/log.go @@ -9,10 +9,6 @@ import ( "github.com/embano1/memlog" ) -const ( - maxShards = 1000 -) - type config struct { shards uint @@ -29,17 +25,8 @@ type Log struct { logs []*memlog.Log } -func New(ctx context.Context, shards uint, options ...Option) (*Log, error) { - if shards < 1 { - return nil, fmt.Errorf("shards must be greater than 0") - } - - if shards > maxShards { - return nil, fmt.Errorf("shards must not be greater than %d", maxShards) - } - +func New(ctx context.Context, options ...Option) (*Log, error) { var l Log - l.conf.shards = shards // apply defaults for _, opt := range defaultOptions { @@ -55,6 +42,7 @@ func New(ctx context.Context, shards uint, options ...Option) (*Log, error) { } } + shards := l.conf.shards l.logs = make([]*memlog.Log, shards) for i := 0; i < int(shards); i++ { ml, err := memlog.New(ctx) diff --git a/sharded/log_test.go b/sharded/log_test.go index c572146..96d1038 100644 --- a/sharded/log_test.go +++ b/sharded/log_test.go @@ -17,11 +17,9 @@ func TestLog(t *testing.T) { "group": {"michael", "hannah", "bjarne", "soenke"}, "groups": {"family", "friends", "colleagues"}, "Z": {"ent inc.", "vmware", "google", "microsoft"}, - "K": {"ent inc.", "vmware", "google", "microsoft"}, - "A": {"ent inc.", "vmware", "google", "microsoft"}, } - l, err := New(ctx, 20) + l, err := New(ctx) assert.NilError(t, err) for k, vals := range kvs { diff --git a/sharded/options.go b/sharded/options.go index c5fee53..c506e49 100644 --- a/sharded/options.go +++ b/sharded/options.go @@ -9,11 +9,13 @@ import ( ) const ( - // DefaultStartOffset is the start offset of each shard + // DefaultShards is the number of shards unless specified otherwise + DefaultShards = 1000 + // DefaultStartOffset is the start offset in a shard DefaultStartOffset = memlog.DefaultStartOffset - // DefaultSegmentSize is the segment size of each shard + // DefaultSegmentSize is the segment size, i.e. number of offsets, in a shard DefaultSegmentSize = memlog.DefaultSegmentSize - // DefaultMaxRecordDataBytes is the maximum data (payload) size of a record + // DefaultMaxRecordDataBytes is the maximum data (payload) size of a record in a shard DefaultMaxRecordDataBytes = memlog.DefaultMaxRecordDataBytes ) @@ -24,6 +26,7 @@ var defaultOptions = []Option{ WithClock(clock.New()), WithMaxRecordDataSize(DefaultMaxRecordDataBytes), WithMaxSegmentSize(DefaultSegmentSize), + WithNumShards(DefaultShards), WithSharder(newDefaultSharder()), WithStartOffset(DefaultStartOffset), } @@ -40,7 +43,8 @@ func WithClock(c clock.Clock) Option { } } -// WithMaxRecordDataSize sets the maximum record data (payload) size in bytes +// WithMaxRecordDataSize sets the maximum record data (payload) size in bytes in +// each shard func WithMaxRecordDataSize(size int) Option { return func(log *Log) error { if size <= 0 { @@ -51,8 +55,8 @@ func WithMaxRecordDataSize(size int) Option { } } -// WithMaxSegmentSize sets the maximum size, i.e. number of offsets, in a log -// segment. Must be greater than 0. +// WithMaxSegmentSize sets the maximum size, i.e. number of offsets, in each shard. +// Must be greater than 0. func WithMaxSegmentSize(size int) Option { return func(log *Log) error { if size <= 0 { @@ -63,6 +67,19 @@ func WithMaxSegmentSize(size int) Option { } } +// WithNumShards sets the number of shards in a log +func WithNumShards(n uint) Option { + return func(log *Log) error { + // sharded log with < 2 shards is a standard log + if n < 2 { + return errors.New("number of shards must be greater than 1") + } + log.conf.shards = n + return nil + } +} + +// WithSharder uses the specified sharder for key sharding func WithSharder(s Sharder) Option { return func(log *Log) error { if s == nil { @@ -73,7 +90,7 @@ func WithSharder(s Sharder) Option { } } -// WithStartOffset sets the start offset of the log. Must be equal or greater +// WithStartOffset sets the start offset of each shard. Must be equal or greater // than 0. func WithStartOffset(offset memlog.Offset) Option { return func(log *Log) error { diff --git a/sharded/sharder.go b/sharded/shard.go similarity index 77% rename from sharded/sharder.go rename to sharded/shard.go index 6ce2292..66a4ea6 100644 --- a/sharded/sharder.go +++ b/sharded/shard.go @@ -1,6 +1,7 @@ package sharded import ( + "crypto/sha256" "fmt" "hash" "hash/fnv" @@ -13,12 +14,14 @@ type Sharder interface { type defaultSharder struct { sync.Mutex - hasher hash.Hash32 + hash32 hash.Hash32 + hash256 hash.Hash } func newDefaultSharder() *defaultSharder { return &defaultSharder{ - hasher: fnv.New32a(), + hash32: fnv.New32a(), + hash256: sha256.New(), } } @@ -39,11 +42,11 @@ func (d *defaultSharder) hash(key []byte) (uint32, error) { d.Lock() defer d.Unlock() - d.hasher.Reset() - _, err := d.hasher.Write(key) + d.hash32.Reset() + _, err := d.hash32.Write(key) if err != nil { return 0, err } - return d.hasher.Sum32(), nil + return d.hash32.Sum32(), nil }