
Commit

Merge pull request cockroachdb#36916 from RaduBerinde/backport19.1-36586-36824-36913

release-19.1: backport workload/tpcc improvements and fixes
RaduBerinde authored Apr 18, 2019
2 parents 837e946 + a219693 commit 9938cb1
Showing 6 changed files with 231 additions and 71 deletions.
4 changes: 2 additions & 2 deletions pkg/workload/cli/run.go
@@ -76,7 +76,7 @@ func init() {
}

genInitCmd := SetCmdDefaults(&cobra.Command{
Use: meta.Name,
Use: meta.Name + " [pgurl...]",
Short: meta.Description,
Long: meta.Description + meta.Details,
Args: cobra.ArbitraryArgs,
@@ -111,7 +111,7 @@ func init() {
}

genRunCmd := SetCmdDefaults(&cobra.Command{
Use: meta.Name,
Use: meta.Name + " [pgurl...]",
Short: meta.Description,
Long: meta.Description + meta.Details,
Args: cobra.ArbitraryArgs,
5 changes: 4 additions & 1 deletion pkg/workload/indexes/indexes.go
@@ -136,7 +136,10 @@ func (w *indexes) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoa
if err != nil {
return workload.QueryLoad{}, err
}
mcp, err := workload.NewMultiConnPool(w.connFlags.Concurrency+1, urls...)
cfg := workload.MultiConnPoolCfg{
MaxTotalConnections: w.connFlags.Concurrency + 1,
}
mcp, err := workload.NewMultiConnPool(cfg, urls...)
if err != nil {
return workload.QueryLoad{}, err
}
5 changes: 4 additions & 1 deletion pkg/workload/kv/kv.go
@@ -204,7 +204,10 @@ func (w *kv) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad, er
if err != nil {
return workload.QueryLoad{}, err
}
mcp, err := workload.NewMultiConnPool(w.connFlags.Concurrency+1, urls...)
cfg := workload.MultiConnPoolCfg{
MaxTotalConnections: w.connFlags.Concurrency + 1,
}
mcp, err := workload.NewMultiConnPool(cfg, urls...)
if err != nil {
return workload.QueryLoad{}, err
}
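The indexes and kv call sites above change mechanically: the bare connection-count argument becomes a MultiConnPoolCfg. A minimal sketch of the new usage (the URL, concurrency, and per-pool cap are illustrative values, not taken from this commit):

package main

import (
    "fmt"

    "github.com/cockroachdb/cockroach/pkg/workload"
)

func main() {
    concurrency := 16 // illustrative worker count
    cfg := workload.MultiConnPoolCfg{
        // One connection per worker plus one spare, mirroring the call sites above.
        MaxTotalConnections: concurrency + 1,
        // Optional per-pool cap; the indexes/kv call sites leave this at 0,
        // i.e. no limit beyond the total.
        MaxConnsPerPool: 8, // illustrative value
    }
    mcp, err := workload.NewMultiConnPool(cfg,
        "postgres://root@localhost:26257?sslmode=disable") // illustrative URL
    if err != nil {
        fmt.Println(err)
        return
    }
    defer mcp.Close()
}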
148 changes: 108 additions & 40 deletions pkg/workload/pgx_helpers.go
@@ -18,6 +18,7 @@ package workload
import (
"context"
gosql "database/sql"
"sync"
"sync/atomic"

"github.com/cockroachdb/cockroach-go/crdb"
@@ -32,52 +33,93 @@ type MultiConnPool struct {
counter uint32
}

// NewMultiConnPool creates a new MultiConnPool (with one pool per url).
// MultiConnPoolCfg encapsulates the knobs passed to NewMultiConnPool.
type MultiConnPoolCfg struct {
// MaxTotalConnections is the total maximum number of connections across all
// pools.
MaxTotalConnections int

// MaxConnsPerPool is the maximum number of connections in any single pool.
// Limiting this is useful especially for prepared statements, which are
// prepared on each connection inside a pool (serially).
// If 0, there is no per-pool maximum (other than the total maximum number of
// connections which still applies).
MaxConnsPerPool int
}

// NewMultiConnPool creates a new MultiConnPool.
//
// Each URL gets one or more pools, and each pool has at most MaxConnsPerPool
// connections.
//
// The pools have approximately the same number of max connections, adding up to
// maxTotalConnections.
func NewMultiConnPool(maxTotalConnections int, urls ...string) (*MultiConnPool, error) {
m := &MultiConnPool{
Pools: make([]*pgx.ConnPool, len(urls)),
// MaxTotalConnections.
func NewMultiConnPool(cfg MultiConnPoolCfg, urls ...string) (*MultiConnPool, error) {
m := &MultiConnPool{}
connsPerURL := distribute(cfg.MaxTotalConnections, len(urls))
maxConnsPerPool := cfg.MaxConnsPerPool
if maxConnsPerPool == 0 {
maxConnsPerPool = cfg.MaxTotalConnections
}

var warmupConns [][]*pgx.Conn
for i := range urls {
cfg, err := pgx.ParseConnectionString(urls[i])
connCfg, err := pgx.ParseConnectionString(urls[i])
if err != nil {
return nil, err
}
// Use the average number of remaining connections (this handles
// rounding).
numConn := maxTotalConnections / (len(urls) - i)
maxTotalConnections -= numConn
p, err := pgx.NewConnPool(pgx.ConnPoolConfig{
ConnConfig: cfg,
MaxConnections: numConn,
})
if err != nil {
return nil, err

connsPerPool := distributeMax(connsPerURL[i], maxConnsPerPool)
for _, numConns := range connsPerPool {
p, err := pgx.NewConnPool(pgx.ConnPoolConfig{
ConnConfig: connCfg,
MaxConnections: numConns,
})
if err != nil {
return nil, err
}

warmupConns = append(warmupConns, make([]*pgx.Conn, numConns))
m.Pools = append(m.Pools, p)
}
}

// "Warm up" the pool so we don't have to establish connections later (which
// would affect the observed latencies of the first requests). We do this by
// acquiring all connections (in parallel), then releasing them back to the
// pool.
conns := make([]*pgx.Conn, numConn)
var g errgroup.Group
for i := range conns {
i := i
// "Warm up" the pools so we don't have to establish connections later (which
// would affect the observed latencies of the first requests, especially when
// prepared statements are used). We do this by
// acquiring connections (in parallel), then releasing them back to the
// pool.
var g errgroup.Group
// Limit concurrent connection establishment. Allowing this to run
// at maximum parallelism would trigger syn flood protection on the
// host, which combined with any packet loss could cause Acquire to
// return an error and fail the whole function. The value 100 is
// chosen because it is less than the default value for SOMAXCONN
// (128).
sem := make(chan struct{}, 100)
for i, p := range m.Pools {
p := p
conns := warmupConns[i]
for j := range conns {
j := j
sem <- struct{}{}
g.Go(func() error {
conns[i], err = p.Acquire()
var err error
conns[j], err = p.Acquire()
<-sem
return err
})
}
if err := g.Wait(); err != nil {
return nil, err
}
for _, c := range conns {
}
if err := g.Wait(); err != nil {
return nil, err
}
for i, p := range m.Pools {
for _, c := range warmupConns[i] {
p.Release(c)
}

m.Pools[i] = p
}

return m, nil
}
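The warmup above deliberately limits how many connections are being dialed at once: the buffered channel acts as a semaphore of size 100, below the default SOMAXCONN of 128, so the server's listen backlog is not overrun. A self-contained sketch of the same bounded-concurrency pattern, with hypothetical names and without the pgx specifics:

package main

import (
    "fmt"

    "golang.org/x/sync/errgroup"
)

// runBounded executes n tasks concurrently, but never more than maxInFlight
// at a time: a semaphore slot is taken before each goroutine starts and
// released when it finishes.
func runBounded(n, maxInFlight int, task func(i int) error) error {
    sem := make(chan struct{}, maxInFlight)
    var g errgroup.Group
    for i := 0; i < n; i++ {
        i := i
        sem <- struct{}{} // blocks once maxInFlight tasks are in flight
        g.Go(func() error {
            defer func() { <-sem }() // free the slot
            return task(i)
        })
    }
    return g.Wait()
}

func main() {
    // Stand-in for acquiring 500 connections with at most 100 dials in flight.
    err := runBounded(500, 100, func(i int) error { return nil })
    fmt.Println(err)
}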

@@ -94,17 +136,23 @@ func (m *MultiConnPool) Get() *pgx.ConnPool {
func (m *MultiConnPool) PrepareEx(
ctx context.Context, name, sql string, opts *pgx.PrepareExOptions,
) (*pgx.PreparedStatement, error) {
var ps *pgx.PreparedStatement
var res *pgx.PreparedStatement
var once sync.Once
var g errgroup.Group
for _, p := range m.Pools {
var err error
ps, err = p.PrepareEx(ctx, name, sql, opts)
if err != nil {
return nil, err
}
p := p
g.Go(func() error {
ps, err := p.PrepareEx(ctx, name, sql, opts)
if err == nil {
// It doesn't matter which PreparedStatement we return, they should
// contain the same information.
once.Do(func() { res = ps })
}
return err
})
}
// It doesn't matter which PreparedStatement we return, they should be the
// same.
return ps, nil
err := g.Wait()
return res, err
}
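PrepareEx now prepares the statement on every pool in parallel rather than serially, and any one of the resulting PreparedStatements can be returned since they are equivalent. A hedged sketch of the intended usage pattern (the statement name, SQL, and table are illustrative, not from this commit; it assumes pgx accepts a prepared statement's name in place of SQL text):

package example

import (
    "context"

    "github.com/cockroachdb/cockroach/pkg/workload"
)

// preparedRead prepares a statement once across all pools via the
// MultiConnPool, then executes it by name on whichever pool Get() hands back.
func preparedRead(ctx context.Context, mcp *workload.MultiConnPool, id int) (int64, error) {
    ps, err := mcp.PrepareEx(ctx, "get-balance",
        "SELECT balance FROM accounts WHERE id = $1", nil)
    if err != nil {
        return 0, err
    }
    var balance int64
    // Passing the prepared statement's name lets pgx reuse the prepared plan.
    err = mcp.Get().QueryRow(ps.Name, id).Scan(&balance)
    return balance, err
}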

// Close closes all the pools.
@@ -138,3 +186,23 @@ func (tx *PgxTx) Commit() error {
func (tx *PgxTx) Rollback() error {
return (*pgx.Tx)(tx).Rollback()
}

// distribute returns a slice of <num> integers that add up to <total> and are
// within +/-1 of each other.
func distribute(total, num int) []int {
res := make([]int, num)
for i := range res {
// Use the average number of remaining connections.
div := len(res) - i
res[i] = (total + div/2) / div
total -= res[i]
}
return res
}

// distributeMax returns a slice of integers that are at most `max` and add up
// to <total>. The slice is as short as possible and the values are within +/-1
// of each other.
func distributeMax(total, max int) []int {
return distribute(total, (total+max-1)/max)
}
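A short worked example of the two helpers (an illustrative sketch, not part of this commit; it would have to live in package workload because both functions are unexported): distribute(10, 3) yields [3 4 3], and distributeMax(10, 4) first computes ceil(10/4) = 3 pools and then produces the same split, each value at most 4.

package workload

import "fmt"

// ExampleDistribute shows the values the helpers above produce for one small input.
func ExampleDistribute() {
    fmt.Println(distribute(10, 3))    // sums to 10, values within 1 of each other
    fmt.Println(distributeMax(10, 4)) // ceil(10/4) = 3 pools, each at most 4
    // Output:
    // [3 4 3]
    // [3 4 3]
}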
49 changes: 49 additions & 0 deletions pkg/workload/pgx_helpers_test.go
@@ -0,0 +1,49 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.

package workload

import (
"testing"

"github.com/cockroachdb/cockroach/pkg/util/leaktest"
)

func TestDistribute(t *testing.T) {
defer leaktest.AfterTest(t)()

for _, total := range []int{0, 1, 2, 5, 10, 17, 25} {
for _, num := range []int{1, 2, 3, 4, 5, 8, 13, 15} {
d := distribute(total, num)
// Verify the sum is correct and that the variance is no more than 1.
min, max, sum := d[0], d[0], d[0]
for i := 1; i < len(d); i++ {
sum += d[i]
if min > d[i] {
min = d[i]
}
if max < d[i] {
max = d[i]
}
}
if sum != total {
t.Errorf("%d / %d: incorrect sum %d", total, num, sum)
}
if max > min+1 {
t.Errorf("%d / %d: min value %d, max value %d", total, num, min, max)
}
}
}
}
