diff --git a/pkg/generators/generator.go b/pkg/generators/generator.go index ab09db1d..ff3b8956 100644 --- a/pkg/generators/generator.go +++ b/pkg/generators/generator.go @@ -51,11 +51,12 @@ type GeneratorInterface interface { } type Generator struct { + wakeUpSignal chan struct{} ctx context.Context logger *zap.Logger table *testschema.Table idxFunc DistributionFunc - partitions []*Partition + partitions Partitions partitionsConfig typedef.PartitionRangeConfig partitionCount uint64 seed uint64 @@ -92,6 +93,7 @@ func NewGenerator(ctx context.Context, table *testschema.Table, config *Config, seed: config.Seed, idxFunc: config.PartitionsDistributionFunc, logger: logger, + wakeUpSignal: make(chan struct{}), } gs.start() return gs @@ -111,9 +113,23 @@ func (g *Generator) Get() *typedef.ValueWithToken { return nil } partition := g.partitions[uint64(g.idxFunc())%g.partitionCount] + if partition.NeedMoreValues() { + g.wakeupGenerator() + } return partition.get() } +func (g *Generator) wakeupGenerator() { + select { + case g.wakeUpSignal <- struct{}{}: + default: + } +} + +func (g *Generator) waitForMoreValuesNeeded() { + <-g.wakeUpSignal +} + // GetOld returns a previously used value and token or a new if // the old queue is empty. func (g *Generator) GetOld() *typedef.ValueWithToken { @@ -171,6 +187,14 @@ func (g *Generator) start() { zap.Uint64("keys_emitted", cntEmitted)) return gCtx.Err() default: + // This part is only get triggered when partition is full of values + // Which is signal to stop generating + // But if partitions values are not balanced, you can have case when one partition is full + // While other partitions are low on values + // To address this case before pausing generation we need to make sure that all partition are above the limit + if g.partitions.NeedMoreValues() { + g.waitForMoreValuesNeeded() + } } } }) diff --git a/pkg/generators/partition.go b/pkg/generators/partition.go index 7e1f8418..7ecf4cb7 100644 --- a/pkg/generators/partition.go +++ b/pkg/generators/partition.go @@ -28,6 +28,10 @@ type Partition struct { inFlight inflight.InFlight } +func (s *Partition) NeedMoreValues() bool { + return len(s.values) < cap(s.values)-30 +} + // get returns a new value and ensures that it's corresponding token // is not already in-flight. func (s *Partition) get() *typedef.ValueWithToken { @@ -71,3 +75,14 @@ func (s *Partition) releaseToken(token uint64) { func (s *Partition) pick() *typedef.ValueWithToken { return <-s.values } + +type Partitions []*Partition + +func (p Partitions) NeedMoreValues() bool { + for _, part := range p { + if part.NeedMoreValues() { + return true + } + } + return false +}