Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

roachtest: create simple tpce workload driver #99810

Merged
merged 1 commit into from
Apr 6, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 83 additions & 22 deletions pkg/cmd/roachtest/tests/tpce.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,82 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

type tpceSpec struct {
loadNode int
roachNodes option.NodeListOption
roachNodeIPFlags []string
}

type tpceCmdOptions struct {
customers int
racks int
duration time.Duration
threads int
}

func (to tpceCmdOptions) AddCommandOptions(cmd *roachtestutil.Command) {
cmd.MaybeFlag(to.customers != 0, "customers", to.customers)
cmd.MaybeFlag(to.racks != 0, "racks", to.racks)
cmd.MaybeFlag(to.duration != 0, "duration", to.duration)
cmd.MaybeFlag(to.threads != 0, "threads", to.threads)
}

func initTPCESpec(
ctx context.Context,
l *logger.Logger,
c cluster.Cluster,
loadNode int,
roachNodes option.NodeListOption,
) (*tpceSpec, error) {
l.Printf("Installing docker")
if err := c.Install(ctx, l, c.Nodes(loadNode), "docker"); err != nil {
return nil, err
}
roachNodeIPs, err := c.InternalIP(ctx, l, roachNodes)
if err != nil {
return nil, err
}
roachNodeIPFlags := make([]string, len(roachNodeIPs))
for i, ip := range roachNodeIPs {
roachNodeIPFlags[i] = fmt.Sprintf("--hosts=%s", ip)
}
return &tpceSpec{
loadNode: loadNode,
roachNodes: roachNodes,
roachNodeIPFlags: roachNodeIPFlags,
}, nil
}

func (ts *tpceSpec) newCmd(o tpceCmdOptions) *roachtestutil.Command {
cmd := roachtestutil.NewCommand(`sudo docker run cockroachdb/tpc-e:latest`)
o.AddCommandOptions(cmd)
return cmd
}

// init initializes an empty cluster with a tpce database. This includes a bulk
// import of the data and schema creation.
func (ts *tpceSpec) init(ctx context.Context, t test.Test, c cluster.Cluster, o tpceCmdOptions) {
cmd := ts.newCmd(o).Option("init")
c.Run(ctx, c.Node(ts.loadNode), fmt.Sprintf("%s %s", cmd, ts.roachNodeIPFlags[0]))
}

// run runs the tpce workload on cluster that has been initialized with the tpce schema.
func (ts *tpceSpec) run(
ctx context.Context, t test.Test, c cluster.Cluster, o tpceCmdOptions,
) (install.RunResultDetails, error) {
cmd := fmt.Sprintf("%s %s", ts.newCmd(o), strings.Join(ts.roachNodeIPFlags, " "))
return c.RunWithDetailsSingleNode(ctx, t.L(), c.Node(ts.loadNode), cmd)
}

func registerTPCE(r registry.Registry) {
type tpceOptions struct {
owner registry.Owner // defaults to test-eng
Expand All @@ -39,7 +109,7 @@ func registerTPCE(r registry.Registry) {

runTPCE := func(ctx context.Context, t test.Test, c cluster.Cluster, opts tpceOptions) {
roachNodes := c.Range(1, opts.nodes)
loadNode := c.Node(opts.nodes + 1)
loadNode := opts.nodes + 1
racks := opts.nodes

t.Status("installing cockroach")
Expand All @@ -50,10 +120,8 @@ func registerTPCE(r registry.Registry) {
settings := install.MakeClusterSettings(install.NumRacksOption(racks))
c.Start(ctx, t.L(), startOpts, settings, roachNodes)

t.Status("installing docker")
if err := c.Install(ctx, t.L(), loadNode, "docker"); err != nil {
t.Fatal(err)
}
tpceSpec, err := initTPCESpec(ctx, t.L(), c, loadNode, roachNodes)
require.NoError(t, err)

// Configure to increase the speed of the import.
func() {
Expand All @@ -73,27 +141,20 @@ func registerTPCE(r registry.Registry) {

m := c.NewMonitor(ctx, roachNodes)
m.Go(func(ctx context.Context) error {
const dockerRun = `sudo docker run cockroachdb/tpc-e:latest`

roachNodeIPs, err := c.InternalIP(ctx, t.L(), roachNodes)
if err != nil {
return err
}
roachNodeIPFlags := make([]string, len(roachNodeIPs))
for i, ip := range roachNodeIPs {
roachNodeIPFlags[i] = fmt.Sprintf("--hosts=%s", ip)
}

t.Status("preparing workload")
c.Run(ctx, loadNode, fmt.Sprintf("%s --customers=%d --racks=%d --init %s",
dockerRun, opts.customers, racks, roachNodeIPFlags[0]))
tpceSpec.init(ctx, t, c, tpceCmdOptions{
customers: opts.customers,
racks: racks,
})

t.Status("running workload")
duration := 2 * time.Hour
threads := opts.nodes * opts.cpus
result, err := c.RunWithDetailsSingleNode(ctx, t.L(), loadNode,
fmt.Sprintf("%s --customers=%d --racks=%d --duration=%s --threads=%d %s",
dockerRun, opts.customers, racks, duration, threads, strings.Join(roachNodeIPFlags, " ")))
result, err := tpceSpec.run(ctx, t, c, tpceCmdOptions{
customers: opts.customers,
racks: racks,
duration: 2 * time.Hour,
threads: opts.nodes * opts.cpus,
})
if err != nil {
t.Fatal(err.Error())
}
Expand Down