diff --git a/CHANGELOG.md b/CHANGELOG.md index 20c88bf4280..84d16a52e86 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * [BUGFIX] Fix "magic number" errors and other block mishandling when an ingester forcefully shuts down [#937](https://github.com/grafana/tempo/issues/937) (@mdisibio) * [BUGFIX] Fix compactor memory leak [#806](https://github.com/grafana/tempo/pull/806) (@mdisibio) * [ENHANCEMENT] Added "query blocks" cli option. [#876](https://github.com/grafana/tempo/pull/876) (@joe-elliott) +* [ENHANCEMENT] Added "search blocks" cli option. [#972](https://github.com/grafana/tempo/pull/972) (@joe-elliott) * [ENHANCEMENT] Added traceid to `trace too large message`. [#888](https://github.com/grafana/tempo/pull/888) (@mritunjaysharma394) * [ENHANCEMENT] Add support to tempo workloads to `overrides` from single configmap in microservice mode. [#896](https://github.com/grafana/tempo/pull/896) (@kavirajk) * [ENHANCEMENT] Make `overrides_config` block name consistent with Loki and Cortex in microservice mode. [#906](https://github.com/grafana/tempo/pull/906) (@kavirajk) diff --git a/cmd/tempo-cli/cmd-list-block.go b/cmd/tempo-cli/cmd-list-block.go index 36b3427fa09..bd943e71471 100644 --- a/cmd/tempo-cli/cmd-list-block.go +++ b/cmd/tempo-cli/cmd-list-block.go @@ -7,15 +7,32 @@ import ( "io" "os" "os/signal" + "sort" + "strconv" "syscall" "time" "github.com/dustin/go-humanize" "github.com/google/uuid" + "github.com/grafana/tempo/pkg/model" + "github.com/grafana/tempo/pkg/tempopb" + v1 "github.com/grafana/tempo/pkg/tempopb/common/v1" + "github.com/grafana/tempo/pkg/util" tempodb_backend "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding" + "github.com/grafana/tempo/tempodb/encoding/common" ) +type valueStats struct { + count int +} +type values struct { + all map[string]valueStats + key string + count int +} +type kvPairs map[string]values + type listBlockCmd struct { backendOptions @@ -84,14 +101,19 @@ func dumpBlock(r tempodb_backend.Reader, c tempodb_backend.Compactor, tenantID s dupe := 0 maxObjSize := 0 minObjSize := 0 + maxObjID := common.ID{} + minObjID := common.ID{} + allKVP := kvPairs{} printStats := func() { fmt.Println() fmt.Println("Scanning results:") fmt.Println("Objects scanned : ", i) fmt.Println("Duplicates : ", dupe) - fmt.Println("Smallest object : ", humanize.Bytes(uint64(minObjSize))) - fmt.Println("Largest object : ", humanize.Bytes(uint64(maxObjSize))) + fmt.Println("Smallest object : ", humanize.Bytes(uint64(minObjSize)), " : ", util.TraceIDToHexString(minObjID)) + fmt.Println("Largest object : ", humanize.Bytes(uint64(maxObjSize)), " : ", util.TraceIDToHexString(maxObjID)) + fmt.Println("") + printKVPairs(allKVP) } // Print stats on ctrl+c @@ -116,10 +138,12 @@ func dumpBlock(r tempodb_backend.Reader, c tempodb_backend.Compactor, tenantID s if len(obj) > maxObjSize { maxObjSize = len(obj) + maxObjID = objID } if len(obj) < minObjSize || minObjSize == 0 { minObjSize = len(obj) + minObjID = objID } if bytes.Equal(objID, prevID) { @@ -127,6 +151,19 @@ func dumpBlock(r tempodb_backend.Reader, c tempodb_backend.Compactor, tenantID s } copy(prevID, objID) + + trace, err := model.Unmarshal(obj, meta.DataEncoding) + if err != nil { + return err + } + kvp := extractKVPairs(trace) + for k, vs := range kvp { + addKey(allKVP, k, 1) + for v := range vs.all { + addVal(allKVP, k, v, 1) + } + } + i++ if i%100000 == 0 { fmt.Println("Record: ", i) @@ -138,3 +175,103 @@ func dumpBlock(r tempodb_backend.Reader, c tempodb_backend.Compactor, tenantID s return nil } + +// helper methods for calculating label stats +func printKVPairs(kvp kvPairs) { + allValues := make([]values, 0, len(kvp)) + for _, vs := range kvp { + allValues = append(allValues, vs) + } + sort.Slice(allValues, func(i, j int) bool { + return relativeValue(allValues[i]) > relativeValue(allValues[j]) + }) + for _, vs := range allValues { + fmt.Println("key:", vs.key, "count:", vs.count, "len:", len(vs.all), "value:", relativeValue(vs)) + for a, c := range vs.all { + fmt.Printf(" %s:\t%.2f\n", a, float64(c.count)/float64(vs.count)) + } + } +} + +// attempts to calculate the "value" that storing a given label would provide by. currently (number of times appeared)^2 / cardinality +// this is not researched and could definitely be improved +func relativeValue(v values) float64 { + return (float64(v.count) * float64(v.count)) / float64(len(v.all)) +} +func extractKVPairs(t *tempopb.Trace) kvPairs { + kvp := kvPairs{} + for _, b := range t.Batches { + spanCount := 0 + for _, ils := range b.InstrumentationLibrarySpans { + for _, s := range ils.Spans { + spanCount++ + for _, a := range s.Attributes { + val, yay := stringVal(a.Value) + if !yay { + continue + } + addKey(kvp, a.Key, 1) + addVal(kvp, a.Key, val, 1) + } + } + } + for _, a := range b.Resource.Attributes { + val, yay := stringVal(a.Value) + if !yay { + continue + } + addKey(kvp, a.Key, spanCount) + addVal(kvp, a.Key, val, spanCount) + } + } + return kvp +} +func addKey(kvp kvPairs, key string, count int) { + v, ok := kvp[key] + if !ok { + v = values{ + all: map[string]valueStats{}, + key: key, + } + } + v.count += count + kvp[key] = v +} +func addVal(kvp kvPairs, key string, val string, count int) { + v := kvp[key] + stats, ok := v.all[val] + if !ok { + stats = valueStats{ + count: 0, + } + } + stats.count += count + v.all[val] = stats + kvp[key] = v +} +func stringVal(v *v1.AnyValue) (string, bool) { + if sVal, ok := v.Value.(*v1.AnyValue_StringValue); ok { + return sVal.StringValue, true + } + if nVal, ok := v.Value.(*v1.AnyValue_IntValue); ok { + return strconv.FormatInt(nVal.IntValue, 10), true + } + if dVal, ok := v.Value.(*v1.AnyValue_DoubleValue); ok { + return fmt.Sprintf("%f", dVal.DoubleValue), true + // strconv.FormatFloat() + } + if bVal, ok := v.Value.(*v1.AnyValue_BoolValue); ok { + if bVal.BoolValue { + return "true", true + } + return "false", true + } + // todo? add support for these? + // if kVal, ok := v.Value.(*v1.AnyValue_KvlistValue); ok { + // return fmt.Sprintf("kvval %v", kVal.KvlistValue) // better way? + // } + // if aVal, ok := v.Value.(*v1.AnyValue_ArrayValue); ok { + // return fmt.Sprintf("arrayval %v", aVal.ArrayValue) // better way? + // } + return "", false +} diff --git a/cmd/tempo-cli/cmd-search.go b/cmd/tempo-cli/cmd-search.go new file mode 100644 index 00000000000..587737c7d88 --- /dev/null +++ b/cmd/tempo-cli/cmd-search.go @@ -0,0 +1,181 @@ +package main + +import ( + "context" + "fmt" + "io" + "time" + + "github.com/google/uuid" + "github.com/grafana/tempo/pkg/boundedwaitgroup" + "github.com/grafana/tempo/pkg/model" + "github.com/grafana/tempo/pkg/tempopb" + "github.com/grafana/tempo/pkg/util" + "github.com/grafana/tempo/tempodb/backend" + "github.com/grafana/tempo/tempodb/encoding" + "github.com/grafana/tempo/tempodb/encoding/common" +) + +const ( + layoutString = "2006-01-02T15:04:05" + chunkSize = 10 * 1024 * 1024 + iteratorBuffer = 10000 + limit = 20 +) + +type searchBlocksCmd struct { + backendOptions + + Name string `arg:"" help:"attribute name to search for"` + Value string `arg:"" help:"attribute value to search for"` + Start string `arg:"" help:"start of time range to search (YYYY-MM-DDThh:mm:ss)"` + End string `arg:"" help:"end of time range to search (YYYY-MM-DDThh:mm:ss)"` + TenantID string `arg:"" help:"tenant ID to search"` +} + +func (cmd *searchBlocksCmd) Run(opts *globalOptions) error { + r, _, _, err := loadBackend(&cmd.backendOptions, opts) + if err != nil { + return err + } + + startTime, err := time.Parse(layoutString, cmd.Start) + if err != nil { + return err + } + endTime, err := time.Parse(layoutString, cmd.End) + if err != nil { + return err + } + + ctx := context.Background() + + blockIDs, err := r.Blocks(ctx, cmd.TenantID) + if err != nil { + return err + } + + fmt.Println("Total Blocks:", len(blockIDs)) + + // Load in parallel + wg := boundedwaitgroup.New(20) + resultsCh := make(chan *backend.BlockMeta, len(blockIDs)) + for _, id := range blockIDs { + wg.Add(1) + + go func(id2 uuid.UUID) { + defer wg.Done() + + // search here + meta, err := r.BlockMeta(ctx, id2, cmd.TenantID) + if err == backend.ErrDoesNotExist { + return + } + if err != nil { + fmt.Println("Error querying block:", err) + return + } + if meta.StartTime.Unix() <= endTime.Unix() && + meta.EndTime.Unix() >= startTime.Unix() { + resultsCh <- meta + } + }(id) + } + + wg.Wait() + close(resultsCh) + + blockmetas := []*backend.BlockMeta{} + for q := range resultsCh { + blockmetas = append(blockmetas, q) + } + + fmt.Println("Blocks In Range:", len(blockmetas)) + foundids := []common.ID{} + for _, meta := range blockmetas { + block, err := encoding.NewBackendBlock(meta, r) + if err != nil { + return err + } + + // todo : graduated chunk sizes will increase throughput. i.e. first request should be small to feed the below parsing faster + // later queries should use larger chunk sizes to be more efficient + iter, err := block.Iterator(chunkSize) + if err != nil { + return err + } + + prefetchIter := encoding.NewPrefetchIterator(ctx, iter, iteratorBuffer) + ids, err := searchIterator(prefetchIter, meta.DataEncoding, cmd.Name, cmd.Value, limit) + prefetchIter.Close() + if err != nil { + return err + } + + foundids = append(foundids, ids...) + if len(foundids) >= limit { + break + } + } + + fmt.Println("Matching Traces:", len(foundids)) + for _, id := range foundids { + fmt.Println(" ", util.TraceIDToHexString(id)) + } + + return nil +} + +func searchIterator(iter encoding.Iterator, dataEncoding string, name string, value string, limit int) ([]common.ID, error) { + ctx := context.Background() + found := []common.ID{} + + for { + id, obj, err := iter.Next(ctx) + if err == io.EOF { + break + } + if err != nil { + return nil, err + } + + // todo : parrallelize unmarshal and search + trace, err := model.Unmarshal(obj, dataEncoding) + if err != nil { + return nil, err + } + + if traceContainsKeyValue(trace, name, value) { + found = append(found, id) + } + + if len(found) >= limit { + break + } + } + + return found, nil +} + +func traceContainsKeyValue(trace *tempopb.Trace, name string, value string) bool { + // todo : support other attribute types besides string + for _, b := range trace.Batches { + for _, a := range b.Resource.Attributes { + if a.Key == name && a.Value.GetStringValue() == value { + return true + } + } + + for _, ils := range b.InstrumentationLibrarySpans { + for _, s := range ils.Spans { + for _, a := range s.Attributes { + if a.Key == name && a.Value.GetStringValue() == value { + return true + } + } + } + } + } + + return false +} diff --git a/cmd/tempo-cli/main.go b/cmd/tempo-cli/main.go index cdd9db91a14..d3068a7f8de 100644 --- a/cmd/tempo-cli/main.go +++ b/cmd/tempo-cli/main.go @@ -59,6 +59,10 @@ var cli struct { API queryCmd `cmd:"" help:"query tempo http api"` Blocks queryBlocksCmd `cmd:"" help:"query for a traceid directly from backend blocks"` } `cmd:""` + + Search struct { + Blocks searchBlocksCmd `cmd:"" help:"search for a traceid directly from backend blocks"` + } `cmd:""` } func main() { diff --git a/docs/tempo/website/operations/tempo_cli.md b/docs/tempo/website/operations/tempo_cli.md index 2a1d2871fab..1549e13dac1 100644 --- a/docs/tempo/website/operations/tempo_cli.md +++ b/docs/tempo/website/operations/tempo_cli.md @@ -235,3 +235,25 @@ tempo-cli gen index --backend=local --bucket=./cmd/tempo-cli/test-data/ single-t ``` The index will be generated at the required location under the block folder. + +## Search Blocks Command +Search blocks in a given time range for a specific key/value pair. +```bash +tempo-cli search blocks +``` + **Note:** can be intense as it downloads all relevant blocks and iterates through them. + +Arguments: +- `name` Name of the attribute to search for e.g. `http.post` +- `value` Value of the attribute to search for e.g. `GET` +- `start` Start of the time range to search: (YYYY-MM-DDThh:mm:ss) +- `end` End of the time range to search: (YYYY-MM-DDThh:mm:ss) +- `tenant-id` Tenant to search. + +Options: +See backend options above. + +**Example:** +```bash +tempo-cli search blocks http.post GET 2021-09-21T00:00:00 2021-09-21T00:05:00 single-tenant --backend=gcs --bucket=tempo-trace-data +``` \ No newline at end of file diff --git a/tempodb/encoding/iterator_multiblock_test.go b/tempodb/encoding/iterator_multiblock_test.go index 141d39b6f7c..09ec260a309 100644 --- a/tempodb/encoding/iterator_multiblock_test.go +++ b/tempodb/encoding/iterator_multiblock_test.go @@ -8,6 +8,7 @@ import ( "github.com/grafana/tempo/tempodb/encoding/common" "github.com/stretchr/testify/require" + "go.uber.org/atomic" ) var _ Iterator = (*testIterator)(nil) @@ -17,7 +18,7 @@ type testIterator struct { ids []common.ID data [][]byte errors []error - i int + i atomic.Int32 } func (i *testIterator) Add(id common.ID, data []byte, err error) { @@ -27,14 +28,16 @@ func (i *testIterator) Add(id common.ID, data []byte, err error) { } func (i *testIterator) Next(context.Context) (common.ID, []byte, error) { - if i.i == len(i.ids) { + idx := int(i.i.Load()) + + if idx == len(i.ids) { return nil, nil, io.EOF } - id := i.ids[i.i] - data := i.data[i.i] - err := i.errors[i.i] - i.i++ + id := i.ids[idx] + data := i.data[idx] + err := i.errors[idx] + i.i.Inc() return id, data, err } diff --git a/tempodb/encoding/iterator_prefetch.go b/tempodb/encoding/iterator_prefetch.go new file mode 100644 index 00000000000..03399540b01 --- /dev/null +++ b/tempodb/encoding/iterator_prefetch.go @@ -0,0 +1,102 @@ +package encoding + +import ( + "context" + "io" + + "github.com/grafana/tempo/tempodb/encoding/common" + + "github.com/uber-go/atomic" +) + +type prefetchIterator struct { + iter Iterator + resultsCh chan iteratorResult + quitCh chan struct{} + err atomic.Error +} + +var _ Iterator = (*prefetchIterator)(nil) + +// NewPrefetchIterator Creates a new multiblock iterator. Iterates concurrently in a separate goroutine and results are buffered. +func NewPrefetchIterator(ctx context.Context, iter Iterator, bufferSize int) Iterator { + i := prefetchIterator{ + iter: iter, + resultsCh: make(chan iteratorResult, bufferSize), + quitCh: make(chan struct{}, 1), + } + + go i.iterate(ctx) + + return &i +} + +// Close iterator, signals goroutine to exit if still running. +func (i *prefetchIterator) Close() { + select { + // Signal goroutine to quit. Non-blocking, handles if already + // signalled or goroutine not listening to channel. + case i.quitCh <- struct{}{}: + default: + return + } +} + +// Next returns the next values or error. Blocking read when data not yet available. +func (i *prefetchIterator) Next(ctx context.Context) (common.ID, []byte, error) { + if err := i.err.Load(); err != nil { + return nil, nil, err + } + + select { + case <-ctx.Done(): + return nil, nil, ctx.Err() + + case res, ok := <-i.resultsCh: + if !ok { + // Closed due to error? + if err := i.err.Load(); err != nil { + return nil, nil, err + } + return nil, nil, io.EOF + } + + return res.id, res.object, nil + } +} + +func (i *prefetchIterator) iterate(ctx context.Context) { + defer close(i.resultsCh) + defer i.iter.Close() + + for { + id, obj, err := i.iter.Next(ctx) + if err == io.EOF { + return + } + if err != nil { + i.err.Store(err) + } + + // Copy slices allows data to escape the iterators + res := iteratorResult{ + id: id, + object: obj, + } + + select { + + case <-ctx.Done(): + i.err.Store(ctx.Err()) + return + + case <-i.quitCh: + // Signalled to quit early + return + + case i.resultsCh <- res: + // Send results. Blocks until available buffer in channel + // created by receiving in Next() + } + } +} diff --git a/tempodb/encoding/iterator_prefetch_test.go b/tempodb/encoding/iterator_prefetch_test.go new file mode 100644 index 00000000000..6f04e499d9a --- /dev/null +++ b/tempodb/encoding/iterator_prefetch_test.go @@ -0,0 +1,102 @@ +package encoding + +import ( + "context" + "errors" + "io" + "testing" + "time" + + "github.com/grafana/tempo/tempodb/encoding/common" + "github.com/stretchr/testify/assert" +) + +func TestPrefetchIterates(t *testing.T) { + tests := []struct { + ids []common.ID + objs [][]byte + err error + }{ + { + ids: []common.ID{ + {0x01}, + {0x02}, + {0x03}, + }, + objs: [][]byte{ + {0x05}, + {0x06}, + {0x06}, + }, + err: nil, + }, + { + ids: []common.ID{ + {0x01}, + {0x02}, + {0x03}, + }, + objs: [][]byte{ + {0x05}, + {0x06}, + {0x06}, + }, + err: errors.New("wups"), + }, + } + + ctx := context.Background() + for _, tc := range tests { + iter := &testIterator{} + for i := 0; i < len(tc.ids); i++ { + iter.Add(tc.ids[i], tc.objs[i], tc.err) + } + prefetchIter := NewPrefetchIterator(ctx, iter, 10) + + count := 0 + for { + id, obj, err := prefetchIter.Next(context.TODO()) + if err == io.EOF { + break + } + if err != nil { + assert.Equal(t, tc.err, err) + break + } + assert.Equal(t, tc.ids[count], id) + assert.Equal(t, tc.objs[count], obj) + count++ + } + + if tc.err == nil { + assert.Equal(t, len(tc.ids), count) + } + } +} + +func TestPrefetchPrefetches(t *testing.T) { + ctx := context.Background() + + iter := &testIterator{} + iter.Add([]byte{0x01}, []byte{0x01}, nil) + iter.Add([]byte{0x01}, []byte{0x01}, nil) + iter.Add([]byte{0x01}, []byte{0x01}, nil) + + _ = NewPrefetchIterator(ctx, iter, 10) + time.Sleep(100 * time.Millisecond) + assert.Equal(t, int32(3), iter.i.Load()) // prefetch all 3 + + iter = &testIterator{} + iter.Add([]byte{0x01}, []byte{0x01}, nil) + iter.Add([]byte{0x01}, []byte{0x01}, nil) + iter.Add([]byte{0x01}, []byte{0x01}, nil) + iter.Add([]byte{0x01}, []byte{0x01}, nil) + iter.Add([]byte{0x01}, []byte{0x01}, nil) + + prefetchIter := NewPrefetchIterator(ctx, iter, 3) + time.Sleep(100 * time.Millisecond) + assert.Equal(t, int32(4), iter.i.Load()) // prefetch only the buffer. this happens to be 1 more than the passed buffer. maybe one day we will "correct" that + _, _, _ = prefetchIter.Next(ctx) + time.Sleep(100 * time.Millisecond) + assert.Equal(t, int32(5), iter.i.Load()) // get all +}