From b59542b0230c10830d71b0576380d950401d7980 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Fri, 5 Apr 2019 12:22:55 -0400 Subject: [PATCH 1/3] workload: improve setup time for tpcc This change makes the following improvements: - TPCC connection pools are now set up in parallel (instead of one server URL at a time); - We break up large connection pools into smaller pools and parallelize statement preparation among them. This reduces statement preparation time (inside each pool, preparation happens serially on each connection). - Workers are now created in parallel (which results in more parallelism for prepared statements). We also add a `--conns` flag to control the number of connections (`2W` by default). For a lot of warehouses, it may be a good idea to reduce this value. Ran some tests on 1000 warehouses and 3 nodes, set up time went down from 30s to 7s. On larger clusters we should see even more speedup. Release note: None --- pkg/workload/cli/run.go | 4 +- pkg/workload/indexes/indexes.go | 5 +- pkg/workload/kv/kv.go | 5 +- pkg/workload/pgx_helpers.go | 139 ++++++++++++++++++++++--------- pkg/workload/pgx_helpers_test.go | 49 +++++++++++ pkg/workload/tpcc/tpcc.go | 86 +++++++++++++------ 6 files changed, 217 insertions(+), 71 deletions(-) create mode 100644 pkg/workload/pgx_helpers_test.go diff --git a/pkg/workload/cli/run.go b/pkg/workload/cli/run.go index 41b1bb188493..59e7f24472ce 100644 --- a/pkg/workload/cli/run.go +++ b/pkg/workload/cli/run.go @@ -76,7 +76,7 @@ func init() { } genInitCmd := SetCmdDefaults(&cobra.Command{ - Use: meta.Name, + Use: meta.Name + " [pgurl...]", Short: meta.Description, Long: meta.Description + meta.Details, Args: cobra.ArbitraryArgs, @@ -111,7 +111,7 @@ func init() { } genRunCmd := SetCmdDefaults(&cobra.Command{ - Use: meta.Name, + Use: meta.Name + " [pgurl...]", Short: meta.Description, Long: meta.Description + meta.Details, Args: cobra.ArbitraryArgs, diff --git a/pkg/workload/indexes/indexes.go b/pkg/workload/indexes/indexes.go index 6dd3f992b194..679963ddbcdd 100644 --- a/pkg/workload/indexes/indexes.go +++ b/pkg/workload/indexes/indexes.go @@ -136,7 +136,10 @@ func (w *indexes) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoa if err != nil { return workload.QueryLoad{}, err } - mcp, err := workload.NewMultiConnPool(w.connFlags.Concurrency+1, urls...) + cfg := workload.MultiConnPoolCfg{ + MaxTotalConnections: w.connFlags.Concurrency + 1, + } + mcp, err := workload.NewMultiConnPool(cfg, urls...) if err != nil { return workload.QueryLoad{}, err } diff --git a/pkg/workload/kv/kv.go b/pkg/workload/kv/kv.go index 4e68f0a5bce6..43da643f059f 100644 --- a/pkg/workload/kv/kv.go +++ b/pkg/workload/kv/kv.go @@ -204,7 +204,10 @@ func (w *kv) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad, er if err != nil { return workload.QueryLoad{}, err } - mcp, err := workload.NewMultiConnPool(w.connFlags.Concurrency+1, urls...) + cfg := workload.MultiConnPoolCfg{ + MaxTotalConnections: w.connFlags.Concurrency + 1, + } + mcp, err := workload.NewMultiConnPool(cfg, urls...) if err != nil { return workload.QueryLoad{}, err } diff --git a/pkg/workload/pgx_helpers.go b/pkg/workload/pgx_helpers.go index a7d835aa7ae7..05e04ff290a8 100644 --- a/pkg/workload/pgx_helpers.go +++ b/pkg/workload/pgx_helpers.go @@ -18,6 +18,7 @@ package workload import ( "context" gosql "database/sql" + "sync" "sync/atomic" "github.com/cockroachdb/cockroach-go/crdb" @@ -32,52 +33,84 @@ type MultiConnPool struct { counter uint32 } -// NewMultiConnPool creates a new MultiConnPool (with one pool per url). +// MultiConnPoolCfg encapsulates the knobs passed to NewMultiConnPool. +type MultiConnPoolCfg struct { + // MaxTotalConnections is the total maximum number of connections across all + // pools. + MaxTotalConnections int + + // MaxConnsPerPool is the maximum number of connections in any single pool. + // Limiting this is useful especially for prepared statements, which are + // prepared on each connection inside a pool (serially). + // If 0, there is no per-pool maximum (other than the total maximum number of + // connections which still applies). + MaxConnsPerPool int +} + +// NewMultiConnPool creates a new MultiConnPool. +// +// Each URL gets one or more pools, and each pool has at most MaxConnsPerPool +// connections. +// // The pools have approximately the same number of max connections, adding up to -// maxTotalConnections. -func NewMultiConnPool(maxTotalConnections int, urls ...string) (*MultiConnPool, error) { - m := &MultiConnPool{ - Pools: make([]*pgx.ConnPool, len(urls)), +// MaxTotalConnections. +func NewMultiConnPool(cfg MultiConnPoolCfg, urls ...string) (*MultiConnPool, error) { + m := &MultiConnPool{} + connsPerURL := distribute(cfg.MaxTotalConnections, len(urls)) + maxConnsPerPool := cfg.MaxConnsPerPool + if maxConnsPerPool == 0 { + maxConnsPerPool = cfg.MaxTotalConnections } + + var warmupConns [][]*pgx.Conn for i := range urls { - cfg, err := pgx.ParseConnectionString(urls[i]) + connCfg, err := pgx.ParseConnectionString(urls[i]) if err != nil { return nil, err } - // Use the average number of remaining connections (this handles - // rounding). - numConn := maxTotalConnections / (len(urls) - i) - maxTotalConnections -= numConn - p, err := pgx.NewConnPool(pgx.ConnPoolConfig{ - ConnConfig: cfg, - MaxConnections: numConn, - }) - if err != nil { - return nil, err + + connsPerPool := distributeMax(connsPerURL[i], maxConnsPerPool) + for _, numConns := range connsPerPool { + p, err := pgx.NewConnPool(pgx.ConnPoolConfig{ + ConnConfig: connCfg, + MaxConnections: numConns, + }) + if err != nil { + return nil, err + } + + warmupConns = append(warmupConns, make([]*pgx.Conn, numConns)) + m.Pools = append(m.Pools, p) } + } - // "Warm up" the pool so we don't have to establish connections later (which - // would affect the observed latencies of the first requests). We do this by - // acquiring all connections (in parallel), then releasing them back to the - // pool. - conns := make([]*pgx.Conn, numConn) - var g errgroup.Group - for i := range conns { - i := i + // "Warm up" the pools so we don't have to establish connections later (which + // would affect the observed latencies of the first requests, especially when + // prepared statements are used). We do this by + // acquiring connections (in parallel), then releasing them back to the + // pool. + var g errgroup.Group + for i, p := range m.Pools { + p := p + conns := warmupConns[i] + for j := range conns { + j := j g.Go(func() error { - conns[i], err = p.Acquire() + var err error + conns[j], err = p.Acquire() return err }) } - if err := g.Wait(); err != nil { - return nil, err - } - for _, c := range conns { + } + if err := g.Wait(); err != nil { + return nil, err + } + for i, p := range m.Pools { + for _, c := range warmupConns[i] { p.Release(c) } - - m.Pools[i] = p } + return m, nil } @@ -94,17 +127,23 @@ func (m *MultiConnPool) Get() *pgx.ConnPool { func (m *MultiConnPool) PrepareEx( ctx context.Context, name, sql string, opts *pgx.PrepareExOptions, ) (*pgx.PreparedStatement, error) { - var ps *pgx.PreparedStatement + var res *pgx.PreparedStatement + var once sync.Once + var g errgroup.Group for _, p := range m.Pools { - var err error - ps, err = p.PrepareEx(ctx, name, sql, opts) - if err != nil { - return nil, err - } + p := p + g.Go(func() error { + ps, err := p.PrepareEx(ctx, name, sql, opts) + if err == nil { + // It doesn't matter which PreparedStatement we return, they should + // contain the same information. + once.Do(func() { res = ps }) + } + return err + }) } - // It doesn't matter which PreparedStatement we return, they should be the - // same. - return ps, nil + err := g.Wait() + return res, err } // Close closes all the pools. @@ -138,3 +177,23 @@ func (tx *PgxTx) Commit() error { func (tx *PgxTx) Rollback() error { return (*pgx.Tx)(tx).Rollback() } + +// distribute returns a slice of integers that add up to and are +// within +/-1 of each other. +func distribute(total, num int) []int { + res := make([]int, num) + for i := range res { + // Use the average number of remaining connections. + div := len(res) - i + res[i] = (total + div/2) / div + total -= res[i] + } + return res +} + +// distributeMax returns a slice of integers that are at most `max` and add up +// to . The slice is as short as possible and the values are within +/-1 +// of each other. +func distributeMax(total, max int) []int { + return distribute(total, (total+max-1)/max) +} diff --git a/pkg/workload/pgx_helpers_test.go b/pkg/workload/pgx_helpers_test.go new file mode 100644 index 000000000000..ca3c7ca82ddf --- /dev/null +++ b/pkg/workload/pgx_helpers_test.go @@ -0,0 +1,49 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. See the AUTHORS file +// for names of contributors. + +package workload + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/util/leaktest" +) + +func TestDistribute(t *testing.T) { + defer leaktest.AfterTest(t)() + + for _, total := range []int{0, 1, 2, 5, 10, 17, 25} { + for _, num := range []int{1, 2, 3, 4, 5, 8, 13, 15} { + d := distribute(total, num) + // Verify the sum is correct and that the variance is no more than 1. + min, max, sum := d[0], d[0], d[0] + for i := 1; i < len(d); i++ { + sum += d[i] + if min > d[i] { + min = d[i] + } + if max < d[i] { + max = d[i] + } + } + if sum != total { + t.Errorf("%d / %d: incorrect sum %d", total, num, sum) + } + if max > min+1 { + t.Errorf("%d / %d: min value %d, max value %d", total, num, min, max) + } + } + } +} diff --git a/pkg/workload/tpcc/tpcc.go b/pkg/workload/tpcc/tpcc.go index c440f1dbe6a3..02deea44d1c3 100644 --- a/pkg/workload/tpcc/tpcc.go +++ b/pkg/workload/tpcc/tpcc.go @@ -35,6 +35,7 @@ import ( "github.com/pkg/errors" "github.com/spf13/pflag" "golang.org/x/exp/rand" + "golang.org/x/sync/errgroup" ) type tpcc struct { @@ -46,6 +47,7 @@ type tpcc struct { activeWarehouses int interleaved bool nowString string + numConns int mix string doWaits bool @@ -114,6 +116,7 @@ var tpccMeta = workload.Meta{ `split`: {RuntimeOnly: true}, `wait`: {RuntimeOnly: true}, `workers`: {RuntimeOnly: true}, + `conns`: {RuntimeOnly: true}, `zones`: {RuntimeOnly: true}, `active-warehouses`: {RuntimeOnly: true}, `expensive-checks`: {RuntimeOnly: true, CheckConsistencyOnly: true}, @@ -129,11 +132,19 @@ var tpccMeta = workload.Meta{ g.flags.StringVar(&g.mix, `mix`, `newOrder=10,payment=10,orderStatus=1,delivery=1,stockLevel=1`, `Weights for the transaction mix. The default matches the TPCC spec.`) + g.flags.BoolVar(&g.doWaits, `wait`, true, `Run in wait mode (include think/keying sleeps)`) g.flags.StringVar(&g.dbOverride, `db`, ``, `Override for the SQL database to use. If empty, defaults to the generator name`) - g.flags.IntVar(&g.workers, `workers`, 0, - `Number of concurrent workers. Defaults to --warehouses * 10`) + + g.flags.IntVar(&g.workers, `workers`, 0, fmt.Sprintf( + `Number of concurrent workers. Defaults to --warehouses * %d`, numWorkersPerWarehouse, + )) + g.flags.IntVar(&g.numConns, `conns`, 0, fmt.Sprintf( + `Number of connections. Defaults to --warehouses * %d (except in nowait mode, where it defaults to --workers`, + numConnsPerWarehouse, + )) + g.flags.BoolVar(&g.fks, `fks`, true, `Add the foreign keys`) g.flags.IntVar(&g.partitions, `partitions`, 1, `Partition tables (requires split)`) g.flags.IntVar(&g.affinityPartition, `partition-affinity`, -1, `Run load generator against specific partition (requires partitions)`) @@ -189,6 +200,18 @@ func (w *tpcc) Hooks() workload.Hooks { w.workers = w.activeWarehouses * numWorkersPerWarehouse } + if w.numConns == 0 { + // If we're not waiting, open up a connection for each worker. If we are + // waiting, we only use up to a set number of connections per warehouse. + // This isn't mandated by the spec, but opening a connection per worker + // when they each spend most of their time waiting is wasteful. + if !w.doWaits { + w.numConns = w.workers + } else { + w.numConns = w.activeWarehouses * numConnsPerWarehouse + } + } + if w.doWaits && w.workers != w.activeWarehouses*numWorkersPerWarehouse { return errors.Errorf(`--wait=true and --warehouses=%d requires --workers=%d`, w.activeWarehouses, w.warehouses*numWorkersPerWarehouse) @@ -470,26 +493,27 @@ func (w *tpcc) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad, w.reg = reg w.usePostgres = parsedURL.Port() == "5432" - // If we're not waiting, open up a connection for each worker. If we are - // waiting, we only use up to a set number of connections per warehouse. - // This isn't mandated by the spec, but opening a connection per worker - // when they each spend most of their time waiting is wasteful. - nConns := w.workers - if w.doWaits { - nConns = w.activeWarehouses * numConnsPerWarehouse - } - // We can't use a single MultiConnPool because we want to implement partition - // affinity. Instead we have one MultiConnPool per server (we use - // MultiConnPool in order to use SQLRunner, but it's otherwise equivalent to a - // pgx.ConnPool). - nConnsPerURL := (nConns + len(urls) - 1) / len(urls) // round up + // affinity. Instead we have one MultiConnPool per server. + cfg := workload.MultiConnPoolCfg{ + MaxTotalConnections: (w.numConns + len(urls) - 1) / len(urls), // round up + // Limit the number of connections per pool (otherwise preparing statements + // at startup can be slow). + MaxConnsPerPool: 50, + } + fmt.Printf("Initializing %d connections...\n", w.numConns) dbs := make([]*workload.MultiConnPool, len(urls)) - for i, url := range urls { - dbs[i], err = workload.NewMultiConnPool(nConnsPerURL, url) - if err != nil { - return workload.QueryLoad{}, err - } + var g errgroup.Group + for i := range urls { + i := i + g.Go(func() error { + var err error + dbs[i], err = workload.NewMultiConnPool(cfg, urls[i]) + return err + }) + } + if err := g.Wait(); err != nil { + return workload.QueryLoad{}, err } // Create a partitioner to help us partition the warehouses. The base-case is @@ -537,8 +561,12 @@ func (w *tpcc) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad, } } + fmt.Printf("Initializing %d workers and preparing statements...\n", w.workers) ql := workload.QueryLoad{SQLDatabase: sqlDatabase} - for workerIdx := 0; workerIdx < w.workers; workerIdx++ { + ql.WorkerFns = make([]func(context.Context) error, w.workers) + var group errgroup.Group + for workerIdx := range ql.WorkerFns { + workerIdx := workerIdx warehouse := w.wPart.totalElems[workerIdx%len(w.wPart.totalElems)] p := w.wPart.partElemsMap[warehouse] @@ -549,12 +577,16 @@ func (w *tpcc) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad, dbs := partitionDBs[p] db := dbs[warehouse%len(dbs)] - worker, err := newWorker(context.TODO(), w, db, reg.GetHandle(), warehouse) - if err != nil { - return workload.QueryLoad{}, err - } - - ql.WorkerFns = append(ql.WorkerFns, worker.run) + group.Go(func() error { + worker, err := newWorker(context.TODO(), w, db, reg.GetHandle(), warehouse) + if err == nil { + ql.WorkerFns[workerIdx] = worker.run + } + return err + }) + } + if err := group.Wait(); err != nil { + return workload.QueryLoad{}, err } // Preregister all of the histograms so they always print. for _, tx := range allTxs { From 76e44eef6e93e5ed05065afdf6d25c0a5d0c5d51 Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Sat, 13 Apr 2019 11:11:13 -0400 Subject: [PATCH 2/3] roachtest: Rate limit connection establishment Acquiring connections with unlimited parallelism triggers syn flood protection on the server and can sometimes lead to failures. This was especially common in tests using the tpcc-1000 dataset (which also creates a large number of connections. Failures fixed by this change look like stdout: Error: read tcp 10.142.0.43:37862->10.142.0.112:26257: read: connection reset by peer Error: exit status 1 Fixes #36745 Release note: None --- pkg/workload/pgx_helpers.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/workload/pgx_helpers.go b/pkg/workload/pgx_helpers.go index 05e04ff290a8..bdff36887039 100644 --- a/pkg/workload/pgx_helpers.go +++ b/pkg/workload/pgx_helpers.go @@ -90,14 +90,23 @@ func NewMultiConnPool(cfg MultiConnPoolCfg, urls ...string) (*MultiConnPool, err // acquiring connections (in parallel), then releasing them back to the // pool. var g errgroup.Group + // Limit concurrent connection establishment. Allowing this to run + // at maximum parallism would trigger syn flood protection on the + // host, which combined with any packet loss could cause Acquire to + // return an error and fail the whole function. The value 100 is + // chosen because it is less than the default value for SOMAXCONN + // (128). + sem := make(chan struct{}, 100) for i, p := range m.Pools { p := p conns := warmupConns[i] for j := range conns { j := j g.Go(func() error { + sem <- struct{}{} var err error conns[j], err = p.Acquire() + <-sem return err }) } From a21969342f1191b09fd251d8abb8a720a61ac955 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Wed, 17 Apr 2019 10:46:42 -0400 Subject: [PATCH 3/3] workload/tpcc: limit parallelism for worker initialization Add a semaphore to avoid initializing all workers in parallel - for 10k warehouses, we have 100k workers and that can lead to OOM. Fixes #36897. Release note: None --- pkg/workload/pgx_helpers.go | 4 ++-- pkg/workload/tpcc/tpcc.go | 5 +++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/workload/pgx_helpers.go b/pkg/workload/pgx_helpers.go index bdff36887039..b62f76ec302c 100644 --- a/pkg/workload/pgx_helpers.go +++ b/pkg/workload/pgx_helpers.go @@ -91,7 +91,7 @@ func NewMultiConnPool(cfg MultiConnPoolCfg, urls ...string) (*MultiConnPool, err // pool. var g errgroup.Group // Limit concurrent connection establishment. Allowing this to run - // at maximum parallism would trigger syn flood protection on the + // at maximum parallelism would trigger syn flood protection on the // host, which combined with any packet loss could cause Acquire to // return an error and fail the whole function. The value 100 is // chosen because it is less than the default value for SOMAXCONN @@ -102,8 +102,8 @@ func NewMultiConnPool(cfg MultiConnPoolCfg, urls ...string) (*MultiConnPool, err conns := warmupConns[i] for j := range conns { j := j + sem <- struct{}{} g.Go(func() error { - sem <- struct{}{} var err error conns[j], err = p.Acquire() <-sem diff --git a/pkg/workload/tpcc/tpcc.go b/pkg/workload/tpcc/tpcc.go index 02deea44d1c3..eb86b8fa70c3 100644 --- a/pkg/workload/tpcc/tpcc.go +++ b/pkg/workload/tpcc/tpcc.go @@ -565,6 +565,9 @@ func (w *tpcc) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad, ql := workload.QueryLoad{SQLDatabase: sqlDatabase} ql.WorkerFns = make([]func(context.Context) error, w.workers) var group errgroup.Group + // Limit the amount of workers we initialize in parallel, to avoid running out + // of memory (#36897). + sem := make(chan struct{}, 100) for workerIdx := range ql.WorkerFns { workerIdx := workerIdx warehouse := w.wPart.totalElems[workerIdx%len(w.wPart.totalElems)] @@ -577,11 +580,13 @@ func (w *tpcc) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad, dbs := partitionDBs[p] db := dbs[warehouse%len(dbs)] + sem <- struct{}{} group.Go(func() error { worker, err := newWorker(context.TODO(), w, db, reg.GetHandle(), warehouse) if err == nil { ql.WorkerFns[workerIdx] = worker.run } + <-sem return err }) }