Skip to content

Commit

Permalink
gemini: moved in-flight set to partition
Browse files Browse the repository at this point in the history
  • Loading branch information
Henrik Johansson committed Sep 5, 2019
1 parent c548d3a commit 482573d
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 14 deletions.
14 changes: 2 additions & 12 deletions generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ type DistributionFunc func() TokenIndex

type Generator struct {
partitions []*Partition
inFlight inflight.InFlight
partitionCount uint64
table *Table
partitionsConfig PartitionRangeConfig
Expand All @@ -48,12 +47,12 @@ func NewGenerator(table *Table, config *GeneratorConfig, logger *zap.Logger) *Ge
partitions[i] = &Partition{
values: make(chan ValueWithToken, config.PkUsedBufferSize),
oldValues: make(chan ValueWithToken, config.PkUsedBufferSize),
inFlight: inflight.New(),
t: t,
}
}
gs := &Generator{
partitions: partitions,
inFlight: inflight.New(),
partitionCount: config.PartitionsCount,
table: table,
partitionsConfig: config.PartitionsRangeConfig,
Expand All @@ -73,12 +72,7 @@ func (g Generator) Get() (ValueWithToken, bool) {
default:
}
partition := g.partitions[uint64(g.idxFunc())%g.partitionCount]
for {
v := partition.pick()
if g.inFlight.AddIfNotPresent(v.Token) {
return v, true
}
}
return partition.get()
}

// GetOld returns a previously used value and token or a new if
Expand Down Expand Up @@ -107,10 +101,6 @@ func (g *Generator) GiveOld(v ValueWithToken) {
default:
}
partition := g.partitions[v.Token%g.partitionCount]
if len(v.Value) == 0 {
g.inFlight.Delete(v.Token)
return
}
partition.giveOld(v)
}

Expand Down
17 changes: 15 additions & 2 deletions partition.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,26 @@
package gemini

import "gopkg.in/tomb.v2"
import (
"github.com/scylladb/gemini/inflight"
"gopkg.in/tomb.v2"
)

type Partition struct {
values chan ValueWithToken
oldValues chan ValueWithToken
inFlight inflight.InFlight
t *tomb.Tomb
}

// get returns a new value and ensures that it's corresponding token
// is not already in-flight.
func (s *Partition) get() (ValueWithToken, bool) {
return s.pick(), true
for {
v := s.pick()
if s.inFlight.AddIfNotPresent(v.Token) {
return v, true
}
}
}

var emptyValueWithToken = ValueWithToken{}
Expand All @@ -31,6 +40,10 @@ func (s *Partition) getOld() (ValueWithToken, bool) {
// is empty in which case it removes the corresponding token from the
// in-flight tracking.
func (s *Partition) giveOld(v ValueWithToken) {
if len(v.Value) == 0 {
s.inFlight.Delete(v.Token)
return
}
select {
case s.oldValues <- v:
default:
Expand Down

0 comments on commit 482573d

Please sign in to comment.