Skip to content

Commit

Permalink
fix(generator): pause generator when partitions are full
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmitry Kropachev committed Jun 3, 2023
1 parent 080b201 commit 73f64f2
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 1 deletion.
26 changes: 25 additions & 1 deletion pkg/generators/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,12 @@ type GeneratorInterface interface {
}

type Generator struct {
wakeUpSignal chan struct{}
ctx context.Context
logger *zap.Logger
table *testschema.Table
idxFunc DistributionFunc
partitions []*Partition
partitions Partitions
partitionsConfig typedef.PartitionRangeConfig
partitionCount uint64
seed uint64
Expand Down Expand Up @@ -92,6 +93,7 @@ func NewGenerator(ctx context.Context, table *testschema.Table, config *Config,
seed: config.Seed,
idxFunc: config.PartitionsDistributionFunc,
logger: logger,
wakeUpSignal: make(chan struct{}),
}
gs.start()
return gs
Expand All @@ -111,9 +113,23 @@ func (g *Generator) Get() *typedef.ValueWithToken {
return nil
}
partition := g.partitions[uint64(g.idxFunc())%g.partitionCount]
if partition.NeedMoreValues() {
g.wakeupGenerator()
}
return partition.get()
}

func (g *Generator) wakeupGenerator() {
select {
case g.wakeUpSignal <- struct{}{}:
default:
}
}

func (g *Generator) waitForMoreValuesNeeded() {
<-g.wakeUpSignal
}

// GetOld returns a previously used value and token or a new if
// the old queue is empty.
func (g *Generator) GetOld() *typedef.ValueWithToken {
Expand Down Expand Up @@ -171,6 +187,14 @@ func (g *Generator) start() {
zap.Uint64("keys_emitted", cntEmitted))
return gCtx.Err()
default:
// This part is only get triggered when partition is full of values
// Which is signal to stop generating
// But if partitions values are not balanced, you can have case when one partition is full
// While other partitions are low on values
// To address this case before pausing generation we need to make sure that all partition are above the limit
if g.partitions.NeedMoreValues() {
g.waitForMoreValuesNeeded()
}
}
}
})
Expand Down
15 changes: 15 additions & 0 deletions pkg/generators/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ type Partition struct {
inFlight inflight.InFlight
}

func (s *Partition) NeedMoreValues() bool {
return len(s.values) < cap(s.values)-30
}

// get returns a new value and ensures that it's corresponding token
// is not already in-flight.
func (s *Partition) get() *typedef.ValueWithToken {
Expand Down Expand Up @@ -71,3 +75,14 @@ func (s *Partition) releaseToken(token uint64) {
func (s *Partition) pick() *typedef.ValueWithToken {
return <-s.values
}

type Partitions []*Partition

func (p Partitions) NeedMoreValues() bool {
for _, part := range p {
if part.NeedMoreValues() {
return true
}
}
return false
}

0 comments on commit 73f64f2

Please sign in to comment.