-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(influxdb): queue batches to the influxdb if it's slowing down
Previously to this k6 will write to influxdb every second, but if that write took more than 1 second it won't start a second write but instead wait for it. This will generally lead to the write times getting bigger and bigger as more and more data is being written until the max body that influxdb will take is reached when it will return an error and k6 will drop that data. With this commit a configurable number of parallel writes (10 by default), starting again every 1 second (also now configurable). Additionally if we reach the 10 concurrent writes instead of sending all the data that accumulates we will just queue the samples that were generated. This should considerably help with no hitting the max body size of influxdb. I tested with a simple script, doing batch request for an empty local file with 40VUs. Without an output it was getting 8.1K RPS with 650MB of memory usage. Previous to this commit the usage of ram was ~5.7GB for 5736 rps and practically all the data gets lost if you don't up the max body and even than a lot of the data is lost while the memory usage goes up. After this commit the usage of ram was ~2.4GB(or less in some of the tests) with 6273 RPS and there was no lost of data. Even with this commit doing 2 hour of that simple script dies after 1hour and 35 minutes using around 15GB (the test system has 16). Can't be sure of lost of data, as influxdb eat 32GB of memory trying to visualize it and I had to kill it ;(. Some problems with this solution are that: 1. We use a lot of goroutines if things start slowing down - probably not a big problem, but still good idea to fix. 2. We can probably better batch stuff if we add/keep all the unsend samples together and cut them in let say 50k samples. 3. By far the biggest: because the writes are slow if the test is stopped (with Ctrl+C) or it finishes naturally, waiting for those writes can take considerable amount of time - in the above example the 4 minutes tests generally took around 5 minutes :( All of those can be better handled with some more sophisticated queueing code at later time. closes #1081, fixes #1100, fixes #182
- Loading branch information
Showing
4 changed files
with
179 additions
and
21 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
package influxdb | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"io" | ||
"net" | ||
"net/http" | ||
"sync" | ||
"testing" | ||
"time" | ||
|
||
"github.com/loadimpact/k6/stats" | ||
"github.com/stretchr/testify/require" | ||
null "gopkg.in/guregu/null.v3" | ||
) | ||
|
||
func TestBadConcurrentWrites(t *testing.T) { | ||
c := NewConfig() | ||
t.Run("0", func(t *testing.T) { | ||
c.ConcurrentWrites = null.IntFrom(0) | ||
_, err := New(*c) | ||
require.Error(t, err) | ||
require.Equal(t, err.Error(), "influxdb's ConcurrentWrites must be a possitive number") | ||
}) | ||
|
||
t.Run("-2", func(t *testing.T) { | ||
c.ConcurrentWrites = null.IntFrom(-2) | ||
_, err := New(*c) | ||
require.Error(t, err) | ||
require.Equal(t, err.Error(), "influxdb's ConcurrentWrites must be a possitive number") | ||
}) | ||
|
||
t.Run("2", func(t *testing.T) { | ||
c.ConcurrentWrites = null.IntFrom(2) | ||
_, err := New(*c) | ||
require.NoError(t, err) | ||
}) | ||
} | ||
|
||
func testCollectorCycle(t testing.TB, handler http.HandlerFunc, body func(testing.TB, *Collector)) { | ||
s := &http.Server{ | ||
Addr: ":", | ||
Handler: handler, | ||
MaxHeaderBytes: 1 << 20, | ||
} | ||
l, err := net.Listen("tcp", "127.0.0.1:0") | ||
require.NoError(t, err) | ||
defer func() { | ||
_ = l.Close() | ||
}() | ||
|
||
defer func() { | ||
require.NoError(t, s.Shutdown(context.Background())) | ||
}() | ||
|
||
go func() { | ||
require.Equal(t, http.ErrServerClosed, s.Serve(l)) | ||
}() | ||
|
||
config := NewConfig() | ||
config.Addr = null.StringFrom("http://" + l.Addr().String()) | ||
c, err := New(*config) | ||
require.NoError(t, err) | ||
|
||
require.NoError(t, c.Init()) | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
var wg sync.WaitGroup | ||
defer cancel() | ||
wg.Add(1) | ||
go func() { | ||
defer wg.Done() | ||
c.Run(ctx) | ||
}() | ||
|
||
body(t, c) | ||
|
||
cancel() | ||
wg.Wait() | ||
} | ||
func TestCollector(t *testing.T) { | ||
var samplesRead int | ||
defer func() { | ||
require.Equal(t, samplesRead, 20) | ||
}() | ||
testCollectorCycle(t, func(rw http.ResponseWriter, r *http.Request) { | ||
var b = bytes.NewBuffer(nil) | ||
_, _ = io.Copy(b, r.Body) | ||
for { | ||
s, err := b.ReadString('\n') | ||
if len(s) > 0 { | ||
samplesRead++ | ||
} | ||
if err != nil { | ||
break | ||
} | ||
} | ||
|
||
rw.WriteHeader(204) | ||
}, func(tb testing.TB, c *Collector) { | ||
var samples = make(stats.Samples, 10) | ||
for i := 0; i < len(samples); i++ { | ||
samples[i] = stats.Sample{ | ||
Metric: stats.New("testGauge", stats.Gauge), | ||
Time: time.Now(), | ||
Tags: stats.NewSampleTags(map[string]string{ | ||
"something": "else", | ||
"VU": "21", | ||
"else": "something", | ||
}), | ||
Value: 2.0, | ||
} | ||
} | ||
c.Collect([]stats.SampleContainer{samples}) | ||
c.Collect([]stats.SampleContainer{samples}) | ||
}) | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters