Fixes after rebase
sarahzinger committed Nov 20, 2023
1 parent 7dfbf2f commit 5a046e6
Showing 3 changed files with 153 additions and 14 deletions.
70 changes: 63 additions & 7 deletions pkg/opensearch/response_parser.go
@@ -97,21 +97,26 @@ func (rp *responseParser) parseResponse() (*backend.QueryDataResponse, error) {
Frames: data.Frames{},
}

- // trace span condition
- // trace queries are sent from the FE with a metrics field, returning early so the switch doesn't overwrite the response from the traces query
- if target.luceneQueryType == luceneQueryTypeTraces {
- queryRes = processTraceSpansResponse(res, queryRes)
- result.Responses[target.RefID] = queryRes
- continue
+ var queryType string
+ if target.luceneQueryType == "Traces" {
+ queryType = "Traces"
+ } else {
+ queryType = target.Metrics[0].Type
}

- switch target.Metrics[0].Type {
+ switch queryType {
case rawDataType:
queryRes = processRawDataResponse(res, rp.ConfiguredFields, queryRes)
case rawDocumentType:
queryRes = processRawDocumentResponse(res, target.RefID, queryRes)
case logsType:
queryRes = processLogsResponse(res, rp.ConfiguredFields, queryRes)
+ case luceneQueryTypeTraces:
+ if strings.HasPrefix(target.RawQuery, "traceId:") {
+ queryRes = processTraceSpansResponse(res, queryRes)
+ } else {
+ queryRes = processTraceListResponse(res, rp.DSSettings.UID, rp.DSSettings.Name, queryRes)
+ }
default:
props := make(map[string]string)
err := rp.processBuckets(res.Aggregations, target, &queryRes, props, 0)
@@ -124,6 +129,7 @@ func (rp *responseParser) parseResponse() (*backend.QueryDataResponse, error) {

result.Responses[target.RefID] = queryRes
}

return result, nil
}

@@ -265,6 +271,56 @@ func processTraceSpansResponse(res *es.SearchResponse, queryRes backend.DataResp
queryRes.Frames = data.Frames{frame}
return queryRes
}

func processTraceListResponse(res *es.SearchResponse, dsUID string, dsName string, queryRes backend.DataResponse) backend.DataResponse {
// trace list queries are hardcoded, so their response format is fairly predictable,
// but es.SearchResponse is deliberately left untyped because in other query cases the response can be much more open ended
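// Each bucket under the "traces" aggregation is expected to look roughly like (see the test fixture):
//   { "key": <traceId>, "trace_group": {"buckets": [{"key": <group>}]}, "latency": {"value": <ms>},
//     "error_count": {"doc_count": <count>}, "last_updated": {"value": <epoch ms>} }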
rawTraces := res.Aggregations["traces"].(map[string]interface{})["buckets"].([]interface{})

// get values from raw traces response
traceIds := []string{}
traceGroups := []string{}
traceLatencies := []float64{}
traceErrorCounts := []float64{}
traceLastUpdated := []time.Time{}
for _, trace := range rawTraces {
t := trace.(map[string]interface{})

traceIds = append(traceIds, t["key"].(string))
traceGroups = append(traceGroups, t["trace_group"].(map[string]interface{})["buckets"].([]interface{})[0].(map[string]interface{})["key"].(string))
traceLatencies = append(traceLatencies, t["latency"].(map[string]interface{})["value"].(float64))
traceErrorCounts = append(traceErrorCounts, t["error_count"].(map[string]interface{})["doc_count"].(float64))
lastUpdated := t["last_updated"].(map[string]interface{})["value"].(float64)
traceLastUpdated = append(traceLastUpdated, time.Unix(0, int64(lastUpdated)*int64(time.Millisecond)))
}

allFields := make([]*data.Field, 0, 5)
traceIdColumn := data.NewField("Trace Id", nil, traceIds)
traceIdColumn.Config = &data.FieldConfig{
Links: []data.DataLink{
{
Internal: &data.InternalDataLink{
Query: map[string]interface{}{
"query": "traceId: ${__value.raw}",
"luceneQueryType": "Traces",
},
DatasourceUID: dsUID,
DatasourceName: dsName,
},
},
},
}

allFields = append(allFields, traceIdColumn)
allFields = append(allFields, data.NewField("Trace Group", nil, traceGroups))
allFields = append(allFields, data.NewField("Latency (ms)", nil, traceLatencies))
allFields = append(allFields, data.NewField("Error Count", nil, traceErrorCounts))
allFields = append(allFields, data.NewField("Last Updated", nil, traceLastUpdated))

queryRes.Frames = data.Frames{data.NewFrame("Trace List", allFields...)}
return queryRes
}

func processLogsResponse(res *es.SearchResponse, configuredFields es.ConfiguredFields, queryRes backend.DataResponse) backend.DataResponse {
propNames := make(map[string]bool)
docs := make([]map[string]interface{}, len(res.Hits.Hits))
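Trace queries still arrive from the frontend with a metrics entry (the point made by the comment removed above), so the parser now dispatches on luceneQueryType before falling back to Metrics[0].Type, and processTraceListResponse then walks the untyped es.SearchResponse with chained type assertions. As a reader aid only, here is a minimal, self-contained sketch of that bucket traversal written with comma-ok assertions, including the epoch-millisecond conversion used for last_updated; it is not part of this commit, and the parseTraceBucket helper name is hypothetical.

package main

import (
	"fmt"
	"time"
)

// parseTraceBucket is a hypothetical helper mirroring the fields that
// processTraceListResponse reads from each "traces" bucket; comma-ok
// assertions turn an unexpected shape into an error instead of a panic.
func parseTraceBucket(bucket interface{}) (traceID string, lastUpdated time.Time, err error) {
	b, ok := bucket.(map[string]interface{})
	if !ok {
		return "", time.Time{}, fmt.Errorf("unexpected bucket type %T", bucket)
	}
	traceID, ok = b["key"].(string)
	if !ok {
		return "", time.Time{}, fmt.Errorf("unexpected key type %T", b["key"])
	}
	lu, ok := b["last_updated"].(map[string]interface{})
	if !ok {
		return "", time.Time{}, fmt.Errorf("missing last_updated aggregation")
	}
	ms, ok := lu["value"].(float64) // epoch milliseconds, as in the test fixture
	if !ok {
		return "", time.Time{}, fmt.Errorf("unexpected last_updated value type %T", lu["value"])
	}
	return traceID, time.Unix(0, int64(ms)*int64(time.Millisecond)), nil
}

func main() {
	bucket := map[string]interface{}{
		"key":          "000000000000000001c01e08995dd2e2",
		"last_updated": map[string]interface{}{"value": 1700074430928.0},
	}
	id, ts, err := parseTraceBucket(bucket)
	fmt.Println(id, ts.UTC(), err) // prints the trace id, 2023-11-15 18:53:50.928 +0000 UTC, <nil>
}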
86 changes: 86 additions & 0 deletions pkg/opensearch/response_parser_test.go
@@ -2573,3 +2573,89 @@ func sortLogsByTimestamp(rawObject *data.Field, t *testing.T) []Log {
})
return sortedArray
}

func TestProcessTraceListResponse(t *testing.T) {
targets := map[string]string{
"A": `{
"timeField": "@timestamp",
"metrics": [{ "type": "count", "id": "1" }],
"luceneQueryType": "Traces"
}`,
}

response := `
{
"responses": [{
"aggregations": {
"traces": {
"buckets": [{
"doc_count": 50,
"key": "000000000000000001c01e08995dd2e2",
"last_updated": {
"value": 1700074430928,
"value_as_string": "2023-11-15T18:53:50.928Z"
},
"latency": {
"value": 656.43
},
"trace_group": {
"buckets":[{
"doc_count":50,
"key": "HTTP GET /dispatch"
}]
},
"error_count": {
"doc_count":0
}
}]
}
}
}]
}
`

rp, err := newResponseParserForTest(targets, response, nil, client.ConfiguredFields{TimeField: "@timestamp"}, &backend.DataSourceInstanceSettings{UID: "123", Name: "DatasourceInstanceName"})
assert.Nil(t, err)

result, err := rp.parseResponse()
require.NoError(t, err)
require.Len(t, result.Responses, 1)

queryRes := result.Responses["A"]
require.NotNil(t, queryRes)

dataframes := queryRes.Frames
require.Len(t, dataframes, 1)

frame := dataframes[0]

traceId := frame.Fields[0]
assert.Equal(t, "000000000000000001c01e08995dd2e2", traceId.At(0))
assert.Equal(t, "Trace Id", traceId.Name)
assert.Equal(t, "string", traceId.Type().ItemTypeString())
// deep link config that makes it possible to click through to the individual trace view
assert.Equal(t, "traceId: ${__value.raw}", traceId.Config.Links[0].Internal.Query.(map[string]interface{})["query"])
assert.Equal(t, "Traces", traceId.Config.Links[0].Internal.Query.(map[string]interface{})["luceneQueryType"])
assert.Equal(t, "123", traceId.Config.Links[0].Internal.DatasourceUID)
assert.Equal(t, "DatasourceInstanceName", traceId.Config.Links[0].Internal.DatasourceName)

traceGroup := frame.Fields[1]
assert.Equal(t, "HTTP GET /dispatch", traceGroup.At(0))
assert.Equal(t, "Trace Group", traceGroup.Name)
assert.Equal(t, "string", traceGroup.Type().ItemTypeString())

latency := frame.Fields[2]
assert.Equal(t, 656.43, latency.At(0))
assert.Equal(t, "Latency (ms)", latency.Name)
assert.Equal(t, "float64", latency.Type().ItemTypeString())

errorCount := frame.Fields[3]
assert.Equal(t, float64(0), errorCount.At(0))
assert.Equal(t, "Error Count", errorCount.Name)
assert.Equal(t, "float64", errorCount.Type().ItemTypeString())

lastUpdated := frame.Fields[4]
assert.Equal(t, time.Date(2023, time.November, 15, 18, 53, 50, 928000000, time.UTC), lastUpdated.At(0).(time.Time).UTC())
assert.Equal(t, "Last Updated", lastUpdated.Name)
assert.Equal(t, "time.Time", lastUpdated.Type().ItemTypeString())
}
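For completeness, the new test should be runnable on its own with the standard Go toolchain (package path taken from the file header above), for example:

go test ./pkg/opensearch/ -run TestProcessTraceListResponse -v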
11 changes: 4 additions & 7 deletions src/datasource.ts
@@ -539,17 +539,14 @@ export class OpenSearchDatasource extends DataSourceWithBackend<OpenSearchQuery,
(metric) =>
metric.type === 'raw_data' ||
metric.type === 'raw_document' ||
- (request.app === CoreApp.Explore && target.queryType === QueryType.Lucene &&
- target.luceneQueryType !== LuceneQueryType.Traces)
+ (request.app === CoreApp.Explore && target.queryType === QueryType.Lucene)
) ||
(request.app === CoreApp.Explore &&
target.queryType === QueryType.PPL &&
- (target.format === 'logs' || target.format === 'table')
- ) ||
+ (target.format === 'logs' || target.format === 'table')) ||
(request.app === CoreApp.Explore &&
- target.luceneQueryType === LuceneQueryType.Traces &&
- getTraceIdFromLuceneQueryString(target.query ?? '')
- )
+ target.luceneQueryType === LuceneQueryType.Traces &&
+ getTraceIdFromLuceneQueryString(target.query ?? ''))
)
) {
// @ts-ignore
