Skip to content

Commit

Permalink
Drop Vulture Loki dependency (#509)
Browse files Browse the repository at this point in the history
* Remove Loki deps from Vulture

Signed-off-by: Daniel González Lopes <[email protected]>

* Remove unneeded interation code

Signed-off-by: Daniel González Lopes <[email protected]>

* Change docs

Signed-off-by: Daniel González Lopes <[email protected]>

* Add two endpoints and refactor configs

Signed-off-by: Daniel González Lopes <[email protected]>

* Fix deps

Signed-off-by: Daniel González Lopes <[email protected]>

* Add missing error checks

Signed-off-by: Daniel González Lopes <[email protected]>

* Use time-based approach for seed generation

Signed-off-by: Daniel González Lopes <[email protected]>

* Clean

Signed-off-by: Daniel González Lopes <[email protected]>

* Bump default backoff duration

Signed-off-by: Daniel González Lopes <[email protected]>

* Create beefy traces

Signed-off-by: Daniel González Lopes <[email protected]>

* Randomize strings

Signed-off-by: Daniel González Lopes <[email protected]>

* Add change to changelog

Signed-off-by: Daniel González Lopes <[email protected]>

* Fix vendor

Signed-off-by: Daniel González Lopes <[email protected]>

* Address review comments

Signed-off-by: Daniel González Lopes <[email protected]>

* Move out of loop

Signed-off-by: Daniel González Lopes <[email protected]>

* From slot to interval + split tickers

Signed-off-by: Daniel González Lopes <[email protected]>

* Fix deps

Signed-off-by: Daniel González Lopes <[email protected]>

* Fix deps2

Signed-off-by: Daniel González Lopes <[email protected]>

* Fix things?

Signed-off-by: Daniel González Lopes <[email protected]>

* Fix things

Signed-off-by: Daniel González Lopes <[email protected]>

* Fix things

Signed-off-by: Daniel González Lopes <[email protected]>

* Remove explicit

Signed-off-by: Daniel González Lopes <[email protected]>

* Fix deps

Signed-off-by: Daniel González Lopes <[email protected]>
  • Loading branch information
dgzlopes authored Mar 5, 2021
1 parent 1cbfc21 commit c8cad66
Show file tree
Hide file tree
Showing 14 changed files with 185 additions and 6,483 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* [CHANGE/BUGFIX] Rename `tempodb_compaction_objects_written` and `tempodb_compaction_bytes_written` metrics to `tempodb_compaction_objects_written_total` and `tempodb_compaction_bytes_written_total`. [#524](https://github.com/grafana/tempo/pull/524)
* [CHANGE] Replace tempo-cli `list block` `--check-dupes` option with `--scan` and collect additional stats [#534](https://github.com/grafana/tempo/pull/534)
* [FEATURE] Added block compression. This is a **breaking change** b/c some configuration fields moved. [#504](https://github.com/grafana/tempo/pull/504)
* [CHANGE] Drop Vulture Loki dependency. This is a **breaking change**. [#509](https://github.com/grafana/tempo/pull/509)
* [ENHANCEMENT] Serve config at the "/config" endpoint. [#446](https://github.com/grafana/tempo/pull/446)
* [ENHANCEMENT] Switch blocklist polling and retention to different concurrency mechanism, add configuration options. [#475](https://github.com/grafana/tempo/issues/475)
* [ENHANCEMENT] Add S3 options region and forcepathstyle [#431](https://github.com/grafana/tempo/issues/431)
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ Check out the [Integration Guides](https://grafana.com/docs/tempo/latest/guides/
tempo-query is jaeger-query with a [hashicorp go-plugin](https://github.com/jaegertracing/jaeger/tree/master/plugin/storage/grpc) to support querying Tempo. Please note that tempo only looks up a trace by ID. Searching for traces is not supported, and the service and operation lists will not populate.

### tempo-vulture
tempo-vulture is tempo's bird themed consistency checking tool. It queries Loki, extracts trace ids and then queries tempo. It metrics 404s and traces with missing spans.
tempo-vulture is tempo's bird themed consistency checking tool. It pushes traces to Tempo and then queries them back, recording metrics for 404s and for traces with missing spans.

### tempo-cli
tempo-cli is the place to put any utility functionality related to tempo. See [Documentation](https://grafana.com/docs/tempo/latest/cli/) for more info.
Expand Down
306 changes: 171 additions & 135 deletions cmd/tempo-vulture/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,36 @@ package main

import (
"bytes"
"encoding/json"
"context"
"flag"
"fmt"
"io/ioutil"
"log"
"math/rand"
"net"
"net/http"
"net/url"
"regexp"
"strconv"
"strings"
"time"

"github.com/golang/glog"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util"
jaeger_grpc "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/grpc"
thrift "github.com/jaegertracing/jaeger/thrift-gen/jaeger"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
"google.golang.org/grpc"
)

var (
prometheusListenAddress string
prometheusPath string

lokiBaseURL string
lokiQuery string
lokiUser string
lokiPass string

tempoBaseURL string
tempoOrgID string
tempoBackoffDuration time.Duration
tempoQueryURL string
tempoPushURL string
tempoOrgID string
tempoWriteBackoffDuration time.Duration
tempoReadBackoffDuration time.Duration
tempoRetentionDuration time.Duration
)

type traceMetrics struct {
Expand All @@ -45,95 +44,194 @@ func init() {
flag.StringVar(&prometheusPath, "prometheus-path", "/metrics", "The path to publish Prometheus metrics to.")
flag.StringVar(&prometheusListenAddress, "prometheus-listen-address", ":80", "The address to listen on for Prometheus scrapes.")

flag.StringVar(&lokiBaseURL, "loki-base-url", "", "The base URL (scheme://hostname) at which to find loki.")
flag.StringVar(&lokiQuery, "loki-query", "", "The query to use to find traceIDs in Loki.")
flag.StringVar(&lokiUser, "loki-user", "", "The user to use for Loki basic auth.")
flag.StringVar(&lokiPass, "loki-pass", "", "The password to use for Loki basic auth.")

flag.StringVar(&tempoBaseURL, "tempo-base-url", "", "The base URL (scheme://hostname) at which to find tempo.")
flag.StringVar(&tempoQueryURL, "tempo-query-url", "", "The URL (scheme://hostname) at which to query Tempo.")
flag.StringVar(&tempoPushURL, "tempo-push-url", "", "The URL (scheme://hostname) at which to push traces to Tempo.")
flag.StringVar(&tempoOrgID, "tempo-org-id", "", "The orgID to query in Tempo")
flag.DurationVar(&tempoBackoffDuration, "tempo-backoff-duration", time.Second, "The amount of time to pause between tempo calls")
flag.DurationVar(&tempoWriteBackoffDuration, "tempo-write-backoff-duration", 15*time.Second, "The amount of time to pause between write Tempo calls")
flag.DurationVar(&tempoReadBackoffDuration, "tempo-read-backoff-duration", 30*time.Second, "The amount of time to pause between read Tempo calls")
flag.DurationVar(&tempoRetentionDuration, "tempo-retention-duration", 336*time.Hour, "The block retention that Tempo is using")
}

func main() {
flag.Parse()

glog.Error("Application Starting")
glog.Error("Tempo Vulture Starting")

testDurations := []time.Duration{
24 * time.Hour,
12 * time.Hour,
6 * time.Hour,
3 * time.Hour,
time.Hour,
30 * time.Minute,
}
startTime := time.Now().Unix()
tickerWrite := time.NewTicker(tempoWriteBackoffDuration)
tickerRead := time.NewTicker(tempoReadBackoffDuration)
interval := int64(tempoWriteBackoffDuration / time.Second)

ticker := time.NewTicker(15 * time.Second)
// Write
go func() {
for {
<-ticker.C

for _, duration := range testDurations {
<-tickerWrite.C

rand.Seed((time.Now().Unix() / interval) * interval)
c, err := newJaegerGRPCClient(tempoPushURL)
if err != nil {
glog.Error("error creating grpc client", err)
metricErrorTotal.Inc()
continue
}

// query loki for trace ids
lines, err := queryLoki(lokiBaseURL, lokiQuery, duration, lokiUser, lokiPass)
traceIDHigh := rand.Int63()
traceIDLow := rand.Int63()
for i := int64(0); i < generateRandomInt(1, 100); i++ {
err = c.EmitBatch(context.Background(), makeThriftBatch(traceIDHigh, traceIDLow))
if err != nil {
glog.Error("error querying Loki ", err)
glog.Error("error pushing batch to Tempo ", err)
metricErrorTotal.Inc()
continue
}
ids := extractTraceIDs(lines)
}
}
}()

// query tempo for trace ids
metrics, err := queryTempoAndAnalyze(tempoBaseURL, tempoBackoffDuration, ids)
if err != nil {
glog.Error("error querying Tempo ", err)
metricErrorTotal.Inc()
continue
}
// Read
go func() {
for {
<-tickerRead.C

currentTime := time.Now().Unix()

metricTracesInspected.WithLabelValues(strconv.Itoa(int(duration.Seconds()))).Add(float64(metrics.requested))
metricTracesErrors.WithLabelValues("notfound", strconv.Itoa(int(duration.Seconds()))).Add(float64(metrics.notfound))
metricTracesErrors.WithLabelValues("missingspans", strconv.Itoa(int(duration.Seconds()))).Add(float64(metrics.missingSpans))
// don't query traces before retention
if (currentTime - startTime) > int64(tempoRetentionDuration/time.Second) {
startTime = currentTime - int64(tempoRetentionDuration/time.Second)
}

// pick past interval and re-generate trace
rand.Seed((generateRandomInt(startTime, currentTime) / interval) * interval)
hexID := fmt.Sprintf("%016x%016x", rand.Int63(), rand.Int63())

// query the trace
metrics, err := queryTempoAndAnalyze(tempoQueryURL, hexID)
if err != nil {
glog.Error("error querying Tempo ", err)
metricErrorTotal.Inc()
metricTracesErrors.WithLabelValues("failed").Inc()
continue
}

metricTracesInspected.Add(float64(metrics.requested))
metricTracesErrors.WithLabelValues("notfound").Add(float64(metrics.notfound))
metricTracesErrors.WithLabelValues("missingspans").Add(float64(metrics.missingSpans))
}
}()

http.Handle(prometheusPath, promhttp.Handler())
log.Fatal(http.ListenAndServe(prometheusListenAddress, nil))
}

func queryTempoAndAnalyze(baseURL string, backoff time.Duration, traceIDs []string) (*traceMetrics, error) {
tm := &traceMetrics{
requested: len(traceIDs),
// newJaegerGRPCClient builds a Jaeger gRPC reporter that pushes batches to the
// host named in endpoint.
//
// Only the host part of endpoint is used: the scheme and any port are dropped
// and the connection is always dialed on port 14250 without transport
// security. A development zap logger is created for the reporter.
//
// It returns an error if endpoint cannot be parsed, if it contains no host, or
// if the gRPC connection or the logger cannot be created.
func newJaegerGRPCClient(endpoint string) (*jaeger_grpc.Reporter, error) {
	u, err := url.Parse(endpoint)
	if err != nil {
		return nil, err
	}

	// Hostname drops the port when one is present and, unlike
	// net.SplitHostPort, does not fail for a plain "scheme://hostname" URL.
	host := u.Hostname()
	if host == "" {
		return nil, fmt.Errorf("no host found in endpoint %q", endpoint)
	}

	// JoinHostPort adds the brackets an IPv6 literal needs.
	conn, err := grpc.Dial(net.JoinHostPort(host, "14250"), grpc.WithInsecure())
	if err != nil {
		return nil, err
	}

	logger, err := zap.NewDevelopment()
	if err != nil {
		// The reporter is never built, so nothing else would close conn.
		_ = conn.Close()
		return nil, err
	}

	return jaeger_grpc.NewReporter(conn, nil, logger), nil
}

for _, id := range traceIDs {
time.Sleep(backoff)
func generateRandomString() string {
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

glog.Error("tempo url ", baseURL+"/api/traces/"+id)
trace, err := util.QueryTrace(baseURL, id, tempoOrgID)
if err == util.ErrTraceNotFound {
glog.Error("trace not found ", id)
tm.notfound++
continue
}
if err != nil {
return nil, err
}
s := make([]rune, generateRandomInt(5, 20))
for i := range s {
s[i] = letters[rand.Intn(len(letters))]
}
return string(s)
}

if len(trace.Batches) == 0 {
glog.Error("trace not found", id)
tm.notfound++
continue
}
// generateRandomTags returns a list of thrift tags whose keys and string
// values are produced by generateRandomString. The number of tags is chosen
// with generateRandomInt(1, 5).
func generateRandomTags() []*thrift.Tag {
	var result []*thrift.Tag
	for remaining := generateRandomInt(1, 5); remaining > 0; remaining-- {
		// The value is generated before the key; the order is kept so the
		// sequence of random draws stays the same.
		val := generateRandomString()
		tag := &thrift.Tag{
			Key:  generateRandomString(),
			VStr: &val,
		}
		result = append(result, tag)
	}
	return result
}

// iterate through
if hasMissingSpans(trace) {
glog.Error("has missing spans", id)
tm.missingSpans++
}
// generateRandomLogs returns a list of thrift log entries. Each entry is
// stamped with the current Unix time in seconds and carries a fresh set of
// random tags as its fields. The number of entries is chosen with
// generateRandomInt(1, 5).
func generateRandomLogs() []*thrift.Log {
	var entries []*thrift.Log
	for remaining := generateRandomInt(1, 5); remaining > 0; remaining-- {
		entry := &thrift.Log{
			Timestamp: time.Now().Unix(),
			Fields:    generateRandomTags(),
		}
		entries = append(entries, entry)
	}
	return entries
}

// makeThriftBatch builds a thrift batch of randomly generated spans that all
// carry the given trace ID halves. The number of spans is chosen with
// generateRandomInt(1, 5).
//
// Parent span ID, references and flags are left at their zero values. The
// remaining fields are filled in a fixed order so the sequence of random
// draws does not change.
func makeThriftBatch(TraceIDHigh int64, TraceIDLow int64) *thrift.Batch {
	var batchSpans []*thrift.Span
	for remaining := generateRandomInt(1, 5); remaining > 0; remaining-- {
		span := &thrift.Span{
			TraceIdLow:  TraceIDLow,
			TraceIdHigh: TraceIDHigh,
		}
		span.SpanId = rand.Int63()
		span.OperationName = generateRandomString()
		span.StartTime = time.Now().Unix()
		span.Duration = rand.Int63()
		span.Tags = generateRandomTags()
		span.Logs = generateRandomLogs()

		batchSpans = append(batchSpans, span)
	}
	return &thrift.Batch{Spans: batchSpans}
}

// generateRandomInt returns a pseudo-random integer strictly between min and
// max (both bounds excluded), drawn from the shared math/rand source.
//
// A draw equal to min is rejected and retried, so for valid ranges the
// sequence of draws taken from the source is the same as before.
//
// When the open interval (min, max) holds no integer, i.e. max-min <= 1, min
// is returned. Without this guard rand.Int63n would panic for max <= min, and
// the retry would never terminate for max == min+1.
func generateRandomInt(min int64, max int64) int64 {
	if max-min <= 1 {
		return min
	}
	for {
		if number := min + rand.Int63n(max-min); number != min {
			return number
		}
	}
}

func queryTempoAndAnalyze(baseURL string, traceID string) (*traceMetrics, error) {
tm := &traceMetrics{
requested: 1,
}
glog.Error("tempo url ", baseURL+"/api/traces/"+traceID)
trace, err := util.QueryTrace(baseURL, traceID, tempoOrgID)
if err == util.ErrTraceNotFound {
glog.Error("trace not found ", traceID)
tm.notfound++
}
if err != nil {
return nil, err
}

if len(trace.Batches) == 0 {
glog.Error("trace not found", traceID)
tm.notfound++
}

// iterate through
if hasMissingSpans(trace) {
glog.Error("has missing spans", traceID)
tm.missingSpans++
}

return tm, nil
Expand Down Expand Up @@ -175,65 +273,3 @@ func hasMissingSpans(t *tempopb.Trace) bool {

return false
}

// queryLoki runs query against the Loki instance at baseURL and returns the
// log lines of every stream in the response.
//
// The queried window starts durationAgo plus 30 seconds in the past and spans
// 30 minutes; at most 10 entries are requested. user and pass are sent as
// HTTP basic auth credentials.
//
// An error is returned when the request cannot be built or sent, when the
// server answers with a non-2xx status, or when the response body cannot be
// decoded.
func queryLoki(baseURL string, query string, durationAgo time.Duration, user string, pass string) ([]string, error) {
	// offsetting 30 seconds prevents it from querying logs from now which naturally have a high percentage of errors
	start := time.Now().Add(-durationAgo).Add(-30 * time.Second)
	end := start.Add(30 * time.Minute)
	// Named reqURL so the net/url package is not shadowed below.
	reqURL := baseURL + fmt.Sprintf("/api/prom/query?limit=10&start=%d&end=%d&query=%s", start.UnixNano(), end.UnixNano(), url.QueryEscape(query))

	glog.Error("loki url ", reqURL)

	req, err := http.NewRequest("GET", reqURL, nil)
	if err != nil {
		return nil, fmt.Errorf("error building request %w", err)
	}
	req.SetBasicAuth(user, pass)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("error querying %w", err)
	}
	defer func() {
		if err := resp.Body.Close(); err != nil {
			glog.Error("error closing body ", err)
		}
	}()

	if resp.StatusCode/100 != 2 {
		// err is nil on this path, so report the status code and the body
		// instead of formatting a nil error.
		buf, readErr := ioutil.ReadAll(resp.Body)
		if readErr != nil {
			return nil, fmt.Errorf("error response from server: status %d, reading body: %w", resp.StatusCode, readErr)
		}
		return nil, fmt.Errorf("error response from server: status %d: %s", resp.StatusCode, string(buf))
	}

	var decoded logproto.QueryResponse
	if err := json.NewDecoder(resp.Body).Decode(&decoded); err != nil {
		return nil, fmt.Errorf("error decoding response %w", err)
	}

	lines := make([]string, 0)
	for _, stream := range decoded.Streams {
		for _, entry := range stream.Entries {
			lines = append(lines, entry.Line)
		}
	}

	return lines, nil
}

// traceIDPattern matches a "traceID=<value> " token; the value is captured
// lazily up to the first following space. It is compiled once at package
// initialization instead of on every extractTraceIDs call.
var traceIDPattern = regexp.MustCompile("traceID=(.*?) ")

// extractTraceIDs returns the trace ID found in each line that contains a
// "traceID=<value> " token, in input order. Lines without such a token are
// skipped; only the first token of a line is used.
//
// Surrounding whitespace is trimmed from the ID, and an ID with an odd number
// of characters is left-padded with "0" to an even length. The result is
// never nil.
func extractTraceIDs(lines []string) []string {
	ids := make([]string, 0, len(lines))

	for _, line := range lines {
		match := traceIDPattern.FindStringSubmatch(line)
		if match == nil {
			continue
		}

		traceID := strings.TrimSpace(match[1])
		if len(traceID)%2 == 1 {
			traceID = "0" + traceID
		}
		ids = append(ids, traceID)
	}

	return ids
}
Loading

0 comments on commit c8cad66

Please sign in to comment.