Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvement cql stmt generation #435

13 changes: 13 additions & 0 deletions .run/Run Gemini Mixed.run.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Run Gemini Mixed" type="GoApplicationRunConfiguration" factoryName="Go Application">
<module name="gemini" />
<working_directory value="$PROJECT_DIR$" />
<go_parameters value="-gcflags &quot;all=-N -l&quot;" />
<parameters value="--dataset-size=small --cql-features all --duration 2m --drop-schema true --fail-fast --level info --non-interactive --materialized-views false --outfile ./results/gemini.log --test-statement-log-file ./results/gemini_test_statements.log --oracle-statement-log-file ./results/gemini_oracle_statements.log --test-host-selection-policy token-aware --oracle-host-selection-policy token-aware --test-cluster=192.168.100.2 --oracle-cluster=192.168.100.3 --outfile ./results/gemini_result.log --mode mixed --non-interactive --request-timeout 180s --connect-timeout 120s --use-server-timestamps false --async-objects-stabilization-attempts 10 --async-objects-stabilization-backoff 100ms --replication-strategy &quot;{'class': 'NetworkTopologyStrategy', 'replication_factor': '1'}&quot; --oracle-replication-strategy &quot;{'class': 'NetworkTopologyStrategy', 'replication_factor': '1'}&quot; --max-mutation-retries 5 --max-mutation-retries-backoff 1000ms --concurrency 1 --tracing-outfile ./results/gemini_tracing.log" />
<kind value="PACKAGE" />
<package value="github.com/scylladb/gemini/cmd/gemini" />
<directory value="$PROJECT_DIR$" />
<filePath value="$PROJECT_DIR$/cmd/gemini/main.go" />
<method v="2" />
</configuration>
</component>
13 changes: 13 additions & 0 deletions .run/Run Gemini Read.run.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Run Gemini Read" type="GoApplicationRunConfiguration" factoryName="Go Application">
<module name="gemini" />
<working_directory value="$PROJECT_DIR$" />
<go_parameters value="-gcflags &quot;all=-N -l&quot;" />
<parameters value="--dataset-size=small --cql-features all --warmup 0 --duration 2m --drop-schema true --fail-fast --level info --non-interactive --materialized-views false --outfile ./results/gemini.log --test-statement-log-file ./results/gemini_test_statements.log --oracle-statement-log-file ./results/gemini_oracle_statements.log --test-host-selection-policy token-aware --oracle-host-selection-policy token-aware --test-cluster=192.168.100.2 --oracle-cluster=192.168.100.3 --outfile ./results/gemini_result.log --mode read --non-interactive --request-timeout 180s --connect-timeout 120s --use-server-timestamps false --async-objects-stabilization-attempts 10 --async-objects-stabilization-backoff 100ms --replication-strategy &quot;{'class': 'NetworkTopologyStrategy', 'replication_factor': '1'}&quot; --oracle-replication-strategy &quot;{'class': 'NetworkTopologyStrategy', 'replication_factor': '1'}&quot; --max-mutation-retries 5 --max-mutation-retries-backoff 1000ms --concurrency 2 --tracing-outfile ./results/gemini_tracing.log" />
<kind value="PACKAGE" />
<package value="github.com/scylladb/gemini/cmd/gemini" />
<directory value="$PROJECT_DIR$" />
<filePath value="$PROJECT_DIR$/cmd/gemini/main.go" />
<method v="2" />
</configuration>
</component>
13 changes: 0 additions & 13 deletions .run/Run Gemini.run.xml

This file was deleted.

50 changes: 23 additions & 27 deletions pkg/jobs/pump.go → pkg/burst/pump.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,40 +12,36 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package jobs
package burst

import (
"context"
"math/rand/v2"
"time"
)

"github.com/scylladb/gemini/pkg/stop"
const ChannelSize = 10000

"go.uber.org/zap"
"golang.org/x/exp/rand"
)
func work(ctx context.Context, pump chan<- time.Duration, chance int, sleepDuration time.Duration) {
defer close(pump)
for {
select {
case <-ctx.Done():
return
default:
sleep := time.Duration(0)

func NewPump(stopFlag *stop.Flag, logger *zap.Logger) chan time.Duration {
pump := make(chan time.Duration, 10000)
logger = logger.Named("Pump")
go func() {
logger.Debug("pump channel opened")
defer func() {
close(pump)
logger.Debug("pump channel closed")
}()
for !stopFlag.IsHardOrSoft() {
pump <- newHeartBeat()
}
}()
if rand.Int()%chance == 0 {
sleep = sleepDuration
}

return pump
pump <- sleep
Comment on lines +32 to +38

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why it should sleep? Can you explain me exactly how it acts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Burst mechanism: it makes a consumer of this pump sleep after the chance is hit. Practically, if you set chance = 10,

it will sleep approximately one in every 10 iterations. This is just to avoid overloading the server.

}
}
}

func newHeartBeat() time.Duration {
r := rand.Intn(10)
switch r {
case 0:
return 10 * time.Millisecond
default:
return 0
}
// New starts a burst pump: a buffered channel continuously fed sleep
// durations by a background goroutine until ctx is cancelled. Roughly one
// out of every `chance` values sent equals sleepDuration; the rest are zero
// (see work), so consumers occasionally back off without a fixed schedule.
func New(ctx context.Context, chance int, sleepDuration time.Duration) chan time.Duration {
	ch := make(chan time.Duration, ChannelSize)
	go work(ctx, ch, chance, sleepDuration)
	return ch
}
2 changes: 1 addition & 1 deletion pkg/jobs/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (m *mutation) DDL(ctx context.Context, table *typedef.Table) error {

table.Lock()
defer table.Unlock()
ddlStmts, err := statements.GenDDLStmt(m.schema, table, m.random, m.partitionRangeConfig, m.schemaCfg)
ddlStmts, err := statements.GenDDLStmt(m.schema, table, m.random, &m.schema.Config)
if err != nil {
m.logger.Error("Failed! DDL Mutation statement generation failed", zap.Error(err))
m.globalStatus.WriteErrors.Add(1)
Expand Down
153 changes: 74 additions & 79 deletions pkg/jobs/jobs.go
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the main thing missing from this commit description is, what's wrong with the stopFlags

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Multiple issues: it's too complex, it sometimes makes gemini get stuck when SIGTERM or SIGINT is sent, and it's never tested. context.Context is in the stdlib, fully tested, and everybody in the Go ecosystem knows how to use it. TL;DR: it makes the gemini code shorter and easier to maintain without hiccups.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • "too complex" is a bit of a vague argument.
  • "everybody in the Go ecosystem knows how to use it" — maybe I'm not yet familiar enough with the Go ecosystem to evaluate that statement.

We need to write all of it down, i.e. what its shortcomings are vs. what the advantages of context.Context are —
preferably in the commit message / PR description.

Original file line number Diff line number Diff line change
Expand Up @@ -22,33 +22,32 @@ import (
"golang.org/x/exp/rand"
"golang.org/x/sync/errgroup"

"github.com/scylladb/gemini/pkg/burst"
"github.com/scylladb/gemini/pkg/generators"
"github.com/scylladb/gemini/pkg/status"
"github.com/scylladb/gemini/pkg/store"
"github.com/scylladb/gemini/pkg/typedef"
)

type Runner struct {
duration time.Duration
logger *zap.Logger
random *rand.Rand
stopFlag *stop.Flag
workers uint64
generators []*generators.Generator
schema *typedef.Schema
failFast bool
schemaCfg *typedef.SchemaConfig
warmup time.Duration
globalStatus *status.GlobalStatus
pump <-chan time.Duration
store store.Store
mode Mode
}

type Job interface {
Name() string
Do(context.Context, generators.Interface, *typedef.Table) error
}
type (
	// Runner drives a gemini job run: it owns the shared configuration and
	// spawns mutation and/or validation workers per table according to mode.
	Runner struct {
		duration     time.Duration // length of the main (post-warmup) phase
		logger       *zap.Logger
		random       *rand.Rand // seeded source; per-worker RNGs are derived from it
		workers      uint64     // number of concurrent workers per table
		generators   []*generators.Generator
		schema       *typedef.Schema
		failFast     bool // when true, the first job error cancels the whole run
		warmup       time.Duration // length of the write-only warmup phase; 0 skips it
		globalStatus *status.GlobalStatus
		store        store.Store
		mode         Mode // which operations to run: write, read, or mixed
	}
	// Job is a single unit of work (mutation, validation, warmup) executed
	// against one table, fed by a generator, until its context is cancelled.
	Job interface {
		Name() string
		Do(context.Context, generators.Interface, *typedef.Table) error
	}
)

func New(
mode string,
Expand All @@ -58,7 +57,6 @@ func New(
schema *typedef.Schema,
store store.Store,
globalStatus *status.GlobalStatus,
schemaCfg *typedef.SchemaConfig,
seed uint64,
gens []*generators.Generator,
failFast bool,
Expand All @@ -67,109 +65,106 @@ func New(
return &Runner{
warmup: warmup,
globalStatus: globalStatus,
pump: NewPump(stopFlag, logger.Named("Pump")),
store: store,
mode: ModeFromString(mode),
logger: logger,
schemaCfg: schemaCfg,
duration: duration,
workers: workers,
stopFlag: stopFlag,
failFast: failFast,
random: rand.New(rand.NewSource(seed)),
generators: gens,
schema: schema,
}
}

// Name returns the identifier of this runner, used for logging/reporting.
func (l *Runner) Name() string {
	const runnerName = "Runner"
	return runnerName
}

func (l *Runner) Run(ctx context.Context) error {
ctx = l.stopFlag.CancelContextOnSignal(ctx, stop.SignalHardStop)
partitionRangeConfig := l.schemaCfg.GetPartitionRangeConfig()

l.logger.Info("start jobs")

if l.warmup > 0 {
l.logger.Info("Warmup Job Started",
zap.Int("duration", int(l.warmup.Seconds())),
zap.Int("workers", int(l.workers)),
)
time.AfterFunc(l.warmup, func() {
l.logger.Info("jobs time is up, begins jobs completion")
l.stopFlag.SetSoft(true)
})

warmup := func(_ <-chan time.Duration, rnd *rand.Rand) Job {
return NewWarmup(l.logger, l.schema, l.store, &partitionRangeConfig, l.globalStatus, l.schemaCfg, l.stopFlag, rnd, l.failFast)
}
warmupCtx, cancel := context.WithTimeout(ctx, l.warmup)
defer cancel()
l.startMutation(warmupCtx, cancel, l.random, "Warmup", false, false)
}

if err := l.start(ctx, warmup); err != nil {
return err
}
ctx, cancel := context.WithTimeout(ctx, l.duration+1*time.Second)
defer cancel()

src := rand.NewSource(l.random.Uint64())

if l.mode.IsRead() {
go l.startValidation(ctx, cancel, src)
}

time.AfterFunc(l.duration, func() {
l.logger.Info("jobs time is up, begins jobs completion")
l.stopFlag.SetSoft(true)
if l.mode.IsWrite() {
l.startMutation(ctx, cancel, src, "Mutation", true, true)
}

return nil
}

func (l *Runner) startMutation(ctx context.Context, cancel context.CancelFunc, src rand.Source, name string, deletes, ddl bool) {
logger := l.logger.Named(name)

err := l.start(ctx, rand.New(src), func(pump <-chan time.Duration, rnd *rand.Rand) Job {
return NewMutation(
logger,
l.schema,
l.store,
l.globalStatus,
rnd,
pump,
l.failFast,
deletes,
ddl,
)
})

if l.mode.IsWrite() {
return l.start(ctx, func(pump <-chan time.Duration, rnd *rand.Rand) Job {
return NewMutation(
l.logger.Named("Mutation"),
l.schema,
l.store,
&partitionRangeConfig,
l.globalStatus,
l.stopFlag,
rnd,
l.schemaCfg,
pump,
l.failFast,
)
})
if err != nil {
logger.Error("Mutation job failed", zap.Error(err))
if l.failFast {
cancel()
}
}
}

return l.start(ctx, func(pump <-chan time.Duration, rnd *rand.Rand) Job {
func (l *Runner) startValidation(ctx context.Context, cancel context.CancelFunc, src rand.Source) {
err := l.start(ctx, rand.New(src), func(pump <-chan time.Duration, rnd *rand.Rand) Job {
return NewValidation(
l.logger,
pump,
l.schema, l.schemaCfg,
l.schema,
l.store,
rnd,
&partitionRangeConfig,
l.globalStatus,
l.stopFlag,
l.failFast,
)
})

if err != nil {
l.logger.Error("Validation job failed", zap.Error(err))
if l.failFast {
cancel()
}
}
}

func (l *Runner) start(ctx context.Context, job func(<-chan time.Duration, *rand.Rand) Job) error {
func (l *Runner) start(ctx context.Context, rnd *rand.Rand, job func(<-chan time.Duration, *rand.Rand) Job) error {
g, gCtx := errgroup.WithContext(ctx)

g.SetLimit(int(l.workers))

partitionRangeConfig := l.schemaCfg.GetPartitionRangeConfig()

for j, table := range l.schema.Tables {
gen := l.generators[j]
pump := NewPump(l.stopFlag, l.logger.Named("Pump-"+table.Name))
rnd := rand.New(rand.NewSource(l.random.Uint64()))

v := NewValidation(l.logger, pump, l.schema, l.schemaCfg, l.store, rnd, &partitionRangeConfig, l.globalStatus, l.stopFlag, l.failFast)
j := job(pump, rnd)

g.TryGo(func() error {
return v.Do(gCtx, gen, table)
})
pump := burst.New(ctx, 10, 10*time.Millisecond)

for i := 0; i < int(l.workers)-1; i++ {
for range l.workers {
src := rand.NewSource(rnd.Uint64())
g.TryGo(func() error {
return j.Do(gCtx, gen, table)
return job(pump, rand.New(src)).Do(gCtx, gen, table)
})
}
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/jobs/mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ func (m Mode) IsWrite() bool {
return m[0] == WriteMode
}

// IsRead reports whether the run mode includes read (validation) operations.
//
// It scans every element rather than indexing m[0]/m[1] directly: the
// original unconditional m[1] access panics with index-out-of-range when a
// Mode holds a single operation (e.g. a write-only run).
// NOTE(review): assumes Mode is a slice/array of operation names — confirm
// against ModeFromString.
func (m Mode) IsRead() bool {
	for _, op := range m {
		if op == ReadMode {
			return true
		}
	}
	return false
}

func ModeFromString(m string) Mode {
switch m {
case WriteMode:
Expand Down
Loading