diff --git a/go.mod b/go.mod index b2b20a66a9..87da300ea9 100644 --- a/go.mod +++ b/go.mod @@ -57,7 +57,7 @@ require ( github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829 github.com/prometheus/common v0.3.0 github.com/prometheus/prometheus v0.0.0-20190417125241-3cc5f9d88062 - github.com/prometheus/tsdb v0.7.2-0.20190506134726-2ae028114c89 + github.com/prometheus/tsdb v0.8.0 github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e github.com/sercand/kuberesolver v2.1.0+incompatible // indirect github.com/stretchr/testify v1.3.0 diff --git a/go.sum b/go.sum index 53ca7cf1da..33d9166821 100644 --- a/go.sum +++ b/go.sum @@ -367,6 +367,8 @@ github.com/prometheus/prometheus v0.0.0-20190417125241-3cc5f9d88062/go.mod h1:nq github.com/prometheus/tsdb v0.7.0/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/prometheus/tsdb v0.7.2-0.20190506134726-2ae028114c89 h1:r2TOLePIzxV3KQHQuFz5wPllXq+9Y3g0pjj989nizJo= github.com/prometheus/tsdb v0.7.2-0.20190506134726-2ae028114c89/go.mod h1:fSI0j+IUQrDd7+ZtR9WKIGtoYAYAJUKcKhYLG25tN4g= +github.com/prometheus/tsdb v0.8.0 h1:w1tAGxsBMLkuGrFMhqgcCeBkM5d1YI24udArs+aASuQ= +github.com/prometheus/tsdb v0.8.0/go.mod h1:fSI0j+IUQrDd7+ZtR9WKIGtoYAYAJUKcKhYLG25tN4g= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rlmcpherson/s3gof3r v0.5.0/go.mod h1:s7vv7SMDPInkitQMuZzH615G7yWHdrU2r/Go7Bo71Rs= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= diff --git a/pkg/chunk/chunk_test.go b/pkg/chunk/chunk_test.go index 8a10bb72a1..f41041eea1 100644 --- a/pkg/chunk/chunk_test.go +++ b/pkg/chunk/chunk_test.go @@ -129,7 +129,7 @@ func TestChunkCodec(t *testing.T) { const fixedTimestamp = model.Time(1557654321000) func encodeForCompatibilityTest(t *testing.T) { - dummy := dummyChunkForEncoding(fixedTimestamp, labelsForDummyChunks, encoding.Bigchunk, 1) + dummy := dummyChunkForEncoding(fixedTimestamp, labelsForDummyChunks, encoding.BigChunk, 1) encoded, err := dummy.Encoded() require.NoError(t, err) fmt.Printf("%q\n%q\n", dummy.ExternalKey(), encoded) @@ -142,7 +142,7 @@ func TestChunkDecodeBackwardsCompatibility(t *testing.T) { have, err := ParseExternalKey(userID, "userID/fd3477666dacf92a:16aab37c8e8:16aab6eb768:38eb373c") require.NoError(t, err) require.NoError(t, have.Decode(decodeContext, rawData)) - want := dummyChunkForEncoding(fixedTimestamp, labelsForDummyChunks, encoding.Bigchunk, 1) + want := dummyChunkForEncoding(fixedTimestamp, labelsForDummyChunks, encoding.BigChunk, 1) // We can't just compare these two chunks, since the Bigchunk internals are different on construction and read-in. // Compare the serialised version instead require.NoError(t, have.Encode()) diff --git a/pkg/chunk/encoding/bigchunk.go b/pkg/chunk/encoding/bigchunk.go index 0282c08e99..4d9d82dde2 100644 --- a/pkg/chunk/encoding/bigchunk.go +++ b/pkg/chunk/encoding/bigchunk.go @@ -8,6 +8,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/tsdb/chunkenc" + "github.com/prometheus/tsdb/chunks" ) const samplesPerChunk = 120 @@ -22,18 +23,19 @@ type smallChunk struct { // bigchunk is a set of prometheus/tsdb chunks. It grows over time and has no // upperbound on number of samples it can contain. 
-type bigchunk struct { +type Bigchunk struct { chunks []smallChunk appender chunkenc.Appender remainingSamples int } -func newBigchunk() *bigchunk { - return &bigchunk{} +// NewBigchunk makes a new Bigchunk. +func NewBigchunk() *Bigchunk { + return &Bigchunk{} } -func (b *bigchunk) Add(sample model.SamplePair) ([]Chunk, error) { +func (b *Bigchunk) Add(sample model.SamplePair) ([]Chunk, error) { if b.remainingSamples == 0 { if bigchunkSizeCapBytes > 0 && b.Size() > bigchunkSizeCapBytes { return addToOverflowChunk(b, sample) @@ -50,7 +52,7 @@ func (b *bigchunk) Add(sample model.SamplePair) ([]Chunk, error) { } // addNextChunk adds a new XOR "subchunk" to the internal list of chunks. -func (b *bigchunk) addNextChunk(start model.Time) error { +func (b *Bigchunk) addNextChunk(start model.Time) error { // To save memory, we "compact" the previous chunk - the array backing the slice // will be upto 2x too big, and we can save this space. const chunkCapacityExcess = 32 // don't bother copying if it's within this range @@ -84,7 +86,7 @@ func (b *bigchunk) addNextChunk(start model.Time) error { return nil } -func (b *bigchunk) Marshal(wio io.Writer) error { +func (b *Bigchunk) Marshal(wio io.Writer) error { w := writer{wio} if err := w.WriteVarInt16(uint16(len(b.chunks))); err != nil { return err @@ -101,12 +103,12 @@ func (b *bigchunk) Marshal(wio io.Writer) error { return nil } -func (b *bigchunk) MarshalToBuf(buf []byte) error { +func (b *Bigchunk) MarshalToBuf(buf []byte) error { writer := bytes.NewBuffer(buf) return b.Marshal(writer) } -func (b *bigchunk) UnmarshalFromBuf(buf []byte) error { +func (b *Bigchunk) UnmarshalFromBuf(buf []byte) error { r := reader{buf: buf} numChunks, err := r.ReadUint16() if err != nil { @@ -144,15 +146,15 @@ func (b *bigchunk) UnmarshalFromBuf(buf []byte) error { return nil } -func (b *bigchunk) Encoding() Encoding { - return Bigchunk +func (b *Bigchunk) Encoding() Encoding { + return BigChunk } -func (b *bigchunk) Utilization() float64 { +func (b *Bigchunk) Utilization() float64 { return 1.0 } -func (b *bigchunk) Len() int { +func (b *Bigchunk) Len() int { sum := 0 for _, c := range b.chunks { sum += c.NumSamples() @@ -160,7 +162,7 @@ func (b *bigchunk) Len() int { return sum } -func (b *bigchunk) Size() int { +func (b *Bigchunk) Size() int { sum := 2 // For the number of sub chunks. for _, c := range b.chunks { sum += 2 // For the length of the sub chunk. 
@@ -169,14 +171,14 @@ func (b *bigchunk) Size() int { return sum } -func (b *bigchunk) NewIterator() Iterator { +func (b *Bigchunk) NewIterator() Iterator { return &bigchunkIterator{ - bigchunk: b, + Bigchunk: b, curr: b.chunks[0].Iterator(), } } -func (b *bigchunk) Slice(start, end model.Time) Chunk { +func (b *Bigchunk) Slice(start, end model.Time) Chunk { i, j := 0, len(b.chunks) for k := 0; k < len(b.chunks); k++ { if b.chunks[k].end < int64(start) { @@ -187,11 +189,23 @@ func (b *bigchunk) Slice(start, end model.Time) Chunk { break } } - return &bigchunk{ + return &Bigchunk{ chunks: b.chunks[i:j], } } +func (b *Bigchunk) AddSmallChunks(cs []chunks.Meta) { + // Convert each TSDB chunk into an XOR sub-chunk and append it to this Bigchunk. + for _, c := range cs { + xoc := c.Chunk.(*chunkenc.XORChunk) + b.chunks = append(b.chunks, smallChunk{ + XORChunk: xoc, + start: c.MinTime, + end: c.MaxTime, + }) + } +} + type writer struct { io.Writer } @@ -227,7 +241,7 @@ func (r *reader) ReadBytes(count int) ([]byte, error) { } type bigchunkIterator struct { - *bigchunk + *Bigchunk curr chunkenc.Iterator i int diff --git a/pkg/chunk/encoding/bigchunk_test.go b/pkg/chunk/encoding/bigchunk_test.go index 6c3bd432ea..9ba80b367d 100644 --- a/pkg/chunk/encoding/bigchunk_test.go +++ b/pkg/chunk/encoding/bigchunk_test.go @@ -10,7 +10,7 @@ import ( ) func TestSliceBiggerChunk(t *testing.T) { - var c Chunk = newBigchunk() + var c Chunk = NewBigchunk() for i := 0; i < 12*3600/15; i++ { cs, err := c.Add(model.SamplePair{ Timestamp: model.Time(i * step), @@ -60,7 +60,7 @@ func TestSliceBiggerChunk(t *testing.T) { func BenchmarkBiggerChunkMemory(b *testing.B) { for i := 0; i < b.N; i++ { - var c Chunk = newBigchunk() + var c Chunk = NewBigchunk() for i := 0; i < 12*3600/15; i++ { cs, err := c.Add(model.SamplePair{ Timestamp: model.Time(i * step), @@ -70,12 +70,12 @@ func BenchmarkBiggerChunkMemory(b *testing.B) { c = cs[0] } - c.(*bigchunk).printSize() + c.(*Bigchunk).printSize() } } // printSize calculates various sizes of the chunk when encoded, and in memory. -func (b *bigchunk) printSize() { +func (b *Bigchunk) printSize() { var buf bytes.Buffer b.Marshal(&buf) diff --git a/pkg/chunk/encoding/chunk_test.go b/pkg/chunk/encoding/chunk_test.go index d35fe15981..8414f00d75 100644 --- a/pkg/chunk/encoding/chunk_test.go +++ b/pkg/chunk/encoding/chunk_test.go @@ -62,7 +62,7 @@ func TestChunk(t *testing.T) { }{ {DoubleDelta, 989}, {Varbit, 2048}, - {Bigchunk, 4096}, + {BigChunk, 4096}, } { for samples := tc.maxSamples / 10; samples < tc.maxSamples; samples += tc.maxSamples / 10 { diff --git a/pkg/chunk/encoding/factory.go b/pkg/chunk/encoding/factory.go index 26b1308f62..20880ed014 100644 --- a/pkg/chunk/encoding/factory.go +++ b/pkg/chunk/encoding/factory.go @@ -39,7 +39,7 @@ const ( // Varbit encoding Varbit // Bigchunk encoding - Bigchunk + BigChunk ) type encoding struct { @@ -66,10 +66,10 @@ var encodings = map[Encoding]encoding{ return newVarbitChunk(varbitZeroEncoding) }, }, - Bigchunk: { + BigChunk: { Name: "Bigchunk", New: func() Chunk { - return newBigchunk() + return NewBigchunk() }, }, } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 8ab08a4110..6cdc55d9c4 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -94,6 +94,8 @@ type Config struct { RateUpdatePeriod time.Duration + UseTSDB bool + // For testing, you can override the address and ID of this ingester.
ingesterClientFactory func(addr string, cfg client.Config) (client.HealthAndIngesterClient, error) } @@ -223,75 +225,20 @@ func (i *Ingester) StopIncomingRequests() { // Push implements client.IngesterServer func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.WriteResponse, error) { - var lastPartialErr error - - for _, ts := range req.Timeseries { - for _, s := range ts.Samples { - err := i.append(ctx, ts.Labels, model.Time(s.TimestampMs), model.SampleValue(s.Value), req.Source) - if err == nil { - continue - } - - ingestedSamplesFail.Inc() - if httpResp, ok := httpgrpc.HTTPResponseFromError(err); ok { - switch httpResp.Code { - case http.StatusBadRequest, http.StatusTooManyRequests: - lastPartialErr = err - continue - } - } - - return nil, err - } - } - - return &client.WriteResponse{}, lastPartialErr -} - -func (i *Ingester) append(ctx context.Context, labels labelPairs, timestamp model.Time, value model.SampleValue, source client.WriteRequest_SourceEnum) error { - labels.removeBlanks() - i.stopLock.RLock() defer i.stopLock.RUnlock() if i.stopped { - return fmt.Errorf("ingester stopping") + return nil, fmt.Errorf("ingester stopping") } i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() - state, fp, series, err := i.userStates.getOrCreateSeries(ctx, labels) + u, err := i.userStates.getOrCreate(ctx) if err != nil { - return err - } - defer func() { - state.fpLocker.Unlock(fp) - }() - - prevNumChunks := len(series.chunkDescs) - if err := series.add(model.SamplePair{ - Value: value, - Timestamp: timestamp, - }); err != nil { - if mse, ok := err.(*memorySeriesError); ok { - validation.DiscardedSamples.WithLabelValues(mse.errorType, state.userID).Inc() - // Use a dumb string template to avoid the message being parsed as a template - err = httpgrpc.Errorf(http.StatusBadRequest, "%s", mse.message) - } - return err - } - - memoryChunks.Add(float64(len(series.chunkDescs) - prevNumChunks)) - ingestedSamples.Inc() - switch source { - case client.RULE: - state.ingestedRuleSamples.inc() - case client.API: - fallthrough - default: - state.ingestedAPISamples.inc() + return nil, err } - return err + return &client.WriteResponse{}, u.append(ctx, req) } // Query implements service.IngesterServer @@ -320,8 +267,8 @@ func (i *Ingester) Query(ctx old_ctx.Context, req *client.QueryRequest) (*client result := &client.QueryResponse{} numSeries, numSamples := 0, 0 maxSamplesPerQuery := i.limits.MaxSamplesPerQuery(userID) - err = state.forSeriesMatching(ctx, matchers, func(ctx context.Context, _ model.Fingerprint, series *memorySeries) error { - values, err := series.samplesForRange(from, through) + err = state.query(ctx, req, func(ctx context.Context, labels labels.Labels, chunks ...*desc) error { + values, err := samplesForRange(chunks, from, through) if err != nil { return err } @@ -336,7 +283,7 @@ func (i *Ingester) Query(ctx old_ctx.Context, req *client.QueryRequest) (*client } ts := client.TimeSeries{ - Labels: client.FromLabelsToLabelAdapaters(series.metric), + Labels: client.FromLabelsToLabelAdapaters(labels), Samples: make([]client.Sample, 0, len(values)), } for _, s := range values { @@ -379,13 +326,13 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ // can iteratively merge them with entries coming from the chunk store. But // that would involve locking all the series & sorting, so until we have // a better solution in the ingesters I'd rather take the hit in the queriers. 
- err = state.forSeriesMatching(stream.Context(), matchers, func(ctx context.Context, _ model.Fingerprint, series *memorySeries) error { - chunks := make([]*desc, 0, len(series.chunkDescs)) + err = state.query(stream.Context(), req, func(ctx context.Context, labels labels.Labels, chunks ...*desc) error { + /*chunks := make([]*desc, 0, len(chunks)) for _, chunk := range series.chunkDescs { if !(chunk.FirstTime.After(through) || chunk.LastTime.Before(from)) { chunks = append(chunks, chunk.slice(from, through)) } - } + }*/ if len(chunks) == 0 { return nil @@ -399,7 +346,7 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ numChunks += len(wireChunks) batch = append(batch, client.TimeSeriesChunk{ - Labels: client.FromLabelsToLabelAdapaters(series.metric), + Labels: client.FromLabelsToLabelAdapaters(labels), Chunks: wireChunks, }) @@ -482,9 +429,14 @@ func (i *Ingester) MetricsForLabelMatchers(ctx old_ctx.Context, req *client.Metr lss := map[model.Fingerprint]labels.Labels{} for _, matchers := range matchersSet { - if err := state.forSeriesMatching(ctx, matchers, func(ctx context.Context, fp model.Fingerprint, series *memorySeries) error { + req := client.QueryRequest{ + Matchers: matchers, + } + + if err := state.query(ctx, &req, func(ctx context.Context, labels labels.Labels, _ ...*desc) error { + fp := labels.Fingerprint() if _, ok := lss[fp]; !ok { - lss[fp] = series.metric + lss[fp] = labels } return nil }, nil, 0); err != nil { @@ -568,3 +520,18 @@ func (i *Ingester) ReadinessHandler(w http.ResponseWriter, r *http.Request) { http.Error(w, "Not ready: "+err.Error(), http.StatusServiceUnavailable) } } + +func samplesForRange(chunks []*desc, from, through model.Time) ([]model.SamplePair, error) { + var values []model.SamplePair + for i := 0; i < len(chunks); i++ { + it := chunks[i].C.NewIterator() + for it.Scan() { + v := it.Value() + if v.Timestamp < from || v.Timestamp > through { + continue + } + values = append(values, v) + } + } + return values, nil +} diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index fe3ab5cad5..29b7fd4005 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -266,6 +266,22 @@ func (s *stream) Send(response *client.QueryStreamResponse) error { return nil } +func WriteRequest(ls labelPairs, ts int64, v float64) *client.WriteRequest { + return &client.WriteRequest{ + Timeseries: []client.PreallocTimeseries{ + { + TimeSeries: client.TimeSeries{ + Labels: ls, + Samples: []client.Sample{ + {TimestampMs: ts, Value: v}, + }, + }, + }, + }, + Source: client.API, + } +} + func TestIngesterAppendOutOfOrderAndDuplicate(t *testing.T) { _, ing := newDefaultTestStore(t) defer ing.Shutdown() @@ -274,22 +290,22 @@ func TestIngesterAppendOutOfOrderAndDuplicate(t *testing.T) { {Name: model.MetricNameLabel, Value: "testmetric"}, } ctx := user.InjectOrgID(context.Background(), userID) - err := ing.append(ctx, m, 1, 0, client.API) + _, err := ing.Push(ctx, WriteRequest(m, 1, 0)) require.NoError(t, err) // Two times exactly the same sample (noop). - err = ing.append(ctx, m, 1, 0, client.API) + _, err = ing.Push(ctx, WriteRequest(m, 1, 0)) require.NoError(t, err) // Earlier sample than previous one. - err = ing.append(ctx, m, 0, 0, client.API) + _, err = ing.Push(ctx, WriteRequest(m, 0, 0)) require.Contains(t, err.Error(), "sample timestamp out of order") errResp, ok := httpgrpc.HTTPResponseFromError(err) require.True(t, ok) require.Equal(t, errResp.Code, int32(400)) // Same timestamp as previous sample, but different value.
- err = ing.append(ctx, m, 1, 1, client.API) + _, err = ing.Push(ctx, WriteRequest(m, 1, 1)) require.Contains(t, err.Error(), "sample with repeated timestamp but different value") errResp, ok = httpgrpc.HTTPResponseFromError(err) require.True(t, ok) @@ -307,7 +323,7 @@ func TestIngesterAppendBlankLabel(t *testing.T) { {Name: "bar", Value: ""}, } ctx := user.InjectOrgID(context.Background(), userID) - err := ing.append(ctx, lp, 1, 0, client.API) + _, err := ing.Push(ctx, WriteRequest(lp, 1, 0)) require.NoError(t, err) res, _, err := runTestQuery(ctx, t, ing, labels.MatchEqual, model.MetricNameLabel, "testmetric") diff --git a/pkg/ingester/query_test.go b/pkg/ingester/query_test.go index 9debe783b1..0d0f8cbbe3 100644 --- a/pkg/ingester/query_test.go +++ b/pkg/ingester/query_test.go @@ -30,7 +30,7 @@ func BenchmarkQueryStream(b *testing.B) { numCPUs = 32 ) - encoding.DefaultEncoding = encoding.Bigchunk + encoding.DefaultEncoding = encoding.BigChunk limits.MaxSeriesPerMetric = numSeries limits.MaxSeriesPerQuery = numSeries cfg.FlushCheckPeriod = 15 * time.Minute @@ -55,14 +55,14 @@ func BenchmarkQueryStream(b *testing.B) { {Name: "cpu", Value: cpus[i%numCPUs]}, } - state, fp, series, err := ing.userStates.getOrCreateSeries(ctx, labels) + state, err := ing.userStates.getOrCreate(ctx) + require.NoError(b, err) + + fp, series, err := state.getSeries(labels) require.NoError(b, err) for j := 0; j < numSamples; j++ { - err = series.add(model.SamplePair{ - Value: model.SampleValue(float64(i)), - Timestamp: model.Time(int64(i)), - }) + err = series.add(int64(i), float64(i)) require.NoError(b, err) } diff --git a/pkg/ingester/series.go b/pkg/ingester/series.go index 91039608a7..2e7c17bf68 100644 --- a/pkg/ingester/series.go +++ b/pkg/ingester/series.go @@ -2,14 +2,16 @@ package ingester import ( "fmt" - "sort" + "net/http" + + "github.com/cortexproject/cortex/pkg/util/validation" + "github.com/weaveworks/common/httpgrpc" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/cortexproject/cortex/pkg/chunk/encoding" - "github.com/cortexproject/cortex/pkg/prom1/storage/metric" ) var ( @@ -24,6 +26,7 @@ func init() { } type memorySeries struct { + userID string metric labels.Labels // Sorted by start time, overlapping chunk ranges are forbidden. @@ -40,19 +43,11 @@ type memorySeries struct { lastSampleValue model.SampleValue } -type memorySeriesError struct { - message string - errorType string -} - -func (error *memorySeriesError) Error() string { - return error.message -} - // newMemorySeries returns a pointer to a newly allocated memorySeries for the // given metric. -func newMemorySeries(m labels.Labels) *memorySeries { +func newMemorySeries(userID string, m labels.Labels) *memorySeries { return &memorySeries{ + userID: userID, metric: m, lastTime: model.Earliest, } @@ -62,7 +57,12 @@ func newMemorySeries(m labels.Labels) *memorySeries { // completed chunks (which are now eligible for persistence). // // The caller must have locked the fingerprint of the series. -func (s *memorySeries) add(v model.SamplePair) error { +func (s *memorySeries) add(ts int64, vs float64) error { + v := model.SamplePair{ + Timestamp: model.Time(ts), + Value: model.SampleValue(vs), + } + // Don't report "no-op appends", i.e. 
where timestamp and sample // value are the same as for the last append, as they are a // common occurrence when using client-side timestamps @@ -73,16 +73,12 @@ func (s *memorySeries) add(v model.SamplePair) error { return nil } if v.Timestamp == s.lastTime { - return &memorySeriesError{ - message: fmt.Sprintf("sample with repeated timestamp but different value for series %v; last value: %v, incoming value: %v", s.metric, s.lastSampleValue, v.Value), - errorType: "new-value-for-timestamp", - } + validation.DiscardedSamples.WithLabelValues("new-value-for-timestamp", s.userID).Inc() + return httpgrpc.Errorf(http.StatusBadRequest, "sample with repeated timestamp but different value for series %v; last value: %v, incoming value: %v", s.metric, s.lastSampleValue, v.Value) } if v.Timestamp < s.lastTime { - return &memorySeriesError{ - message: fmt.Sprintf("sample timestamp out of order for series %v; last timestamp: %v, incoming timestamp: %v", s.metric, s.lastTime, v.Timestamp), - errorType: "sample-out-of-order", - } + validation.DiscardedSamples.WithLabelValues("sample-out-of-order", s.userID).Inc() + return httpgrpc.Errorf(http.StatusBadRequest, "sample timestamp out of order for series %v; last timestamp: %v, incoming timestamp: %v", s.metric, s.lastTime, v.Timestamp) } if len(s.chunkDescs) == 0 || s.headChunkClosed { @@ -157,45 +153,6 @@ func (s *memorySeries) head() *desc { return s.chunkDescs[len(s.chunkDescs)-1] } -func (s *memorySeries) samplesForRange(from, through model.Time) ([]model.SamplePair, error) { - // Find first chunk with start time after "from". - fromIdx := sort.Search(len(s.chunkDescs), func(i int) bool { - return s.chunkDescs[i].FirstTime.After(from) - }) - // Find first chunk with start time after "through". - throughIdx := sort.Search(len(s.chunkDescs), func(i int) bool { - return s.chunkDescs[i].FirstTime.After(through) - }) - if fromIdx == len(s.chunkDescs) { - // Even the last chunk starts before "from". Find out if the - // series ends before "from" and we don't need to do anything. - lt := s.chunkDescs[len(s.chunkDescs)-1].LastTime - if lt.Before(from) { - return nil, nil - } - } - if fromIdx > 0 { - fromIdx-- - } - if throughIdx == len(s.chunkDescs) { - throughIdx-- - } - var values []model.SamplePair - in := metric.Interval{ - OldestInclusive: from, - NewestInclusive: through, - } - for idx := fromIdx; idx <= throughIdx; idx++ { - cd := s.chunkDescs[idx] - chValues, err := encoding.RangeValues(cd.C.NewIterator(), in) - if err != nil { - return nil, err - } - values = append(values, chValues...) 
- } - return values, nil -} - func (s *memorySeries) setChunks(descs []*desc) error { if len(s.chunkDescs) != 0 { return fmt.Errorf("series already has chunks") diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go index 8d52eac171..435a74ff63 100644 --- a/pkg/ingester/transfer.go +++ b/pkg/ingester/transfer.go @@ -82,27 +82,29 @@ func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) e fromIngesterID = wireSeries.FromIngesterId level.Info(util.Logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID) } - userCtx := user.InjectOrgID(stream.Context(), wireSeries.UserId) - descs, err := fromWireChunks(wireSeries.Chunks) - if err != nil { - return err - } + /* + userCtx := user.InjectOrgID(stream.Context(), wireSeries.UserId) + descs, err := fromWireChunks(wireSeries.Chunks) + if err != nil { + return err + } - state, fp, series, err := userStates.getOrCreateSeries(userCtx, wireSeries.Labels) - if err != nil { - return err - } - prevNumChunks := len(series.chunkDescs) + state, fp, series, err := userStates.getOrCreateSeries(userCtx, wireSeries.Labels) + if err != nil { + return err + } + prevNumChunks := len(series.chunkDescs) - err = series.setChunks(descs) - state.fpLocker.Unlock(fp) // acquired in getOrCreateSeries - if err != nil { - return err - } + err = series.setChunks(descs) + state.fpLocker.Unlock(fp) // acquired in getOrCreateSeries + if err != nil { + return err + } - seriesReceived++ - memoryChunks.Add(float64(len(series.chunkDescs) - prevNumChunks)) - receivedChunks.Add(float64(len(descs))) + seriesReceived++ + memoryChunks.Add(float64(len(series.chunkDescs) - prevNumChunks)) + receivedChunks.Add(float64(len(descs))) + */ } if seriesReceived == 0 { diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go index 3b41f65bd8..047d366c8a 100644 --- a/pkg/ingester/user_state.go +++ b/pkg/ingester/user_state.go @@ -2,17 +2,20 @@ package ingester import ( "context" - "fmt" "net/http" - "sync" + "unsafe" + + "github.com/prometheus/tsdb" "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" + tsdbLabels "github.com/prometheus/tsdb/labels" "github.com/segmentio/fasthash/fnv1a" + "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/ingester/index" "github.com/cortexproject/cortex/pkg/util" @@ -20,7 +23,12 @@ import ( "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/cortexproject/cortex/pkg/util/validation" "github.com/weaveworks/common/httpgrpc" - "github.com/weaveworks/common/user" +) + +// DiscardedSamples metric labels +const ( + perUserSeriesLimit = "per_user_series_limit" + perMetricSeriesLimit = "per_metric_series_limit" ) var ( @@ -28,10 +36,6 @@ var ( Name: "cortex_ingester_memory_series", Help: "The current number of series in memory.", }) - memUsers = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "cortex_ingester_memory_users", - Help: "The current number of users in memory.", - }) memSeriesCreatedTotal = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "cortex_ingester_memory_series_created_total", Help: "The total number of series that were created per user.", @@ -42,13 +46,8 @@ var ( }, []string{"user"}) ) -type userStates struct { - states sync.Map - limits *validation.Overrides - cfg Config -} - type userState struct { + 
cfg Config limits *validation.Overrides userID string fpLocker *fingerprintLocker @@ -62,115 +61,117 @@ type userState struct { memSeriesCreatedTotal prometheus.Counter memSeriesRemovedTotal prometheus.Counter -} -const metricCounterShards = 128 + tsdb *tsdb.DB +} -// DiscardedSamples metric labels -const ( - perUserSeriesLimit = "per_user_series_limit" - perMetricSeriesLimit = "per_metric_series_limit" -) +func newUserState(cfg Config, userID string, limits *validation.Overrides) *userState { + seriesInMetric := make([]metricCounterShard, 0, metricCounterShards) + for i := 0; i < metricCounterShards; i++ { + seriesInMetric = append(seriesInMetric, metricCounterShard{ + m: map[string]int{}, + }) + } -type metricCounterShard struct { - mtx sync.Mutex - m map[string]int + // Speculatively create a userState object and try to store it + // in the map. Another goroutine may have got there before + // us, in which case this userState will be discarded + state := &userState{ + cfg: cfg, + userID: userID, + limits: limits, + fpToSeries: newSeriesMap(), + fpLocker: newFingerprintLocker(16 * 1024), + index: index.New(), + ingestedAPISamples: newEWMARate(0.2, cfg.RateUpdatePeriod), + ingestedRuleSamples: newEWMARate(0.2, cfg.RateUpdatePeriod), + seriesInMetric: seriesInMetric, + + memSeriesCreatedTotal: memSeriesCreatedTotal.WithLabelValues(userID), + memSeriesRemovedTotal: memSeriesRemovedTotal.WithLabelValues(userID), + } + state.mapper = newFPMapper(state.fpToSeries) + return state } -func newUserStates(limits *validation.Overrides, cfg Config) *userStates { - return &userStates{ - limits: limits, - cfg: cfg, +func (u *userState) append(ctx context.Context, req *client.WriteRequest) error { + if u.cfg.UseTSDB { + return u.tsdbAppend(ctx, req) + } else { + return u.legacyAppend(ctx, req) } } -func (us *userStates) cp() map[string]*userState { - states := map[string]*userState{} - us.states.Range(func(key, value interface{}) bool { - states[key.(string)] = value.(*userState) - return true - }) - return states -} +func (u *userState) legacyAppend(ctx context.Context, req *client.WriteRequest) error { + var lastPartialErr error + for _, ts := range req.Timeseries { + for _, s := range ts.Samples { + err := u.legacyAppendInternal(ctx, ts.Labels, s.TimestampMs, s.Value, req.Source) + if err == nil { + continue + } + + ingestedSamplesFail.Inc() + if httpResp, ok := httpgrpc.HTTPResponseFromError(err); ok { + switch httpResp.Code { + case http.StatusBadRequest, http.StatusTooManyRequests: + lastPartialErr = err + continue + } + } -func (us *userStates) gc() { - us.states.Range(func(key, value interface{}) bool { - state := value.(*userState) - if state.fpToSeries.length() == 0 { - us.states.Delete(key) + return err } - return true - }) + } + return lastPartialErr } -func (us *userStates) updateRates() { - us.states.Range(func(key, value interface{}) bool { - state := value.(*userState) - state.ingestedAPISamples.tick() - state.ingestedRuleSamples.tick() - return true - }) -} +func (u *userState) legacyAppendInternal(ctx context.Context, labels labelPairs, timestamp int64, value float64, source client.WriteRequest_SourceEnum) error { + labels.removeBlanks() -func (us *userStates) get(userID string) (*userState, bool) { - state, ok := us.states.Load(userID) - if !ok { - return nil, ok + fp, series, err := u.getSeries(labels) + if err != nil { + return err } - return state.(*userState), ok -} + defer u.fpLocker.Unlock(fp) -func (us *userStates) getViaContext(ctx context.Context) (*userState, bool, error) { 
- userID, err := user.ExtractOrgID(ctx) - if err != nil { - return nil, false, fmt.Errorf("no user id") + prevNumChunks := len(series.chunkDescs) + if err := series.add(timestamp, value); err != nil { + return err } - state, ok := us.get(userID) - return state, ok, nil -} -func (us *userStates) getOrCreateSeries(ctx context.Context, labels []client.LabelAdapter) (*userState, model.Fingerprint, *memorySeries, error) { - userID, err := user.ExtractOrgID(ctx) - if err != nil { - return nil, 0, nil, fmt.Errorf("no user id") + memoryChunks.Add(float64(len(series.chunkDescs) - prevNumChunks)) + ingestedSamples.Inc() + switch source { + case client.RULE: + u.ingestedRuleSamples.inc() + case client.API: + fallthrough + default: + u.ingestedAPISamples.inc() } - state, ok := us.get(userID) - if !ok { + return err +} - seriesInMetric := make([]metricCounterShard, 0, metricCounterShards) - for i := 0; i < metricCounterShards; i++ { - seriesInMetric = append(seriesInMetric, metricCounterShard{ - m: map[string]int{}, - }) - } +func (u *userState) tsdbAppend(ctx context.Context, req *client.WriteRequest) error { + appender := u.tsdb.Appender() - // Speculatively create a userState object and try to store it - // in the map. Another goroutine may have got there before - // us, in which case this userState will be discarded - state = &userState{ - userID: userID, - limits: us.limits, - fpToSeries: newSeriesMap(), - fpLocker: newFingerprintLocker(16 * 1024), - index: index.New(), - ingestedAPISamples: newEWMARate(0.2, us.cfg.RateUpdatePeriod), - ingestedRuleSamples: newEWMARate(0.2, us.cfg.RateUpdatePeriod), - seriesInMetric: seriesInMetric, - - memSeriesCreatedTotal: memSeriesCreatedTotal.WithLabelValues(userID), - memSeriesRemovedTotal: memSeriesRemovedTotal.WithLabelValues(userID), - } - state.mapper = newFPMapper(state.fpToSeries) - stored, ok := us.states.LoadOrStore(userID, state) - if !ok { - memUsers.Inc() + for _, series := range req.Timeseries { + for _, sample := range series.Samples { + pairs := labelPairs(series.Labels) + pairs.removeBlanks() + labels := client.FromLabelAdaptersToLabels(series.Labels) + tsbdLabels := *(*tsdbLabels.Labels)(unsafe.Pointer(&labels)) + + _, err := appender.Add(tsbdLabels, sample.TimestampMs, sample.Value) + if err != nil { + return err + } } - state = stored.(*userState) } - fp, series, err := state.getSeries(labels) - return state, fp, series, err + return appender.Commit() } func (u *userState) getSeries(metric labelPairs) (model.Fingerprint, *memorySeries, error) { @@ -214,7 +215,7 @@ func (u *userState) getSeries(metric labelPairs) (model.Fingerprint, *memorySeri memSeries.Inc() labels := u.index.Add(metric, fp) - series = newMemorySeries(labels) + series = newMemorySeries(u.userID, labels) u.fpToSeries.put(fp, series) return fp, series, nil @@ -256,22 +257,28 @@ func (u *userState) removeSeries(fp model.Fingerprint, metric labels.Labels) { memSeries.Dec() } -// forSeriesMatching passes all series matching the given matchers to the -// provided callback. Deals with locking and the quirks of zero-length matcher -// values. There are 2 callbacks: -// - The `add` callback is called for each series while the lock is held, and -// is intend to be used by the caller to build a batch. -// - The `send` callback is called at certain intervals specified by batchSize -// with no locks held, and is intended to be used by the caller to send the -// built batches. 
-func (u *userState) forSeriesMatching(ctx context.Context, allMatchers []*labels.Matcher, - add func(context.Context, model.Fingerprint, *memorySeries) error, +func (u *userState) query( + ctx context.Context, req *client.QueryRequest, + add func(context.Context, labels.Labels, ...*desc) error, + send func(context.Context) error, batchSize int, +) error { + return u.legacyQuery(ctx, req, add, send, batchSize) +} + +func (u *userState) legacyQuery( + ctx context.Context, req *client.QueryRequest, + add func(context.Context, labels.Labels, ...*desc) error, send func(context.Context) error, batchSize int, ) error { - log, ctx := spanlogger.New(ctx, "forSeriesMatching") + log, ctx := spanlogger.New(ctx, "legacyQuery") defer log.Finish() - filters, matchers := util.SplitFiltersAndMatchers(allMatchers) + from, through, matchers, err := client.FromQueryRequest(req) + if err != nil { + return err + } + + filters, matchers := util.SplitFiltersAndMatchers(matchers) fps := u.index.Lookup(matchers) if len(fps) > u.limits.MaxSeriesPerQuery(u.userID) { return httpgrpc.Errorf(http.StatusRequestEntityTooLarge, "exceeded maximum number of series in a query") @@ -302,7 +309,14 @@ outer: } } - err := add(ctx, fp, series) + chunks := make([]*desc, 0, len(series.chunkDescs)) + for _, chunk := range series.chunkDescs { + if !(chunk.FirstTime.After(through) || chunk.LastTime.Before(from)) { + chunks = append(chunks, chunk) + } + } + + err := add(ctx, series.metric, chunks...) u.fpLocker.Unlock(fp) if err != nil { return err @@ -320,3 +334,98 @@ outer: } return nil } + +func (u *userState) tsdbQuery( + ctx context.Context, req *client.QueryRequest, + add func(context.Context, labels.Labels, ...*desc) error, + send func(context.Context) error, batchSize int, +) error { + log, ctx := spanlogger.New(ctx, "tsdbQuery") + defer log.Finish() + + from, through, matchers, err := client.FromQueryRequest(req) + if err != nil { + return err + } + + querier, err := u.tsdb.Querier(int64(from), int64(through)) + if err != nil { + return err + } + + seriesSet, err := querier.Select(convertMatchers(matchers)...) + if err != nil { + return err + } + + i := 0 + for ; seriesSet.Next(); i++ { + if err := ctx.Err(); err != nil { + return err + } + + series := seriesSet.At() + tsdbChunks := series.Chunks() + bigchunk := encoding.NewBigchunk() + bigchunk.AddSmallChunks(tsdbChunks) + tsdbLabels := series.Labels() + labels := *(*labels.Labels)(unsafe.Pointer(&tsdbLabels)) + desc := desc{ + C: bigchunk, + // FirstTime: . 
+ } + + if err := add(ctx, labels, &desc); err != nil { + return err + } + + if batchSize > 0 && (i+1)%batchSize == 0 && send != nil { + if err = send(ctx); err != nil { + return nil + } + } + } + + if batchSize > 0 && i%batchSize > 0 && send != nil { + return send(ctx) + } + + return seriesSet.Err() +} + +func (u *userState) labelNames() []string { + return u.index.LabelNames() +} + +func convertMatchers(oms []*labels.Matcher) []tsdbLabels.Matcher { + ms := make([]tsdbLabels.Matcher, 0, len(oms)) + for _, om := range oms { + ms = append(ms, convertMatcher(om)) + } + return ms +} + +func convertMatcher(m *labels.Matcher) tsdbLabels.Matcher { + switch m.Type { + case labels.MatchEqual: + return tsdbLabels.NewEqualMatcher(m.Name, m.Value) + + case labels.MatchNotEqual: + return tsdbLabels.Not(tsdbLabels.NewEqualMatcher(m.Name, m.Value)) + + case labels.MatchRegexp: + res, err := tsdbLabels.NewRegexpMatcher(m.Name, "^(?:"+m.Value+")$") + if err != nil { + panic(err) + } + return res + + case labels.MatchNotRegexp: + res, err := tsdbLabels.NewRegexpMatcher(m.Name, "^(?:"+m.Value+")$") + if err != nil { + panic(err) + } + return tsdbLabels.Not(res) + } + panic("storage.convertMatcher: invalid matcher type") +} diff --git a/pkg/ingester/user_states.go b/pkg/ingester/user_states.go new file mode 100644 index 0000000000..70e9e07559 --- /dev/null +++ b/pkg/ingester/user_states.go @@ -0,0 +1,102 @@ +package ingester + +import ( + "fmt" + "sync" + + "github.com/cortexproject/cortex/pkg/util/validation" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/weaveworks/common/user" + "golang.org/x/net/context" +) + +const metricCounterShards = 128 + +var ( + memUsers = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "cortex_ingester_memory_users", + Help: "The current number of users in memory.", + }) +) + +type userStates struct { + states sync.Map + limits *validation.Overrides + cfg Config +} + +type metricCounterShard struct { + mtx sync.Mutex + m map[string]int +} + +func newUserStates(limits *validation.Overrides, cfg Config) *userStates { + return &userStates{ + limits: limits, + cfg: cfg, + } +} + +func (us *userStates) cp() map[string]*userState { + states := map[string]*userState{} + us.states.Range(func(key, value interface{}) bool { + states[key.(string)] = value.(*userState) + return true + }) + return states +} + +func (us *userStates) gc() { + us.states.Range(func(key, value interface{}) bool { + state := value.(*userState) + if state.fpToSeries.length() == 0 { + us.states.Delete(key) + } + return true + }) +} + +func (us *userStates) updateRates() { + us.states.Range(func(key, value interface{}) bool { + state := value.(*userState) + state.ingestedAPISamples.tick() + state.ingestedRuleSamples.tick() + return true + }) +} + +func (us *userStates) get(userID string) (*userState, bool) { + state, ok := us.states.Load(userID) + if !ok { + return nil, ok + } + return state.(*userState), ok +} + +func (us *userStates) getViaContext(ctx context.Context) (*userState, bool, error) { + userID, err := user.ExtractOrgID(ctx) + if err != nil { + return nil, false, fmt.Errorf("no user id") + } + state, ok := us.get(userID) + return state, ok, nil +} + +func (us *userStates) getOrCreate(ctx context.Context) (*userState, error) { + userID, err := user.ExtractOrgID(ctx) + if err != nil { + return nil, fmt.Errorf("no user id") + } + state, ok := us.get(userID) + if !ok { + state := newUserState(us.cfg, userID, us.limits) + stored, ok 
:= us.states.LoadOrStore(userID, state) + if !ok { + memUsers.Inc() + } + state = stored.(*userState) + } + + return state, nil +} diff --git a/vendor/github.com/prometheus/tsdb/CHANGELOG.md b/vendor/github.com/prometheus/tsdb/CHANGELOG.md index 87b642019d..610899d72f 100644 --- a/vendor/github.com/prometheus/tsdb/CHANGELOG.md +++ b/vendor/github.com/prometheus/tsdb/CHANGELOG.md @@ -1,8 +1,15 @@ ## master / unreleased - - [BUGFIX] Calling `Close` more than once on a querier returns an error instead of a panic. - - [ENHANCEMENT] Add (again) FromData function to easily create a chunk from bytes. +## 0.8.0 + - [BUGFIX] Calling `Close` more than once on a querier returns an error instead of a panic. + - [BUGFIX] Don't panic and recover nicely when running out of disk space. + - [BUGFIX] Correctly handle empty labels. + - [BUGFIX] Don't crash on an unknown tombstone ref. + - [ENHANCEMENT] Re-add FromData function to create a chunk from bytes. It is used by Cortex and Thanos. + - [ENHANCEMENT] Simplify mergedPostings.Seek. + - [FEATURE] Added `currentSegment` metric for the current WAL segment it is being written to. + ## 0.7.1 - [ENHANCEMENT] Reduce memory usage in mergedPostings.Seek diff --git a/vendor/github.com/prometheus/tsdb/Makefile.common b/vendor/github.com/prometheus/tsdb/Makefile.common index 4f18ea5877..c7f9ea64ff 100644 --- a/vendor/github.com/prometheus/tsdb/Makefile.common +++ b/vendor/github.com/prometheus/tsdb/Makefile.common @@ -69,7 +69,7 @@ else GO_BUILD_PLATFORM ?= $(GOHOSTOS)-$(GOHOSTARCH) endif -PROMU_VERSION ?= 0.3.0 +PROMU_VERSION ?= 0.4.0 PROMU_URL := https://github.com/prometheus/promu/releases/download/v$(PROMU_VERSION)/promu-$(PROMU_VERSION).$(GO_BUILD_PLATFORM).tar.gz GOLANGCI_LINT := diff --git a/vendor/github.com/prometheus/tsdb/head.go b/vendor/github.com/prometheus/tsdb/head.go index cfb7cb89ed..92619a6401 100644 --- a/vendor/github.com/prometheus/tsdb/head.go +++ b/vendor/github.com/prometheus/tsdb/head.go @@ -421,6 +421,10 @@ func (h *Head) loadWAL(r *wal.Reader) error { if itv.Maxt < h.minValidTime { continue } + if m := h.series.getByID(s.ref); m == nil { + unknownRefs++ + continue + } allStones.addInterval(s.ref, itv) } } @@ -755,6 +759,9 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro return 0, ErrOutOfBounds } + // Ensure no empty labels have gotten through. + lset = lset.WithoutEmpty() + s, created := a.head.getOrCreate(lset.Hash(), lset) if created { a.series = append(a.series, RefSeries{ diff --git a/vendor/github.com/prometheus/tsdb/index/postings.go b/vendor/github.com/prometheus/tsdb/index/postings.go index cdad93953f..bb7b5837af 100644 --- a/vendor/github.com/prometheus/tsdb/index/postings.go +++ b/vendor/github.com/prometheus/tsdb/index/postings.go @@ -404,7 +404,6 @@ func (h *postingsHeap) Pop() interface{} { type mergedPostings struct { h postingsHeap initilized bool - heaped bool cur uint64 err error } @@ -434,12 +433,9 @@ func (it *mergedPostings) Next() bool { return false } - if !it.heaped { - heap.Init(&it.h) - it.heaped = true - } // The user must issue an initial Next. if !it.initilized { + heap.Init(&it.h) it.cur = it.h[0].At() it.initilized = true return true @@ -477,33 +473,24 @@ func (it *mergedPostings) Seek(id uint64) bool { return false } } - if it.cur >= id { - return true - } - // Heapifying when there is lots of Seeks is inefficient, - // mark to be re-heapified on the Next() call. 
- it.heaped = false - lowest := ^uint64(0) - n := 0 - for _, i := range it.h { - if i.Seek(id) { - it.h[n] = i - n++ - if i.At() < lowest { - lowest = i.At() + for it.cur < id { + cur := it.h[0] + if !cur.Seek(id) { + heap.Pop(&it.h) + if cur.Err() != nil { + it.err = cur.Err() + return false } - } else { - if i.Err() != nil { - it.err = i.Err() + if it.h.Len() == 0 { return false } + } else { + // Value of top of heap has changed, re-heapify. + heap.Fix(&it.h, 0) } + + it.cur = it.h[0].At() } - it.h = it.h[:n] - if len(it.h) == 0 { - return false - } - it.cur = lowest return true } diff --git a/vendor/github.com/prometheus/tsdb/labels/labels.go b/vendor/github.com/prometheus/tsdb/labels/labels.go index 35a230f57c..aab8e42be9 100644 --- a/vendor/github.com/prometheus/tsdb/labels/labels.go +++ b/vendor/github.com/prometheus/tsdb/labels/labels.go @@ -103,6 +103,23 @@ func (ls Labels) Map() map[string]string { return m } +// WithoutEmpty returns the labelset without empty labels. +// May return the same labelset. +func (ls Labels) WithoutEmpty() Labels { + for _, v := range ls { + if v.Value == "" { + els := make(Labels, 0, len(ls)-1) + for _, v := range ls { + if v.Value != "" { + els = append(els, v) + } + } + return els + } + } + return ls +} + // New returns a sorted Labels from the given labels. // The caller has to guarantee that all label names are unique. func New(ls ...Label) Labels { @@ -119,7 +136,9 @@ func New(ls ...Label) Labels { func FromMap(m map[string]string) Labels { l := make(Labels, 0, len(m)) for k, v := range m { - l = append(l, Label{Name: k, Value: v}) + if v != "" { + l = append(l, Label{Name: k, Value: v}) + } } sort.Sort(l) @@ -133,7 +152,9 @@ func FromStrings(ss ...string) Labels { } var res Labels for i := 0; i < len(ss); i += 2 { - res = append(res, Label{Name: ss[i], Value: ss[i+1]}) + if ss[i+1] != "" { + res = append(res, Label{Name: ss[i], Value: ss[i+1]}) + } } sort.Sort(res) diff --git a/vendor/github.com/prometheus/tsdb/querier.go b/vendor/github.com/prometheus/tsdb/querier.go index b7116c27e2..e7973f16a8 100644 --- a/vendor/github.com/prometheus/tsdb/querier.go +++ b/vendor/github.com/prometheus/tsdb/querier.go @@ -53,6 +53,9 @@ type Series interface { // Iterator returns a new iterator of the data of the series. Iterator() SeriesIterator + + // Chunks returns a copy of the compressed chunks that make up this series. + Chunks() []chunks.Meta } // querier aggregates querying results from time blocks within @@ -332,15 +335,6 @@ func PostingsForMatchers(ix IndexReader, ms ...labels.Matcher) (index.Postings, it := index.Intersect(its...) for _, n := range notIts { - if _, ok := n.(*index.ListPostings); !ok { - // Best to pre-calculate the merged lists via next rather than have a ton - // of seeks in Without. - pl, err := index.ExpandPostings(n) - if err != nil { - return nil, err - } - n = index.NewListPostings(pl) - } it = index.Without(it, n) } @@ -808,6 +802,10 @@ func (s *chunkSeries) Iterator() SeriesIterator { return newChunkSeriesIterator(s.chunks, s.intervals, s.mint, s.maxt) } +func (s *chunkSeries) Chunks() []chunks.Meta { + return s.chunks +} + // SeriesIterator iterates over the data of a time series. type SeriesIterator interface { // Seek advances the iterator forward to the given timestamp. @@ -836,6 +834,14 @@ func (s *chainedSeries) Iterator() SeriesIterator { return newChainedSeriesIterator(s.series...) 
} +func (s *chainedSeries) Chunks() []chunks.Meta { + var chunks []chunks.Meta + for _, s := range s.series { + chunks = append(chunks, s.Chunks()...) + } + return chunks +} + // chainedSeriesIterator implements a series iterater over a list // of time-sorted, non-overlapping iterators. type chainedSeriesIterator struct { @@ -907,6 +913,14 @@ func (s *verticalChainedSeries) Iterator() SeriesIterator { return newVerticalMergeSeriesIterator(s.series...) } +func (s *verticalChainedSeries) Chunks() []chunks.Meta { + var chunks []chunks.Meta + for _, s := range s.series { + chunks = append(chunks, s.Chunks()...) + } + return chunks +} + // verticalMergeSeriesIterator implements a series iterater over a list // of time-sorted, time-overlapping iterators. type verticalMergeSeriesIterator struct { diff --git a/vendor/github.com/prometheus/tsdb/wal/wal.go b/vendor/github.com/prometheus/tsdb/wal/wal.go index a51d43614e..46504f0d97 100644 --- a/vendor/github.com/prometheus/tsdb/wal/wal.go +++ b/vendor/github.com/prometheus/tsdb/wal/wal.go @@ -171,6 +171,7 @@ type WAL struct { pageCompletions prometheus.Counter truncateFail prometheus.Counter truncateTotal prometheus.Counter + currentSegment prometheus.Gauge } // New returns a new WAL over the given directory. @@ -218,8 +219,12 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi Name: "prometheus_tsdb_wal_truncations_total", Help: "Total number of WAL truncations attempted.", }) + w.currentSegment = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_wal_segment_current", + Help: "WAL segment index that TSDB is currently writing to.", + }) if reg != nil { - reg.MustRegister(w.fsyncDuration, w.pageFlushes, w.pageCompletions, w.truncateFail, w.truncateTotal) + reg.MustRegister(w.fsyncDuration, w.pageFlushes, w.pageCompletions, w.truncateFail, w.truncateTotal, w.currentSegment) } _, j, err := w.Segments() @@ -413,7 +418,7 @@ func (w *WAL) setSegment(segment *Segment) error { return err } w.donePages = int(stat.Size() / pageSize) - + w.currentSegment.Set(float64(segment.Index())) return nil } @@ -426,10 +431,10 @@ func (w *WAL) flushPage(clear bool) error { p := w.page clear = clear || p.full() - // No more data will fit into the page. Enqueue and clear it. + // No more data will fit into the page or an implicit clear. + // Enqueue and clear it. if clear { p.alloc = pageSize // Write till end of page. - w.pageCompletions.Inc() } n, err := w.segment.Write(p.buf[p.flushed:p.alloc]) if err != nil { @@ -445,6 +450,7 @@ func (w *WAL) flushPage(clear bool) error { p.alloc = 0 p.flushed = 0 w.donePages++ + w.pageCompletions.Inc() } return nil } @@ -495,10 +501,18 @@ func (w *WAL) Log(recs ...[]byte) error { return nil } -// log writes rec to the log and forces a flush of the current page if its -// the final record of a batch, the record is bigger than the page size or -// the current page is full. +// log writes rec to the log and forces a flush of the current page if: +// - the final record of a batch +// - the record is bigger than the page size +// - the current page is full. func (w *WAL) log(rec []byte, final bool) error { + // When the last page flush failed the page will remain full. + // When the page is full, need to flush it before trying to add more records to it. + if w.page.full() { + if err := w.flushPage(true); err != nil { + return err + } + } // If the record is too big to fit within the active page in the current // segment, terminate the active segment and advance to the next one.
// This ensures that records do not cross segment boundaries. diff --git a/vendor/modules.txt b/vendor/modules.txt index a51ee75171..6a1d58ce66 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -343,17 +343,17 @@ github.com/prometheus/prometheus/discovery/zookeeper github.com/prometheus/prometheus/discovery/refresh github.com/prometheus/prometheus/pkg/logging github.com/prometheus/prometheus/util/treecache -# github.com/prometheus/tsdb v0.7.2-0.20190506134726-2ae028114c89 +# github.com/prometheus/tsdb v0.8.0 github.com/prometheus/tsdb/fileutil github.com/prometheus/tsdb/chunkenc -github.com/prometheus/tsdb/labels github.com/prometheus/tsdb -github.com/prometheus/tsdb/wal +github.com/prometheus/tsdb/labels github.com/prometheus/tsdb/chunks github.com/prometheus/tsdb/encoding github.com/prometheus/tsdb/errors github.com/prometheus/tsdb/goversion github.com/prometheus/tsdb/index +github.com/prometheus/tsdb/wal # github.com/samuel/go-zookeeper v0.0.0-20161028232340-1d7be4effb13 github.com/samuel/go-zookeeper/zk # github.com/satori/go.uuid v1.2.0
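Note on the new write path (not part of the patch above): the UseTSDB branch in userState.append hands samples to a per-user prometheus/tsdb instance through its Appender. The sketch below is a minimal, standalone illustration of that tsdb v0.8.0 API — one appender per write request, Add for every sample, then Commit. The temporary directory, metric name, and use of tsdb.DefaultOptions are illustrative assumptions, not values taken from this change.

```go
package main

import (
	"fmt"
	"io/ioutil"
	"time"

	"github.com/prometheus/tsdb"
	tsdbLabels "github.com/prometheus/tsdb/labels"
)

func main() {
	// Illustrative scratch directory standing in for the per-user TSDB an
	// ingester would manage when the UseTSDB flag is set.
	dir, err := ioutil.TempDir("", "ingester-tsdb-example")
	if err != nil {
		panic(err)
	}

	// nil logger/registerer and DefaultOptions keep the example small; a real
	// ingester would wire in its own logger, registry and retention options.
	db, err := tsdb.Open(dir, nil, nil, tsdb.DefaultOptions)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Same shape as userState.tsdbAppend: one appender per write request,
	// Add() every sample, then Commit().
	app := db.Appender()
	lset := tsdbLabels.FromStrings("__name__", "testmetric", "job", "example")
	if _, err := app.Add(lset, time.Now().Unix()*1000, 42.0); err != nil {
		panic(err)
	}
	if err := app.Commit(); err != nil {
		panic(err)
	}
	fmt.Println("committed one sample to", dir)
}
```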