From 0a4521f19a49a0bacea8a0fe9081a2c9b8b5ca86 Mon Sep 17 00:00:00 2001 From: Sarah Zinger Date: Sun, 19 Nov 2023 12:06:03 -0500 Subject: [PATCH 1/5] Refactor Response Parser --- pkg/opensearch/lucene_handler.go | 8 +- pkg/opensearch/opensearch.go | 2 +- ...{time_series_query.go => query_request.go} | 20 +- ...es_query_test.go => query_request_test.go} | 4 +- pkg/opensearch/response_parser.go | 39 +- pkg/opensearch/response_parser_test.go | 1506 ++++++++--------- 6 files changed, 797 insertions(+), 782 deletions(-) rename pkg/opensearch/{time_series_query.go => query_request.go} (87%) rename pkg/opensearch/{time_series_query_test.go => query_request_test.go} (99%) diff --git a/pkg/opensearch/lucene_handler.go b/pkg/opensearch/lucene_handler.go index e27f9c95..de9dd47f 100644 --- a/pkg/opensearch/lucene_handler.go +++ b/pkg/opensearch/lucene_handler.go @@ -21,15 +21,17 @@ type luceneHandler struct { intervalCalculator tsdb.IntervalCalculator ms *es.MultiSearchRequestBuilder queries []*Query + dsSettings *backend.DataSourceInstanceSettings } -func newLuceneHandler(client es.Client, queries []backend.DataQuery, intervalCalculator tsdb.IntervalCalculator) *luceneHandler { +func newLuceneHandler(client es.Client, queries []backend.DataQuery, intervalCalculator tsdb.IntervalCalculator, dsSettings *backend.DataSourceInstanceSettings) *luceneHandler { return &luceneHandler{ client: client, reqQueries: queries, intervalCalculator: intervalCalculator, ms: client.MultiSearch(), queries: make([]*Query, 0), + dsSettings: dsSettings, } } @@ -278,8 +280,8 @@ func (h *luceneHandler) executeQueries(ctx context.Context) (*backend.QueryDataR return nil, err } - rp := newResponseParser(res.Responses, h.queries, res.DebugInfo) - return rp.getTimeSeries(h.client.GetConfiguredFields()) + rp := newResponseParser(res.Responses, h.queries, res.DebugInfo, h.client.GetConfiguredFields(), h.dsSettings) + return rp.parseResponse() } func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFrom, timeTo int64, timeField string) es.AggBuilder { diff --git a/pkg/opensearch/opensearch.go b/pkg/opensearch/opensearch.go index e3cc99d0..c31c94ad 100644 --- a/pkg/opensearch/opensearch.go +++ b/pkg/opensearch/opensearch.go @@ -64,7 +64,7 @@ func (ds *OpenSearchDatasource) QueryData(ctx context.Context, req *backend.Quer return nil, err } - query := newTimeSeriesQuery(osClient, req.Queries, intervalCalculator) + query := newQueryRequest(osClient, req.Queries, req.PluginContext.DataSourceInstanceSettings, intervalCalculator) response, err := wrapError(query.execute(ctx)) return response, err } diff --git a/pkg/opensearch/time_series_query.go b/pkg/opensearch/query_request.go similarity index 87% rename from pkg/opensearch/time_series_query.go rename to pkg/opensearch/query_request.go index 2c4d2a46..f1477c32 100644 --- a/pkg/opensearch/time_series_query.go +++ b/pkg/opensearch/query_request.go @@ -12,27 +12,29 @@ import ( "github.com/grafana/opensearch-datasource/pkg/utils" ) -type timeSeriesQuery struct { +type queryRequest struct { client es.Client - tsdbQueries []backend.DataQuery + queries []backend.DataQuery + dsSettings *backend.DataSourceInstanceSettings intervalCalculator tsdb.IntervalCalculator } -func newTimeSeriesQuery(client es.Client, query []backend.DataQuery, intervalCalculator tsdb.IntervalCalculator) *timeSeriesQuery { - return &timeSeriesQuery{ +func newQueryRequest(client es.Client, queries []backend.DataQuery, dsSettings *backend.DataSourceInstanceSettings, intervalCalculator 
tsdb.IntervalCalculator) *queryRequest { + return &queryRequest{ client: client, - tsdbQueries: query, + queries: queries, + dsSettings: dsSettings, intervalCalculator: intervalCalculator, } } -func (e *timeSeriesQuery) execute(ctx context.Context) (*backend.QueryDataResponse, error) { +func (e *queryRequest) execute(ctx context.Context) (*backend.QueryDataResponse, error) { handlers := make(map[string]queryHandler) - handlers[Lucene] = newLuceneHandler(e.client, e.tsdbQueries, e.intervalCalculator) - handlers[PPL] = newPPLHandler(e.client, e.tsdbQueries) + handlers[Lucene] = newLuceneHandler(e.client, e.queries, e.intervalCalculator, e.dsSettings) + handlers[PPL] = newPPLHandler(e.client, e.queries) - queries, err := parse(e.tsdbQueries) + queries, err := parse(e.queries) if err != nil { return nil, err } diff --git a/pkg/opensearch/time_series_query_test.go b/pkg/opensearch/query_request_test.go similarity index 99% rename from pkg/opensearch/time_series_query_test.go rename to pkg/opensearch/query_request_test.go index 3e5038db..acd2c89a 100644 --- a/pkg/opensearch/time_series_query_test.go +++ b/pkg/opensearch/query_request_test.go @@ -969,7 +969,9 @@ func executeTsdbQuery(c es.Client, body string, from, to time.Time, minInterval }, }, } - query := newTimeSeriesQuery(c, tsdbQuery, tsdb.NewIntervalCalculator(&tsdb.IntervalOptions{MinInterval: minInterval})) + + dsSettings := backend.DataSourceInstanceSettings{} + query := newQueryRequest(c, tsdbQuery, &dsSettings, tsdb.NewIntervalCalculator(&tsdb.IntervalOptions{MinInterval: minInterval})) return query.execute(context.Background()) } diff --git a/pkg/opensearch/response_parser.go b/pkg/opensearch/response_parser.go index d7ed6084..1197296d 100644 --- a/pkg/opensearch/response_parser.go +++ b/pkg/opensearch/response_parser.go @@ -13,6 +13,7 @@ import ( simplejson "github.com/bitly/go-simplejson" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/grafana/opensearch-datasource/pkg/opensearch/client" es "github.com/grafana/opensearch-datasource/pkg/opensearch/client" utils "github.com/grafana/opensearch-datasource/pkg/utils" ) @@ -42,35 +43,43 @@ const ( ) type responseParser struct { - Responses []*es.SearchResponse - Targets []*Query - DebugInfo *es.SearchDebugInfo + Responses []*es.SearchResponse + Targets []*Query + DebugInfo *es.SearchDebugInfo + ConfiguredFields client.ConfiguredFields + DSSettings *backend.DataSourceInstanceSettings } -func newResponseParser(responses []*es.SearchResponse, targets []*Query, debugInfo *es.SearchDebugInfo) *responseParser { +func newResponseParser(responses []*es.SearchResponse, targets []*Query, debugInfo *es.SearchDebugInfo, configuredFields client.ConfiguredFields, dsSettings *backend.DataSourceInstanceSettings) *responseParser { return &responseParser{ - Responses: responses, - Targets: targets, - DebugInfo: debugInfo, + Responses: responses, + Targets: targets, + DebugInfo: debugInfo, + ConfiguredFields: configuredFields, + DSSettings: dsSettings, } } -func (rp *responseParser) getTimeSeries(configuredFields es.ConfiguredFields) (*backend.QueryDataResponse, error) { +func (rp *responseParser) parseResponse() (*backend.QueryDataResponse, error) { result := backend.NewQueryDataResponse() if rp.Responses == nil { return result, nil } + // go through each response, create data frames based on type, add them to result for i, res := range rp.Responses { + // grab the associated query target := rp.Targets[i] - var debugInfo *simplejson.Json - 
if rp.DebugInfo != nil && i == 0 { - debugInfo = utils.NewJsonFromAny(rp.DebugInfo) - } - + // if one of the responses is an error add debug info and error + // and keep trying to process other responses if res.Error != nil { + var debugInfo *simplejson.Json + if rp.DebugInfo != nil && i == 0 { + debugInfo = utils.NewJsonFromAny(rp.DebugInfo) + } + result.Responses[target.RefID] = backend.DataResponse{ Error: getErrorFromOpenSearchResponse(res), Frames: []*data.Frame{ @@ -98,11 +107,11 @@ func (rp *responseParser) getTimeSeries(configuredFields es.ConfiguredFields) (* switch target.Metrics[0].Type { case rawDataType: - queryRes = processRawDataResponse(res, configuredFields, queryRes) + queryRes = processRawDataResponse(res, rp.ConfiguredFields, queryRes) case rawDocumentType: queryRes = processRawDocumentResponse(res, target.RefID, queryRes) case logsType: - queryRes = processLogsResponse(res, configuredFields, queryRes) + queryRes = processLogsResponse(res, rp.ConfiguredFields, queryRes) default: props := make(map[string]string) err := rp.processBuckets(res.Aggregations, target, &queryRes, props, 0) diff --git a/pkg/opensearch/response_parser_test.go b/pkg/opensearch/response_parser_test.go index dcb057c4..1ae1c726 100644 --- a/pkg/opensearch/response_parser_test.go +++ b/pkg/opensearch/response_parser_test.go @@ -17,6 +17,36 @@ import ( "github.com/stretchr/testify/require" ) +func newResponseParserForTest(tsdbQueries map[string]string, responseBody string, debugInfo *client.SearchDebugInfo, configuredFields client.ConfiguredFields, dsSettings *backend.DataSourceInstanceSettings) (*responseParser, error) { + from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC) + to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC) + dataQueries := []backend.DataQuery{} + + for refID, tsdbQueryBody := range tsdbQueries { + dataQueries = append(dataQueries, backend.DataQuery{ + JSON: []byte(tsdbQueryBody), + RefID: refID, + TimeRange: backend.TimeRange{ + From: from, + To: to, + }, + }) + } + + var response client.MultiSearchResponse + err := json.Unmarshal([]byte(responseBody), &response) + if err != nil { + return nil, err + } + + queries, err := parse(dataQueries) + if err != nil { + return nil, err + } + + return newResponseParser(response.Responses, queries, debugInfo, configuredFields, dsSettings), nil +} + func Test_ResponseParser_test(t *testing.T) { t.Run("Simple query and count", func(t *testing.T) { targets := map[string]string{ @@ -57,9 +87,9 @@ func Test_ResponseParser_test(t *testing.T) { } ] }` - rp, err := newResponseParserForTest(targets, response) + rp, err := newResponseParserForTest(targets, response, nil, client.ConfiguredFields{TimeField: "@timestamp"}, nil) assert.Nil(t, err) - result, err := rp.getTimeSeries(client.ConfiguredFields{TimeField: "@timestamp"}) + result, err := rp.parseResponse() assert.Nil(t, err) require.Len(t, result.Responses, 1) @@ -81,36 +111,36 @@ func Test_ResponseParser_test(t *testing.T) { t.Run("Simple query count & avg aggregation", func(t *testing.T) { targets := map[string]string{ "A": `{ - "timeField": "@timestamp", - "metrics": [{ "type": "count", "id": "1" }, {"type": "avg", "field": "value", "id": "2" }], - "bucketAggs": [{ "type": "date_histogram", "field": "@timestamp", "id": "3" }] - }`, + "timeField": "@timestamp", + "metrics": [{ "type": "count", "id": "1" }, {"type": "avg", "field": "value", "id": "2" }], + "bucketAggs": [{ "type": "date_histogram", "field": "@timestamp", "id": "3" }] + }`, } response := `{ - "responses": [ - { - 
"aggregations": { - "3": { - "buckets": [ - { - "2": { "value": 88 }, - "doc_count": 10, - "key": 1000 - }, - { - "2": { "value": 99 }, - "doc_count": 15, - "key": 2000 - } - ] - } - } - } - ] - }` - rp, err := newResponseParserForTest(targets, response) + "responses": [ + { + "aggregations": { + "3": { + "buckets": [ + { + "2": { "value": 88 }, + "doc_count": 10, + "key": 1000 + }, + { + "2": { "value": 99 }, + "doc_count": 15, + "key": 2000 + } + ] + } + } + } + ] + }` + rp, err := newResponseParserForTest(targets, response, nil, client.ConfiguredFields{TimeField: "@timestamp"}, nil) assert.Nil(t, err) - result, err := rp.getTimeSeries(client.ConfiguredFields{TimeField: "@timestamp"}) + result, err := rp.parseResponse() assert.Nil(t, err) require.Len(t, result.Responses, 1) @@ -170,9 +200,9 @@ func Test_ResponseParser_test(t *testing.T) { } ] }` - rp, err := newResponseParserForTest(targets, response) + rp, err := newResponseParserForTest(targets, response, nil, client.ConfiguredFields{TimeField: "@timestamp"}, nil) assert.Nil(t, err) - result, err := rp.getTimeSeries(client.ConfiguredFields{TimeField: "@timestamp"}) + result, err := rp.parseResponse() assert.Nil(t, err) responseForA, ok := result.Responses["A"] require.True(t, ok) @@ -240,9 +270,9 @@ func Test_ResponseParser_test(t *testing.T) { } ] }` - rp, err := newResponseParserForTest(targets, response) + rp, err := newResponseParserForTest(targets, response, nil, client.ConfiguredFields{TimeField: "@timestamp"}, nil) assert.Nil(t, err) - result, err := rp.getTimeSeries(client.ConfiguredFields{TimeField: "@timestamp"}) + result, err := rp.parseResponse() assert.Nil(t, err) require.Len(t, result.Responses, 1) @@ -313,9 +343,9 @@ func Test_ResponseParser_test(t *testing.T) { } ] }` - rp, err := newResponseParserForTest(targets, response) + rp, err := newResponseParserForTest(targets, response, nil, client.ConfiguredFields{TimeField: "@timestamp"}, nil) assert.Nil(t, err) - result, err := rp.getTimeSeries(client.ConfiguredFields{TimeField: "@timestamp"}) + result, err := rp.parseResponse() assert.Nil(t, err) require.Len(t, result.Responses, 1) @@ -393,9 +423,9 @@ func Test_ResponseParser_test(t *testing.T) { } ] }` - rp, err := newResponseParserForTest(targets, response) + rp, err := newResponseParserForTest(targets, response, nil, client.ConfiguredFields{TimeField: "@timestamp"}, nil) assert.Nil(t, err) - result, err := rp.getTimeSeries(client.ConfiguredFields{TimeField: "@timestamp"}) + result, err := rp.parseResponse() assert.Nil(t, err) require.Len(t, result.Responses, 1) @@ -478,9 +508,9 @@ func Test_ResponseParser_test(t *testing.T) { } ] }` - rp, err := newResponseParserForTest(targets, response) + rp, err := newResponseParserForTest(targets, response, nil, client.ConfiguredFields{TimeField: "@timestamp"}, nil) assert.Nil(t, err) - result, err := rp.getTimeSeries(client.ConfiguredFields{TimeField: "@timestamp"}) + result, err := rp.parseResponse() assert.Nil(t, err) require.Len(t, result.Responses, 1) @@ -582,9 +612,9 @@ func Test_ResponseParser_test(t *testing.T) { } ] }` - rp, err := newResponseParserForTest(targets, response) + rp, err := newResponseParserForTest(targets, response, nil, client.ConfiguredFields{TimeField: "@timestamp"}, nil) assert.Nil(t, err) - result, err := rp.getTimeSeries(client.ConfiguredFields{TimeField: "@timestamp"}) + result, err := rp.parseResponse() assert.Nil(t, err) require.Len(t, result.Responses, 1) @@ -642,9 +672,9 @@ func Test_ResponseParser_test(t *testing.T) { } ] }` - rp, err := 
newResponseParserForTest(targets, response) + rp, err := newResponseParserForTest(targets, response, nil, client.ConfiguredFields{TimeField: "@timestamp"}, nil) assert.Nil(t, err) - result, err := rp.getTimeSeries(client.ConfiguredFields{TimeField: "@timestamp"}) + result, err := rp.parseResponse() assert.Nil(t, err) require.Len(t, result.Responses, 1) @@ -703,9 +733,9 @@ func Test_ResponseParser_test(t *testing.T) { } ] }` - rp, err := newResponseParserForTest(targets, response) + rp, err := newResponseParserForTest(targets, response, nil, client.ConfiguredFields{TimeField: "@timestamp"}, nil) assert.Nil(t, err) - result, err := rp.getTimeSeries(client.ConfiguredFields{TimeField: "@timestamp"}) + result, err := rp.parseResponse() assert.Nil(t, err) require.Len(t, result.Responses, 1) @@ -776,9 +806,9 @@ func Test_ResponseParser_test(t *testing.T) { } ] }` - rp, err := newResponseParserForTest(targets, response) + rp, err := newResponseParserForTest(targets, response, nil, client.ConfiguredFields{TimeField: "@timestamp"}, nil) assert.Nil(t, err) - result, err := rp.getTimeSeries(client.ConfiguredFields{TimeField: "@timestamp"}) + result, err := rp.parseResponse() assert.Nil(t, err) require.Len(t, result.Responses, 1) @@ -806,36 +836,36 @@ func Test_ResponseParser_test(t *testing.T) { t.Run("No group by time", func(t *testing.T) { targets := map[string]string{ "A": `{ - "timeField": "@timestamp", - "metrics": [{ "type": "avg", "id": "1" }, { "type": "count" }], - "bucketAggs": [{ "type": "terms", "field": "host", "id": "2" }] - }`, + "timeField": "@timestamp", + "metrics": [{ "type": "avg", "id": "1" }, { "type": "count" }], + "bucketAggs": [{ "type": "terms", "field": "host", "id": "2" }] + }`, } response := `{ - "responses": [ - { - "aggregations": { - "2": { - "buckets": [ - { - "1": { "value": 1000 }, - "key": "server-1", - "doc_count": 369 - }, - { - "1": { "value": 2000 }, - "key": "server-2", - "doc_count": 200 + "responses": [ + { + "aggregations": { + "2": { + "buckets": [ + { + "1": { "value": 1000 }, + "key": "server-1", + "doc_count": 369 + }, + { + "1": { "value": 2000 }, + "key": "server-2", + "doc_count": 200 + } + ] } - ] + } } - } - } - ] - }` - rp, err := newResponseParserForTest(targets, response) + ] + }` + rp, err := newResponseParserForTest(targets, response, nil, client.ConfiguredFields{TimeField: "@timestamp"}, nil) assert.Nil(t, err) - result, err := rp.getTimeSeries(client.ConfiguredFields{TimeField: "@timestamp"}) + result, err := rp.parseResponse() assert.Nil(t, err) require.Len(t, result.Responses, 1) @@ -865,32 +895,32 @@ func Test_ResponseParser_test(t *testing.T) { t.Run("Multiple metrics of same type", func(t *testing.T) { targets := map[string]string{ "A": `{ - "timeField": "@timestamp", - "metrics": [{ "type": "avg", "field": "test", "id": "1" }, { "type": "avg", "field": "test2", "id": "2" }], - "bucketAggs": [{ "type": "terms", "field": "host", "id": "2" }] - }`, + "timeField": "@timestamp", + "metrics": [{ "type": "avg", "field": "test", "id": "1" }, { "type": "avg", "field": "test2", "id": "2" }], + "bucketAggs": [{ "type": "terms", "field": "host", "id": "2" }] + }`, } response := `{ - "responses": [ - { - "aggregations": { - "2": { - "buckets": [ - { - "1": { "value": 1000 }, - "2": { "value": 3000 }, - "key": "server-1", - "doc_count": 369 + "responses": [ + { + "aggregations": { + "2": { + "buckets": [ + { + "1": { "value": 1000 }, + "2": { "value": 3000 }, + "key": "server-1", + "doc_count": 369 + } + ] } - ] + } } - } - } - ] - }` - rp, err := 
newResponseParserForTest(targets, response) + ] + }` + rp, err := newResponseParserForTest(targets, response, nil, client.ConfiguredFields{TimeField: "@timestamp"}, nil) assert.Nil(t, err) - result, err := rp.getTimeSeries(client.ConfiguredFields{TimeField: "@timestamp"}) + result, err := rp.parseResponse() assert.Nil(t, err) require.Len(t, result.Responses, 1) @@ -916,50 +946,50 @@ func Test_ResponseParser_test(t *testing.T) { t.Run("With bucket_script", func(t *testing.T) { targets := map[string]string{ "A": `{ - "timeField": "@timestamp", - "metrics": [ - { "id": "1", "type": "sum", "field": "@value" }, - { "id": "3", "type": "max", "field": "@value" }, - { - "id": "4", - "field": "select field", - "pipelineVariables": [{ "name": "var1", "pipelineAgg": "1" }, { "name": "var2", "pipelineAgg": "3" }], - "settings": { "script": "params.var1 * params.var2" }, - "type": "bucket_script" - } - ], - "bucketAggs": [{ "type": "date_histogram", "field": "@timestamp", "id": "2" }] - }`, + "timeField": "@timestamp", + "metrics": [ + { "id": "1", "type": "sum", "field": "@value" }, + { "id": "3", "type": "max", "field": "@value" }, + { + "id": "4", + "field": "select field", + "pipelineVariables": [{ "name": "var1", "pipelineAgg": "1" }, { "name": "var2", "pipelineAgg": "3" }], + "settings": { "script": "params.var1 * params.var2" }, + "type": "bucket_script" + } + ], + "bucketAggs": [{ "type": "date_histogram", "field": "@timestamp", "id": "2" }] + }`, } response := `{ - "responses": [ - { - "aggregations": { - "2": { - "buckets": [ - { - "1": { "value": 2 }, - "3": { "value": 3 }, - "4": { "value": 6 }, - "doc_count": 60, - "key": 1000 - }, - { - "1": { "value": 3 }, - "3": { "value": 4 }, - "4": { "value": 12 }, - "doc_count": 60, - "key": 2000 + "responses": [ + { + "aggregations": { + "2": { + "buckets": [ + { + "1": { "value": 2 }, + "3": { "value": 3 }, + "4": { "value": 6 }, + "doc_count": 60, + "key": 1000 + }, + { + "1": { "value": 3 }, + "3": { "value": 4 }, + "4": { "value": 12 }, + "doc_count": 60, + "key": 2000 + } + ] } - ] + } } - } - } - ] - }` - rp, err := newResponseParserForTest(targets, response) + ] + }` + rp, err := newResponseParserForTest(targets, response, nil, client.ConfiguredFields{TimeField: "@timestamp"}, nil) assert.Nil(t, err) - result, err := rp.getTimeSeries(client.ConfiguredFields{TimeField: "@timestamp"}) + result, err := rp.parseResponse() assert.Nil(t, err) require.Len(t, result.Responses, 1) @@ -1000,59 +1030,59 @@ func Test_ResponseParser_test(t *testing.T) { t.Run("Terms with two bucket_script", func(t *testing.T) { targets := map[string]string{ "A": `{ - "timeField": "@timestamp", - "metrics": [ - { "id": "1", "type": "sum", "field": "@value" }, - { "id": "3", "type": "max", "field": "@value" }, - { - "id": "4", - "field": "select field", - "pipelineVariables": [{ "name": "var1", "pipelineAgg": "1" }, { "name": "var2", "pipelineAgg": "3" }], - "settings": { "script": "params.var1 * params.var2" }, - "type": "bucket_script" - }, - { - "id": "5", - "field": "select field", - "pipelineVariables": [{ "name": "var1", "pipelineAgg": "1" }, { "name": "var2", "pipelineAgg": "3" }], - "settings": { "script": "params.var1 * params.var2 * 2" }, - "type": "bucket_script" - } - ], - "bucketAggs": [{ "type": "terms", "field": "@timestamp", "id": "2" }] - }`, + "timeField": "@timestamp", + "metrics": [ + { "id": "1", "type": "sum", "field": "@value" }, + { "id": "3", "type": "max", "field": "@value" }, + { + "id": "4", + "field": "select field", + "pipelineVariables": [{ 
"name": "var1", "pipelineAgg": "1" }, { "name": "var2", "pipelineAgg": "3" }], + "settings": { "script": "params.var1 * params.var2" }, + "type": "bucket_script" + }, + { + "id": "5", + "field": "select field", + "pipelineVariables": [{ "name": "var1", "pipelineAgg": "1" }, { "name": "var2", "pipelineAgg": "3" }], + "settings": { "script": "params.var1 * params.var2 * 2" }, + "type": "bucket_script" + } + ], + "bucketAggs": [{ "type": "terms", "field": "@timestamp", "id": "2" }] + }`, } response := `{ - "responses": [ - { - "aggregations": { - "2": { - "buckets": [ - { - "1": { "value": 2 }, - "3": { "value": 3 }, - "4": { "value": 6 }, - "5": { "value": 24 }, - "doc_count": 60, - "key": 1000 - }, - { - "1": { "value": 3 }, - "3": { "value": 4 }, - "4": { "value": 12 }, - "5": { "value": 48 }, - "doc_count": 60, - "key": 2000 + "responses": [ + { + "aggregations": { + "2": { + "buckets": [ + { + "1": { "value": 2 }, + "3": { "value": 3 }, + "4": { "value": 6 }, + "5": { "value": 24 }, + "doc_count": 60, + "key": 1000 + }, + { + "1": { "value": 3 }, + "3": { "value": 4 }, + "4": { "value": 12 }, + "5": { "value": 48 }, + "doc_count": 60, + "key": 2000 + } + ] + } } - ] - } } - } - ] - }` - rp, err := newResponseParserForTest(targets, response) + ] + }` + rp, err := newResponseParserForTest(targets, response, nil, client.ConfiguredFields{TimeField: "@timestamp"}, nil) assert.Nil(t, err) - result, err := rp.getTimeSeries(client.ConfiguredFields{TimeField: "@timestamp"}) + result, err := rp.parseResponse() assert.Nil(t, err) require.Len(t, result.Responses, 1) queryRes := result.Responses["A"] @@ -1092,79 +1122,79 @@ func TestProcessLogsResponse_creates_correct_data_frame_fields(t *testing.T) { // creates correct data frame fields targets := map[string]string{ "A": `{ - "refId": "A", - "timeField": "@timestamp", - "metrics": [{ "type": "logs"}], - "bucketAggs": [ - { - "type": "date_histogram", - "settings": { "interval": "auto" }, - "id": "2" - } - ], - "key": "Q-1561369883389-0.7611823271062786-0", - "query": "hello AND message" - }`, + "refId": "A", + "timeField": "@timestamp", + "metrics": [{ "type": "logs"}], + "bucketAggs": [ + { + "type": "date_histogram", + "settings": { "interval": "auto" }, + "id": "2" + } + ], + "key": "Q-1561369883389-0.7611823271062786-0", + "query": "hello AND message" + }`, } response := ` - { - "responses":[ - { - "aggregations":{ - - }, - "hits":{ - "hits":[ - { - "_id":"fdsfs", - "_type":"_doc", - "_index":"mock-index", - "_source":{ - "testtime":"06/24/2019", - "host":"djisaodjsoad", - "number":1, - "line":"hello, i am a message", - "level":"debug", - "fields":{ - "lvl":"debug" - } - }, - "fields":{ - "testtime":[ - "2019-06-24T09:51:19.765Z" - ] - } - }, - { - "_id":"kdospaidopa", - "_type":"_doc", - "_index":"mock-index", - "_source":{ - "testtime":"06/24/2019", - "host":"dsalkdakdop", - "number":2, - "line":"hello, i am also message", - "level":"error", - "fields":{ - "lvl":"info" - } - }, - "fields":{ - "testtime":[ - "2019-06-24T09:52:19.765Z" - ] - } - } - ] - } - } - ] - }` + { + "responses":[ + { + "aggregations":{ - rp, err := newResponseParserForTest(targets, response) + }, + "hits":{ + "hits":[ + { + "_id":"fdsfs", + "_type":"_doc", + "_index":"mock-index", + "_source":{ + "testtime":"06/24/2019", + "host":"djisaodjsoad", + "number":1, + "line":"hello, i am a message", + "level":"debug", + "fields":{ + "lvl":"debug" + } + }, + "fields":{ + "testtime":[ + "2019-06-24T09:51:19.765Z" + ] + } + }, + { + "_id":"kdospaidopa", + "_type":"_doc", + 
"_index":"mock-index", + "_source":{ + "testtime":"06/24/2019", + "host":"dsalkdakdop", + "number":2, + "line":"hello, i am also message", + "level":"error", + "fields":{ + "lvl":"info" + } + }, + "fields":{ + "testtime":[ + "2019-06-24T09:52:19.765Z" + ] + } + } + ] + } + } + ] + }` + + rp, err := newResponseParserForTest(targets, response, nil, client.ConfiguredFields{TimeField: "testtime"}, nil) assert.NoError(t, err) - result, err := rp.getTimeSeries(client.ConfiguredFields{TimeField: "testtime"}) + result, err := rp.parseResponse() require.NoError(t, err) queryRes := result.Responses["A"] @@ -1232,37 +1262,37 @@ func TestProcessLogsResponse_empty_response(t *testing.T) { // Empty response targets := map[string]string{ "A": ` - { - "refId":"A", - "timeField": "@timestamp", - "metrics":[ - { - "type":"logs", - "id":"2" - } - ], - "bucketAggs":[ - - ], - "key":"Q-1561369883389-0.7611823271062786-0", - "query":"hello AND message" - }`, + { + "refId":"A", + "timeField": "@timestamp", + "metrics":[ + { + "type":"logs", + "id":"2" + } + ], + "bucketAggs":[ + + ], + "key":"Q-1561369883389-0.7611823271062786-0", + "query":"hello AND message" + }`, } response := ` - { - "responses": [ - { - "hits": { "hits": [] }, - "aggregations": {}, - "status": 200 - } - ] - }` + { + "responses": [ + { + "hits": { "hits": [] }, + "aggregations": {}, + "status": 200 + } + ] + }` - rp, err := newResponseParserForTest(targets, response) + rp, err := newResponseParserForTest(targets, response, nil, client.ConfiguredFields{TimeField: "testtime"}, nil) assert.NoError(t, err) - result, err := rp.getTimeSeries(client.ConfiguredFields{TimeField: "testtime"}) + result, err := rp.parseResponse() require.NoError(t, err) queryRes := result.Responses["A"] @@ -1281,90 +1311,90 @@ func TestProcessLogsResponse_log_query_with_nested_fields(t *testing.T) { targets := map[string]string{"A": `{"timeField": "@timestamp", "metrics": [{ "type": "logs" }]}`} response := ` - { - "responses":[ - { - "hits":{ - "total":{ - "value":109, - "relation":"eq" - }, - "max_score":null, - "hits":[ - { - "_index":"logs-2023.02.08", - "_id":"GB2UMYYBfCQ-FCMjayJa", - "_score":null, - "_source":{ - "@timestamp":"2023-02-08T15:10:55.830Z", - "line":"log text [479231733]", - "counter":"109", - "float":58.253758485091, - "label":"val1", - "lvl":"info", - "location":"17.089705232090438, 41.62861966340297", - "nested":{ - "field":{ - "double_nested":true - } - }, - "shapes":[ - { - "type":"triangle" - }, - { - "type":"square" - } - ], - "xyz":null - }, - "sort":[ - 1675869055830, - 4 - ] - }, - { - "_index":"logs-2023.02.08", - "_id":"Fx2UMYYBfCQ-FCMjZyJ_", - "_score":null, - "_source":{ - "@timestamp":"2023-02-08T15:10:54.835Z", - "line":"log text with ANSI \u001b[31mpart of the text\u001b[0m [493139080]", - "counter":"108", - "float":54.5977098233944, - "label":"val1", - "lvl":"info", - "location":"19.766305918490463, 40.42639175509792", - "nested":{ - "field":{ - "double_nested":false - } - }, - "shapes":[ - { - "type":"triangle" - }, - { - "type":"square" - } - ], - "xyz":"def" - }, - "sort":[ - 1675869054835, - 7 - ] - } - ] - }, - "status":200 - } - ] - }` + { + "responses":[ + { + "hits":{ + "total":{ + "value":109, + "relation":"eq" + }, + "max_score":null, + "hits":[ + { + "_index":"logs-2023.02.08", + "_id":"GB2UMYYBfCQ-FCMjayJa", + "_score":null, + "_source":{ + "@timestamp":"2023-02-08T15:10:55.830Z", + "line":"log text [479231733]", + "counter":"109", + "float":58.253758485091, + "label":"val1", + "lvl":"info", + 
"location":"17.089705232090438, 41.62861966340297", + "nested":{ + "field":{ + "double_nested":true + } + }, + "shapes":[ + { + "type":"triangle" + }, + { + "type":"square" + } + ], + "xyz":null + }, + "sort":[ + 1675869055830, + 4 + ] + }, + { + "_index":"logs-2023.02.08", + "_id":"Fx2UMYYBfCQ-FCMjZyJ_", + "_score":null, + "_source":{ + "@timestamp":"2023-02-08T15:10:54.835Z", + "line":"log text with ANSI \u001b[31mpart of the text\u001b[0m [493139080]", + "counter":"108", + "float":54.5977098233944, + "label":"val1", + "lvl":"info", + "location":"19.766305918490463, 40.42639175509792", + "nested":{ + "field":{ + "double_nested":false + } + }, + "shapes":[ + { + "type":"triangle" + }, + { + "type":"square" + } + ], + "xyz":"def" + }, + "sort":[ + 1675869054835, + 7 + ] + } + ] + }, + "status":200 + } + ] + }` - rp, err := newResponseParserForTest(targets, response) + rp, err := newResponseParserForTest(targets, response, nil, client.ConfiguredFields{TimeField: "@timestamp", LogMessageField: "line", LogLevelField: "lvl"}, nil) assert.NoError(t, err) - result, err := rp.getTimeSeries(client.ConfiguredFields{TimeField: "@timestamp", LogMessageField: "line", LogLevelField: "lvl"}) + result, err := rp.parseResponse() require.NoError(t, err) queryRes := result.Responses["A"] @@ -1538,47 +1568,47 @@ func Test_ProcessRawDataResponse(t *testing.T) { t.Run("ProcessRawDataResponse populates standard fields and gets other fields from _source, in alphabetical order, with time at the beginning", func(t *testing.T) { targets := map[string]string{ "A": `{ - "timeField": "@timestamp", - "metrics": [{"type": "raw_data"}] - }`, + "timeField": "@timestamp", + "metrics": [{"type": "raw_data"}] + }`, } response := `{ - "responses": [ - { - "hits": { - "total": { - "value": 109, - "relation": "eq" - }, - "max_score": null, - "hits": [ - { - "_index": "logs-2023.02.08", - "_id": "some id", - "_score": null, - "_source": { - "some other field": 15 - }, - "fields": { - "@timestamp": [ - "2022-12-30T15:42:54.000Z" - ] + "responses": [ + { + "hits": { + "total": { + "value": 109, + "relation": "eq" }, - "sort": [ - 1675869055830, - 4 + "max_score": null, + "hits": [ + { + "_index": "logs-2023.02.08", + "_id": "some id", + "_score": null, + "_source": { + "some other field": 15 + }, + "fields": { + "@timestamp": [ + "2022-12-30T15:42:54.000Z" + ] + }, + "sort": [ + 1675869055830, + 4 + ] + } ] - } - ] - }, - "status": 200 - } - ] - }` + }, + "status": 200 + } + ] + }` - rp, err := newResponseParserForTest(targets, response) + rp, err := newResponseParserForTest(targets, response, nil, client.ConfiguredFields{TimeField: "@timestamp"}, nil) assert.Nil(t, err) - result, err := rp.getTimeSeries(client.ConfiguredFields{TimeField: "@timestamp"}) + result, err := rp.parseResponse() require.NoError(t, err) require.Len(t, result.Responses, 1) @@ -1609,35 +1639,35 @@ func Test_ProcessRawDataResponse(t *testing.T) { t.Run("no time in _source or in fields does not create data frame field at the beginning with a nil time", func(t *testing.T) { targets := map[string]string{ "A": `{ - "timeField": "@timestamp", - "metrics": [{"type": "raw_data"}] - }`, + "timeField": "@timestamp", + "metrics": [{"type": "raw_data"}] + }`, } response := `{ - "responses": [ - { - "hits": { - "hits": [ - { - "_index": "logs-2023.02.08", - "_id": "some id", - "_score": null, - "_source": {}, - "sort": [ - 1675869055830, - 4 + "responses": [ + { + "hits": { + "hits": [ + { + "_index": "logs-2023.02.08", + "_id": "some id", + "_score": null, + "_source": 
{}, + "sort": [ + 1675869055830, + 4 + ] + } ] } - ] - } - } - ] - }` + } + ] + }` - rp, err := newResponseParserForTest(targets, response) + rp, err := newResponseParserForTest(targets, response, nil, client.ConfiguredFields{TimeField: "@timestamp"}, nil) assert.Nil(t, err) - result, err := rp.getTimeSeries(client.ConfiguredFields{TimeField: "@timestamp"}) + result, err := rp.parseResponse() require.NoError(t, err) require.Len(t, result.Responses, 1) @@ -1656,105 +1686,105 @@ func Test_ProcessRawDataResponse(t *testing.T) { t.Run("Simple raw data query", func(t *testing.T) { targets := map[string]string{ "A": `{ - "timeField": "@timestamp", - "metrics": [{"type": "raw_data"}] - }`, + "timeField": "@timestamp", + "metrics": [{"type": "raw_data"}] + }`, } response := `{ - "responses":[ - { - "hits":{ - "total":{ - "value":109, - "relation":"eq" - }, - "max_score":null, - "hits":[ - { - "_index":"logs-2023.02.08", - "_id":"some id", - "_score":null, - "_source":{ - "@timestamp":"2023-02-08T15:10:55.830Z", - "line":"log text [479231733]", - "counter":"109", - "float":58.253758485091, - "label":"val1", - "level":"info", - "location":"17.089705232090438, 41.62861966340297", - "nested":{ - "field":{ - "double_nested":"value" - } - }, - "shapes":[ - { - "type":"triangle" + "responses":[ + { + "hits":{ + "total":{ + "value":109, + "relation":"eq" + }, + "max_score":null, + "hits":[ + { + "_index":"logs-2023.02.08", + "_id":"some id", + "_score":null, + "_source":{ + "@timestamp":"2023-02-08T15:10:55.830Z", + "line":"log text [479231733]", + "counter":"109", + "float":58.253758485091, + "label":"val1", + "level":"info", + "location":"17.089705232090438, 41.62861966340297", + "nested":{ + "field":{ + "double_nested":"value" + } + }, + "shapes":[ + { + "type":"triangle" + }, + { + "type":"square" + } + ], + "xyz":null + }, + "fields": { + "@timestamp": [ + "2023-02-08T15:10:55.830Z" + ] }, - { - "type":"square" - } - ], - "xyz":null - }, - "fields": { - "@timestamp": [ - "2023-02-08T15:10:55.830Z" + "sort":[ + 1675869055830, + 4 ] - }, - "sort":[ - 1675869055830, - 4 - ] - }, - { - "_index":"logs-2023.02.08", - "_id":"Fx2UMYYBfCQ-FCMjZyJ_", - "_score":null, - "_source":{ - "@timestamp":"2023-02-08T15:10:54.835Z", - "line":"log text with ANSI \u001b[31mpart of the text\u001b[0m [493139080]", - "counter":"108", - "float":54.5977098233944, - "label":"val1", - "level":"info", - "location":"19.766305918490463, 40.42639175509792", - "nested":{ - "field":{ - "double_nested":"value" - } - }, - "shapes":[ - { - "type":"triangle" + }, + { + "_index":"logs-2023.02.08", + "_id":"Fx2UMYYBfCQ-FCMjZyJ_", + "_score":null, + "_source":{ + "@timestamp":"2023-02-08T15:10:54.835Z", + "line":"log text with ANSI \u001b[31mpart of the text\u001b[0m [493139080]", + "counter":"108", + "float":54.5977098233944, + "label":"val1", + "level":"info", + "location":"19.766305918490463, 40.42639175509792", + "nested":{ + "field":{ + "double_nested":"value" + } + }, + "shapes":[ + { + "type":"triangle" + }, + { + "type":"square" + } + ], + "xyz":"def" + }, + "fields": { + "@timestamp": [ + "2023-02-08T15:10:54.835Z" + ] }, - { - "type":"square" - } - ], - "xyz":"def" - }, - "fields": { - "@timestamp": [ - "2023-02-08T15:10:54.835Z" + "sort":[ + 1675869054835, + 7 ] - }, - "sort":[ - 1675869054835, - 7 - ] - } - ] - }, - "status":200 - } - ] - }` + } + ] + }, + "status":200 + } + ] + }` - rp, err := newResponseParserForTest(targets, response) + rp, err := newResponseParserForTest(targets, response, nil, 
client.ConfiguredFields{TimeField: "@timestamp"}, nil) assert.Nil(t, err) - result, err := rp.getTimeSeries(client.ConfiguredFields{TimeField: "@timestamp"}) + result, err := rp.parseResponse() require.NoError(t, err) require.Len(t, result.Responses, 1) @@ -1784,40 +1814,40 @@ func Test_ProcessRawDataResponse(t *testing.T) { t.Run("Raw data query filterable fields", func(t *testing.T) { targets := map[string]string{ "A": `{ - "timeField": "@timestamp", - "metrics": [{ "type": "raw_data", "id": "1" }], - "bucketAggs": [] - }`, + "timeField": "@timestamp", + "metrics": [{ "type": "raw_data", "id": "1" }], + "bucketAggs": [] + }`, } response := ` - { - "responses": [ - { - "hits": { - "total": { "relation": "eq", "value": 1 }, - "hits": [ - { - "_id": "1", - "_type": "_doc", - "_index": "index", - "_source": { "sourceProp": "asd" }, - "fields": { - "@timestamp": [ - "2023-02-08T15:10:54.835Z" - ] - } + { + "responses": [ + { + "hits": { + "total": { "relation": "eq", "value": 1 }, + "hits": [ + { + "_id": "1", + "_type": "_doc", + "_index": "index", + "_source": { "sourceProp": "asd" }, + "fields": { + "@timestamp": [ + "2023-02-08T15:10:54.835Z" + ] + } + } + ] } - ] - } - } - ] - } - ` + } + ] + } + ` - rp, err := newResponseParserForTest(targets, response) + rp, err := newResponseParserForTest(targets, response, nil, client.ConfiguredFields{TimeField: "@timestamp"}, nil) assert.Nil(t, err) - result, err := rp.getTimeSeries(client.ConfiguredFields{TimeField: "@timestamp"}) + result, err := rp.parseResponse() require.NoError(t, err) require.NotNil(t, result.Responses["A"]) @@ -1833,62 +1863,32 @@ func Test_ProcessRawDataResponse(t *testing.T) { }) } -func newResponseParserForTest(tsdbQueries map[string]string, responseBody string) (*responseParser, error) { - from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC) - to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC) - dataQueries := []backend.DataQuery{} - - for refID, tsdbQueryBody := range tsdbQueries { - dataQueries = append(dataQueries, backend.DataQuery{ - JSON: []byte(tsdbQueryBody), - RefID: refID, - TimeRange: backend.TimeRange{ - From: from, - To: to, - }, - }) - } - - var response client.MultiSearchResponse - err := json.Unmarshal([]byte(responseBody), &response) - if err != nil { - return nil, err - } - - queries, err := parse(dataQueries) - if err != nil { - return nil, err - } - - return newResponseParser(response.Responses, queries, nil), nil -} - func TestHistogramSimple(t *testing.T) { query := map[string]string{ "A": `{ - "timeField": "@timestamp", - "metrics": [{ "type": "count", "id": "1" }], - "bucketAggs": [{ "type": "histogram", "field": "bytes", "id": "3" }] - }`} + "timeField": "@timestamp", + "metrics": [{ "type": "count", "id": "1" }], + "bucketAggs": [{ "type": "histogram", "field": "bytes", "id": "3" }] + }`} response := ` - { - "responses": [ - { - "aggregations": { - "3": { - "buckets": [ - { "doc_count": 1, "key": 1000 }, - { "doc_count": 3, "key": 2000 }, - { "doc_count": 2, "key": 1000 } - ] + { + "responses": [ + { + "aggregations": { + "3": { + "buckets": [ + { "doc_count": 1, "key": 1000 }, + { "doc_count": 3, "key": 2000 }, + { "doc_count": 2, "key": 1000 } + ] + } + } } - } - } - ] - }` - rp, err := newResponseParserForTest(query, response) + ] + }` + rp, err := newResponseParserForTest(query, response, nil, client.ConfiguredFields{TimeField: "@timestamp"}, nil) assert.NoError(t, err) - result, err := rp.getTimeSeries(client.ConfiguredFields{TimeField: "@timestamp"}) + result, err := rp.parseResponse() 
assert.NoError(t, err) require.Len(t, result.Responses, 1) @@ -2029,40 +2029,40 @@ func TestProcessRawDocumentResponse(t *testing.T) { t.Run("Simple raw document query", func(t *testing.T) { targets := map[string]string{ "A": `{ - "timeField": "@timestamp", - "refId": "A", - "metrics": [{ "type": "raw_document", "id": "1" }], - "bucketAggs": [] - }`, + "timeField": "@timestamp", + "refId": "A", + "metrics": [{ "type": "raw_document", "id": "1" }], + "bucketAggs": [] + }`, } response := ` - { - "responses": [ - { - "hits": { - "total": 100, - "hits": [ - { - "_id": "1", - "_type": "type", - "_index": "index", - "_source": { "sourceProp": "asd" }, - "fields": { "fieldProp": "field" } - }, + { + "responses": [ { - "_source": { "sourceProp": "asd2" }, - "fields": { "fieldProp": "field2" } + "hits": { + "total": 100, + "hits": [ + { + "_id": "1", + "_type": "type", + "_index": "index", + "_source": { "sourceProp": "asd" }, + "fields": { "fieldProp": "field" } + }, + { + "_source": { "sourceProp": "asd2" }, + "fields": { "fieldProp": "field2" } + } + ] } - ] - } - } - ] - }` + } + ] + }` - rp, err := newResponseParserForTest(targets, response) + rp, err := newResponseParserForTest(targets, response, nil, client.ConfiguredFields{TimeField: "@timestamp"}, nil) assert.Nil(t, err) - result, err := rp.getTimeSeries(client.ConfiguredFields{TimeField: "@timestamp"}) + result, err := rp.parseResponse() require.NoError(t, err) require.Len(t, result.Responses, 1) @@ -2083,77 +2083,77 @@ func TestProcessRawDocumentResponse(t *testing.T) { t.Run("More complex raw document query", func(t *testing.T) { targets := map[string]string{ "A": `{ - "timeField": "@timestamp", - "metrics": [{ "type": "raw_document" }] - }`, + "timeField": "@timestamp", + "metrics": [{ "type": "raw_document" }] + }`, } response := `{ - "responses":[ - { - "hits":{ - "total":{ - "value":109, - "relation":"eq" - }, - "max_score":null, - "hits":[ - { - "_index":"logs-2023.02.08", - "_id":"GB2UMYYBfCQ-FCMjayJa", - "_score":null, - "fields":{ - "test_field":"A", - "@timestamp":[ - "2023-02-08T15:10:55.830Z" - ] - }, - "_source":{ - "line":"log text [479231733]", - "counter":"109", - "float":58.253758485091, - "label":"val1", - "level":"info", - "location":"17.089705232090438, 41.62861966340297", - "nested":{ - "field":{ - "double_nested":"value" - } - } - } - }, - { - "_index":"logs-2023.02.08", - "_id":"Fx2UMYYBfCQ-FCMjZyJ_", - "_score":null, - "fields":{ - "test_field":"A" - }, - "_source":{ - "@timestamp":"2023-02-08T15:10:54.835Z", - "line":"log text with ANSI \u001b[31mpart of the text\u001b[0m [493139080]", - "counter":"108", - "float":54.5977098233944, - "label":"val1", - "level":"info", - "location":"19.766305918490463, 40.42639175509792", - "nested":{ - "field":{ - "double_nested":"value1" - } - } - } - } - ] - }, - "status":200 - } - ] - }` + "responses":[ + { + "hits":{ + "total":{ + "value":109, + "relation":"eq" + }, + "max_score":null, + "hits":[ + { + "_index":"logs-2023.02.08", + "_id":"GB2UMYYBfCQ-FCMjayJa", + "_score":null, + "fields":{ + "test_field":"A", + "@timestamp":[ + "2023-02-08T15:10:55.830Z" + ] + }, + "_source":{ + "line":"log text [479231733]", + "counter":"109", + "float":58.253758485091, + "label":"val1", + "level":"info", + "location":"17.089705232090438, 41.62861966340297", + "nested":{ + "field":{ + "double_nested":"value" + } + } + } + }, + { + "_index":"logs-2023.02.08", + "_id":"Fx2UMYYBfCQ-FCMjZyJ_", + "_score":null, + "fields":{ + "test_field":"A" + }, + "_source":{ + 
"@timestamp":"2023-02-08T15:10:54.835Z", + "line":"log text with ANSI \u001b[31mpart of the text\u001b[0m [493139080]", + "counter":"108", + "float":54.5977098233944, + "label":"val1", + "level":"info", + "location":"19.766305918490463, 40.42639175509792", + "nested":{ + "field":{ + "double_nested":"value1" + } + } + } + } + ] + }, + "status":200 + } + ] + }` - rp, err := newResponseParserForTest(targets, response) + rp, err := newResponseParserForTest(targets, response, nil, client.ConfiguredFields{TimeField: "@timestamp"}, nil) assert.Nil(t, err) - result, err := rp.getTimeSeries(client.ConfiguredFields{TimeField: "@timestamp"}) + result, err := rp.parseResponse() require.NoError(t, err) require.Len(t, result.Responses, 1) @@ -2167,78 +2167,78 @@ func TestProcessRawDocumentResponse(t *testing.T) { doc1 := dataframes[0].Fields[0].At(0).(*json.RawMessage) assert.JSONEq(t, `{ - "@timestamp":["2023-02-08T15:10:55.830Z"], - "_id":"GB2UMYYBfCQ-FCMjayJa", - "_index":"logs-2023.02.08", - "_type":null, - "counter":"109", - "float":58.253758485091, - "label":"val1", - "level":"info", - "line":"log text [479231733]", - "location":"17.089705232090438, 41.62861966340297", - "nested":{ - "field":{ - "double_nested":"value" - } - }, - "test_field":"A" - }`, string(*doc1)) + "@timestamp":["2023-02-08T15:10:55.830Z"], + "_id":"GB2UMYYBfCQ-FCMjayJa", + "_index":"logs-2023.02.08", + "_type":null, + "counter":"109", + "float":58.253758485091, + "label":"val1", + "level":"info", + "line":"log text [479231733]", + "location":"17.089705232090438, 41.62861966340297", + "nested":{ + "field":{ + "double_nested":"value" + } + }, + "test_field":"A" + }`, string(*doc1)) doc2 := dataframes[0].Fields[0].At(1).(*json.RawMessage) assert.JSONEq(t, `{ - "@timestamp":"2023-02-08T15:10:54.835Z", - "_id":"Fx2UMYYBfCQ-FCMjZyJ_", - "_index":"logs-2023.02.08", - "_type":null, - "counter":"108", - "float":54.5977098233944, - "label":"val1", - "level":"info", - "line":"log text with ANSI \u001b[31mpart of the text\u001b[0m [493139080]", - "location":"19.766305918490463, 40.42639175509792", - "nested":{ - "field":{ - "double_nested":"value1" - } - }, - "test_field":"A" - }`, string(*doc2)) + "@timestamp":"2023-02-08T15:10:54.835Z", + "_id":"Fx2UMYYBfCQ-FCMjZyJ_", + "_index":"logs-2023.02.08", + "_type":null, + "counter":"108", + "float":54.5977098233944, + "label":"val1", + "level":"info", + "line":"log text with ANSI \u001b[31mpart of the text\u001b[0m [493139080]", + "location":"19.766305918490463, 40.42639175509792", + "nested":{ + "field":{ + "double_nested":"value1" + } + }, + "test_field":"A" + }`, string(*doc2)) }) t.Run("doc returns timeField preferentially from fields", func(t *testing.T) { // documents that the timefield is taken from `fields` preferentially because we want to ensure it is the format requested in AddTimeFieldWithStandardizedFormat targets := map[string]string{ "A": `{ - "timeField": "@timestamp", - "metrics": [{ "type": "raw_document", "id": "1" }] - }`, + "timeField": "@timestamp", + "metrics": [{ "type": "raw_document", "id": "1" }] + }`, } response := ` - { - "responses":[ - { - "hits":{ - "hits":[ - { - "_source":{ - "@timestamp":"1999-01-01T12:12:12.111Z" - }, - "fields":{ - "@timestamp":[ - "2023-02-08T15:10:55.830Z" - ] - } - } - ] - } - } - ] - }` + { + "responses":[ + { + "hits":{ + "hits":[ + { + "_source":{ + "@timestamp":"1999-01-01T12:12:12.111Z" + }, + "fields":{ + "@timestamp":[ + "2023-02-08T15:10:55.830Z" + ] + } + } + ] + } + } + ] + }` - rp, err := newResponseParserForTest(targets, 
response) + rp, err := newResponseParserForTest(targets, response, nil, client.ConfiguredFields{TimeField: "@timestamp"}, nil) assert.Nil(t, err) - result, err := rp.getTimeSeries(client.ConfiguredFields{TimeField: "@timestamp"}) + result, err := rp.parseResponse() require.NoError(t, err) require.Len(t, result.Responses, 1) @@ -2258,31 +2258,31 @@ func TestProcessRawDocumentResponse(t *testing.T) { // documents that timeField that in _source will be returned targets := map[string]string{ "A": `{ - "timeField": "@timestamp", - "metrics": [{ "type": "raw_document", "id": "1" }] - }`, + "timeField": "@timestamp", + "metrics": [{ "type": "raw_document", "id": "1" }] + }`, } response := ` - { - "responses":[ - { - "hits":{ - "hits":[ - { - "_source":{ - "@timestamp":"1999-01-01T12:12:12.111Z" - } - } - ] - } - } - ] - }` + { + "responses":[ + { + "hits":{ + "hits":[ + { + "_source":{ + "@timestamp":"1999-01-01T12:12:12.111Z" + } + } + ] + } + } + ] + }` - rp, err := newResponseParserForTest(targets, response) + rp, err := newResponseParserForTest(targets, response, nil, client.ConfiguredFields{TimeField: "@timestamp"}, nil) assert.Nil(t, err) - result, err := rp.getTimeSeries(client.ConfiguredFields{TimeField: "@timestamp"}) + result, err := rp.parseResponse() require.NoError(t, err) require.Len(t, result.Responses, 1) From 7dfbf2fb8dba8d010c3d2d001aa688a7f6c06e30 Mon Sep 17 00:00:00 2001 From: Sarah Zinger Date: Sun, 19 Nov 2023 20:12:03 -0500 Subject: [PATCH 2/5] Fix test after rebase --- pkg/opensearch/response_parser_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/opensearch/response_parser_test.go b/pkg/opensearch/response_parser_test.go index 1ae1c726..77914cb3 100644 --- a/pkg/opensearch/response_parser_test.go +++ b/pkg/opensearch/response_parser_test.go @@ -2434,9 +2434,9 @@ func TestProcessTraceSpans_creates_correct_data_frame_fields(t *testing.T) { ] }` - rp, err := newResponseParserForTest(targets, response) + rp, err := newResponseParserForTest(targets, response, nil, client.ConfiguredFields{TimeField: "testtime"}, nil) assert.NoError(t, err) - result, err := rp.getTimeSeries(client.ConfiguredFields{TimeField: "testtime"}) + result, err := rp.parseResponse() require.NoError(t, err) queryRes := result.Responses["A"] @@ -2541,7 +2541,7 @@ func sortObjectsByKey(rawObject *data.Field, t *testing.T) []KeyValue { jsonRawMessage, ok := rawObject.At(0).(*json.RawMessage) require.True(t, ok) require.NotNil(t, jsonRawMessage) - + var sortedObject []KeyValue err := json.Unmarshal(*jsonRawMessage, &sortedObject) require.Nil(t, err) @@ -2563,7 +2563,7 @@ func sortLogsByTimestamp(rawObject *data.Field, t *testing.T) []Log { jsonRawMessage, ok := rawObject.At(0).(*json.RawMessage) require.True(t, ok) require.NotNil(t, jsonRawMessage) - + var sortedArray []Log err := json.Unmarshal(*jsonRawMessage, &sortedArray) require.Nil(t, err) From afe66267ebf780b0ad37ca2518345ded41ea9cc7 Mon Sep 17 00:00:00 2001 From: Sarah Zinger Date: Mon, 20 Nov 2023 15:46:52 -0500 Subject: [PATCH 3/5] Update pkg/opensearch/response_parser.go Co-authored-by: Kevin Yu --- pkg/opensearch/response_parser.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/opensearch/response_parser.go b/pkg/opensearch/response_parser.go index 1197296d..10c11f01 100644 --- a/pkg/opensearch/response_parser.go +++ b/pkg/opensearch/response_parser.go @@ -46,7 +46,7 @@ type responseParser struct { Responses []*es.SearchResponse Targets []*Query DebugInfo *es.SearchDebugInfo - 
ConfiguredFields client.ConfiguredFields + ConfiguredFields es.ConfiguredFields DSSettings *backend.DataSourceInstanceSettings } From e449f43008062c5d975e7ab918fef13d179b5f23 Mon Sep 17 00:00:00 2001 From: Sarah Zinger Date: Mon, 20 Nov 2023 15:47:01 -0500 Subject: [PATCH 4/5] Update pkg/opensearch/response_parser.go Co-authored-by: Kevin Yu --- pkg/opensearch/response_parser.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/opensearch/response_parser.go b/pkg/opensearch/response_parser.go index 10c11f01..895d5dcc 100644 --- a/pkg/opensearch/response_parser.go +++ b/pkg/opensearch/response_parser.go @@ -50,7 +50,7 @@ type responseParser struct { DSSettings *backend.DataSourceInstanceSettings } -func newResponseParser(responses []*es.SearchResponse, targets []*Query, debugInfo *es.SearchDebugInfo, configuredFields client.ConfiguredFields, dsSettings *backend.DataSourceInstanceSettings) *responseParser { +func newResponseParser(responses []*es.SearchResponse, targets []*Query, debugInfo *es.SearchDebugInfo, configuredFields es.ConfiguredFields, dsSettings *backend.DataSourceInstanceSettings) *responseParser { return &responseParser{ Responses: responses, Targets: targets, From a033253300b9f2ba795fbf3ca3bca0a502195707 Mon Sep 17 00:00:00 2001 From: Sarah Zinger Date: Mon, 20 Nov 2023 15:48:15 -0500 Subject: [PATCH 5/5] No need to import client --- pkg/opensearch/response_parser.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/opensearch/response_parser.go b/pkg/opensearch/response_parser.go index 895d5dcc..4595fd10 100644 --- a/pkg/opensearch/response_parser.go +++ b/pkg/opensearch/response_parser.go @@ -13,7 +13,6 @@ import ( simplejson "github.com/bitly/go-simplejson" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" - "github.com/grafana/opensearch-datasource/pkg/opensearch/client" es "github.com/grafana/opensearch-datasource/pkg/opensearch/client" utils "github.com/grafana/opensearch-datasource/pkg/utils" )
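
Net effect of the series on the parser API, as a minimal sketch in Go: the wrapper name parseForTest and the literal field values below are hypothetical placeholders, but the newResponseParser and parseResponse signatures are exactly the ones introduced in these patches. Configured fields and datasource settings now arrive at construction time, and parseResponse() takes no arguments (previously the fields were passed to getTimeSeries).

    package opensearch

    import (
        "github.com/grafana/grafana-plugin-sdk-go/backend"
        es "github.com/grafana/opensearch-datasource/pkg/opensearch/client"
    )

    // parseForTest wires up the refactored parser. It must live in the
    // opensearch package because newResponseParser and Query are unexported.
    func parseForTest(responses []*es.SearchResponse, queries []*Query) (*backend.QueryDataResponse, error) {
        rp := newResponseParser(
            responses,
            queries,
            nil, // *es.SearchDebugInfo, optional debug payload
            es.ConfiguredFields{TimeField: "@timestamp"}, // formerly an argument to getTimeSeries
            &backend.DataSourceInstanceSettings{},        // new: threaded from req.PluginContext in QueryData
        )
        return rp.parseResponse()
    }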