Skip to content

Commit

Permalink
Merge pull request #1113 from loadimpact/fix/influxdbUsingTooMuchMemory
Browse files Browse the repository at this point in the history
Fix InfluxDB using too much memory
  • Loading branch information
mstoykov authored Sep 3, 2019
2 parents 2dbf695 + 04e1379 commit d385166
Show file tree
Hide file tree
Showing 23 changed files with 639 additions and 358 deletions.
12 changes: 6 additions & 6 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

59 changes: 59 additions & 0 deletions stats/influxdb/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package influxdb

import (
"io"
"io/ioutil"
"net/http"
"testing"
"time"

"github.com/loadimpact/k6/stats"
)

func benchmarkInfluxdb(b *testing.B, t time.Duration) {
testCollectorCycle(b, func(rw http.ResponseWriter, r *http.Request) {
for {
time.Sleep(t)
m, _ := io.CopyN(ioutil.Discard, r.Body, 1<<18) // read 1/4 mb a time
if m == 0 {
break
}
}
rw.WriteHeader(204)
}, func(tb testing.TB, c *Collector) {
b = tb.(*testing.B)
b.ResetTimer()

var samples = make(stats.Samples, 10)
for i := 0; i < len(samples); i++ {
samples[i] = stats.Sample{
Metric: stats.New("testGauge", stats.Gauge),
Time: time.Now(),
Tags: stats.NewSampleTags(map[string]string{
"something": "else",
"VU": "21",
"else": "something",
}),
Value: 2.0,
}
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
c.Collect([]stats.SampleContainer{samples})
time.Sleep(time.Nanosecond * 20)
}
})
}

func BenchmarkInfluxdb1Second(b *testing.B) {
benchmarkInfluxdb(b, time.Second)
}

func BenchmarkInfluxdb2Second(b *testing.B) {
benchmarkInfluxdb(b, 2*time.Second)
}

func BenchmarkInfluxdb100Milliseconds(b *testing.B) {
benchmarkInfluxdb(b, 100*time.Millisecond)
}
43 changes: 27 additions & 16 deletions stats/influxdb/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,14 @@ package influxdb

import (
"context"
"errors"
"sync"
"time"

"github.com/influxdata/influxdb/client/v2"
"github.com/sirupsen/logrus"

client "github.com/influxdata/influxdb1-client/v2"
"github.com/loadimpact/k6/lib"
"github.com/loadimpact/k6/stats"
)

const (
pushInterval = 1 * time.Second
"github.com/sirupsen/logrus"
)

// Verify that Collector implements lib.Collector
Expand All @@ -44,8 +40,10 @@ type Collector struct {
Config Config
BatchConf client.BatchPointsConfig

buffer []stats.Sample
bufferLock sync.Mutex
buffer []stats.Sample
bufferLock sync.Mutex
wg sync.WaitGroup
semaphoreCh chan struct{}
}

func New(conf Config) (*Collector, error) {
Expand All @@ -54,10 +52,14 @@ func New(conf Config) (*Collector, error) {
return nil, err
}
batchConf := MakeBatchConfig(conf)
if conf.ConcurrentWrites.Int64 <= 0 {
return nil, errors.New("influxdb's ConcurrentWrites must be a positive number")
}
return &Collector{
Client: cl,
Config: conf,
BatchConf: batchConf,
Client: cl,
Config: conf,
BatchConf: batchConf,
semaphoreCh: make(chan struct{}, conf.ConcurrentWrites.Int64),
}, nil
}

Expand All @@ -74,13 +76,16 @@ func (c *Collector) Init() error {

func (c *Collector) Run(ctx context.Context) {
logrus.Debug("InfluxDB: Running!")
ticker := time.NewTicker(pushInterval)
ticker := time.NewTicker(time.Duration(c.Config.PushInterval.Duration))
for {
select {
case <-ticker.C:
c.commit()
c.wg.Add(1)
go c.commit()
case <-ctx.Done():
c.commit()
c.wg.Add(1)
go c.commit()
c.wg.Wait()
return
}
}
Expand All @@ -99,12 +104,18 @@ func (c *Collector) Link() string {
}

func (c *Collector) commit() {
defer c.wg.Done()
c.bufferLock.Lock()
samples := c.buffer
c.buffer = nil
c.bufferLock.Unlock()

// let first get the data and then wait our turn
c.semaphoreCh <- struct{}{}
defer func() {
<-c.semaphoreCh
}()
logrus.Debug("InfluxDB: Committing...")
logrus.WithField("samples", len(samples)).Debug("InfluxDB: Writing...")

batch, err := c.batchFromSamples(samples)
if err != nil {
Expand Down
118 changes: 118 additions & 0 deletions stats/influxdb/collector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package influxdb

import (
"bytes"
"context"
"io"
"net"
"net/http"
"sync"
"testing"
"time"

"github.com/loadimpact/k6/stats"
"github.com/stretchr/testify/require"
null "gopkg.in/guregu/null.v3"
)

func TestBadConcurrentWrites(t *testing.T) {
c := NewConfig()
t.Run("0", func(t *testing.T) {
c.ConcurrentWrites = null.IntFrom(0)
_, err := New(*c)
require.Error(t, err)
require.Equal(t, err.Error(), "influxdb's ConcurrentWrites must be a positive number")
})

t.Run("-2", func(t *testing.T) {
c.ConcurrentWrites = null.IntFrom(-2)
_, err := New(*c)
require.Error(t, err)
require.Equal(t, err.Error(), "influxdb's ConcurrentWrites must be a positive number")
})

t.Run("2", func(t *testing.T) {
c.ConcurrentWrites = null.IntFrom(2)
_, err := New(*c)
require.NoError(t, err)
})
}

func testCollectorCycle(t testing.TB, handler http.HandlerFunc, body func(testing.TB, *Collector)) {
s := &http.Server{
Addr: ":",
Handler: handler,
MaxHeaderBytes: 1 << 20,
}
l, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
defer func() {
_ = l.Close()
}()

defer func() {
require.NoError(t, s.Shutdown(context.Background()))
}()

go func() {
require.Equal(t, http.ErrServerClosed, s.Serve(l))
}()

config := NewConfig()
config.Addr = null.StringFrom("http://" + l.Addr().String())
c, err := New(*config)
require.NoError(t, err)

require.NoError(t, c.Init())
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
defer cancel()
wg.Add(1)
go func() {
defer wg.Done()
c.Run(ctx)
}()

body(t, c)

cancel()
wg.Wait()
}
func TestCollector(t *testing.T) {
var samplesRead int
defer func() {
require.Equal(t, samplesRead, 20)
}()
testCollectorCycle(t, func(rw http.ResponseWriter, r *http.Request) {
var b = bytes.NewBuffer(nil)
_, _ = io.Copy(b, r.Body)
for {
s, err := b.ReadString('\n')
if len(s) > 0 {
samplesRead++
}
if err != nil {
break
}
}

rw.WriteHeader(204)
}, func(tb testing.TB, c *Collector) {
var samples = make(stats.Samples, 10)
for i := 0; i < len(samples); i++ {
samples[i] = stats.Sample{
Metric: stats.New("testGauge", stats.Gauge),
Time: time.Now(),
Tags: stats.NewSampleTags(map[string]string{
"something": "else",
"VU": "21",
"else": "something",
}),
Value: 2.0,
}
}
c.Collect([]stats.SampleContainer{samples})
c.Collect([]stats.SampleContainer{samples})
})

}
44 changes: 36 additions & 8 deletions stats/influxdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net/url"
"strconv"
"strings"
"time"

"github.com/kubernetes/helm/pkg/strvals"
"github.com/loadimpact/k6/lib/types"
Expand All @@ -34,11 +35,13 @@ import (

type Config struct {
// Connection.
Addr null.String `json:"addr" envconfig:"INFLUXDB_ADDR"`
Username null.String `json:"username,omitempty" envconfig:"INFLUXDB_USERNAME"`
Password null.String `json:"password,omitempty" envconfig:"INFLUXDB_PASSWORD"`
Insecure null.Bool `json:"insecure,omitempty" envconfig:"INFLUXDB_INSECURE"`
PayloadSize null.Int `json:"payloadSize,omitempty" envconfig:"INFLUXDB_PAYLOAD_SIZE"`
Addr null.String `json:"addr" envconfig:"INFLUXDB_ADDR"`
Username null.String `json:"username,omitempty" envconfig:"INFLUXDB_USERNAME"`
Password null.String `json:"password,omitempty" envconfig:"INFLUXDB_PASSWORD"`
Insecure null.Bool `json:"insecure,omitempty" envconfig:"INFLUXDB_INSECURE"`
PayloadSize null.Int `json:"payloadSize,omitempty" envconfig:"INFLUXDB_PAYLOAD_SIZE"`
PushInterval types.NullDuration `json:"pushInterval,omitempty" envconfig:"INFLUXDB_PUSH_INTERVAL"`
ConcurrentWrites null.Int `json:"concurrentWrites,omitempty" envconfig:"INFLUXDB_CONCURRENT_WRITES"`

// Samples.
DB null.String `json:"db" envconfig:"INFLUXDB_DB"`
Expand All @@ -50,9 +53,11 @@ type Config struct {

func NewConfig() *Config {
c := &Config{
Addr: null.NewString("http://localhost:8086", false),
DB: null.NewString("k6", false),
TagsAsFields: []string{"vu", "iter", "url"},
Addr: null.NewString("http://localhost:8086", false),
DB: null.NewString("k6", false),
TagsAsFields: []string{"vu", "iter", "url"},
ConcurrentWrites: null.NewInt(10, false),
PushInterval: types.NewNullDuration(time.Second, false),
}
return c
}
Expand Down Expand Up @@ -88,6 +93,13 @@ func (c Config) Apply(cfg Config) Config {
if len(cfg.TagsAsFields) > 0 {
c.TagsAsFields = cfg.TagsAsFields
}
if cfg.PushInterval.Valid {
c.PushInterval = cfg.PushInterval
}

if cfg.ConcurrentWrites.Valid {
c.ConcurrentWrites = cfg.ConcurrentWrites
}
return c
}

Expand Down Expand Up @@ -154,13 +166,29 @@ func ParseURL(text string) (Config, error) {
case "payload_size":
var size int
size, err = strconv.Atoi(vs[0])
if err != nil {
return c, err
}
c.PayloadSize = null.IntFrom(int64(size))
case "precision":
c.Precision = null.StringFrom(vs[0])
case "retention":
c.Retention = null.StringFrom(vs[0])
case "consistency":
c.Consistency = null.StringFrom(vs[0])

case "pushInterval":
err = c.PushInterval.UnmarshalText([]byte(vs[0]))
if err != nil {
return c, err
}
case "concurrentWrites":
var writes int
writes, err = strconv.Atoi(vs[0])
if err != nil {
return c, err
}
c.ConcurrentWrites = null.IntFrom(int64(writes))
case "tagsAsFields":
c.TagsAsFields = vs
default:
Expand Down
2 changes: 1 addition & 1 deletion stats/influxdb/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ package influxdb
import (
"strings"

client "github.com/influxdata/influxdb/client/v2"
client "github.com/influxdata/influxdb1-client/v2"
null "gopkg.in/guregu/null.v3"
)

Expand Down
Loading

0 comments on commit d385166

Please sign in to comment.