diff --git a/cmd/plugin-backend.go b/cmd/plugin-backend.go index 30da7d65f..37d165f19 100644 --- a/cmd/plugin-backend.go +++ b/cmd/plugin-backend.go @@ -10,7 +10,7 @@ import ( "github.com/sirupsen/logrus" - "github.com/netobserv/network-observability-console-plugin/pkg/handler" + "github.com/netobserv/network-observability-console-plugin/pkg/loki" "github.com/netobserv/network-observability-console-plugin/pkg/server" ) @@ -72,12 +72,7 @@ func main() { CORSAllowMethods: *corsMethods, CORSAllowHeaders: *corsHeaders, CORSMaxAge: *corsMaxAge, - Loki: handler.LokiConfig{ - URL: lURL, - Timeout: *lokiTimeout, - TenantID: *lokiTenantID, - Labels: strings.Split(lLabels, ","), - }, - FrontendConfig: *frontendConfig, + Loki: loki.NewConfig(lURL, *lokiTimeout, *lokiTenantID, strings.Split(lLabels, ",")), + FrontendConfig: *frontendConfig, }) } diff --git a/pkg/handler/csv/loki_csv.go b/pkg/handler/csv/loki_csv.go index 3c87cb754..f2d516c15 100644 --- a/pkg/handler/csv/loki_csv.go +++ b/pkg/handler/csv/loki_csv.go @@ -13,67 +13,58 @@ const ( ) func GetCSVData(qr *model.QueryResponse, columns []string) ([][]string, error) { - if columns != nil && len(columns) == 0 { - return nil, fmt.Errorf("columns can't be empty if specified") - } - - if streams, ok := qr.Data.Result.(model.Streams); ok { - return manageStreams(streams, columns) - } - return nil, fmt.Errorf("loki returned an unexpected type: %T", qr.Data.Result) -} - -func manageStreams(streams model.Streams, columns []string) ([][]string, error) { - //make csv datas containing header as first line + rows - datas := make([][]string, 1) - //prepare columns for faster lookup - columnsMap := utils.GetMapInterface(columns) - //set Timestamp as first data - includeTimestamp := false - if _, exists := columnsMap[timestampCol]; exists || len(columns) == 0 { - datas[0] = append(datas[0], timestampCol) - includeTimestamp = true - } - //keep ordered labels / field names between each lines - //filtered by columns parameter if specified - var labels []string - var fields []string - for _, stream := range streams { - //get labels from first stream - if labels == nil { - labels = make([]string, 0, len(stream.Labels)) - for name := range stream.Labels { - if _, exists := columnsMap[name]; exists || len(columns) == 0 { - labels = append(fields, name) + if streams, ok := qr.Data.Result.(model.Streams); ok { //make csv datas containing header as first line + rows + data := make([][]string, 1) + //prepare columns for faster lookup + columnsMap := utils.GetMapInterface(columns) + //set Timestamp as first data + includeTimestamp := false + if _, exists := columnsMap[timestampCol]; exists || len(columns) == 0 { + data[0] = append(data[0], timestampCol) + includeTimestamp = true + } + //keep ordered labels / field names between each lines + //filtered by columns parameter if specified + var labels []string + var fields []string + for _, stream := range streams { + //get labels from first stream + if labels == nil { + labels = make([]string, 0, len(stream.Labels)) + for name := range stream.Labels { + if _, exists := columnsMap[name]; exists || len(columns) == 0 { + labels = append(fields, name) + } } + data[0] = append(data[0], labels...) } - datas[0] = append(datas[0], labels...) - } - //apply timestamp & labels for each entries and add json line fields - for _, entry := range stream.Entries { - //get json line - var line map[string]interface{} - err := json.Unmarshal([]byte(entry.Line), &line) - if err != nil { - return nil, fmt.Errorf("cannot unmarshal line %s", entry.Line) - } + //apply timestamp & labels for each entries and add json line fields + for _, entry := range stream.Entries { + //get json line + var line map[string]interface{} + err := json.Unmarshal([]byte(entry.Line), &line) + if err != nil { + return nil, fmt.Errorf("cannot unmarshal line %s", entry.Line) + } - //get fields from first line - if fields == nil { - fields = make([]string, 0, len(line)) - for name := range line { - if _, exists := columnsMap[name]; exists || len(columns) == 0 { - fields = append(fields, name) + //get fields from first line + if fields == nil { + fields = make([]string, 0, len(line)) + for name := range line { + if _, exists := columnsMap[name]; exists || len(columns) == 0 { + fields = append(fields, name) + } } + data[0] = append(data[0], fields...) } - datas[0] = append(datas[0], fields...) - } - datas = append(datas, getRowDatas(stream, entry, labels, fields, line, len(datas[0]), includeTimestamp)) + data = append(data, getRowDatas(stream, entry, labels, fields, line, len(data[0]), includeTimestamp)) + } } + return data, nil } - return datas, nil + return nil, fmt.Errorf("loki returned an unexpected type: %T", qr.Data.Result) } func getRowDatas(stream model.Stream, entry model.Entry, labels, fields []string, diff --git a/pkg/handler/export.go b/pkg/handler/export.go new file mode 100644 index 000000000..6d0118e2e --- /dev/null +++ b/pkg/handler/export.go @@ -0,0 +1,43 @@ +package handler + +import ( + "fmt" + "net/http" + "strings" + + "github.com/netobserv/network-observability-console-plugin/pkg/loki" +) + +const ( + exportCSVFormat = "csv" + exportFormatKey = "format" + exportcolumnsKey = "columns" +) + +func ExportFlows(cfg loki.Config) func(w http.ResponseWriter, r *http.Request) { + lokiClient := newLokiClient(&cfg) + + return func(w http.ResponseWriter, r *http.Request) { + params := r.URL.Query() + hlog.Debugf("ExportFlows query params: %s", params) + + flows, code, err := getFlows(cfg, lokiClient, params) + if err != nil { + writeError(w, code, err.Error()) + return + } + + exportFormat := params.Get(exportFormatKey) + var exportColumns []string + if str := params.Get(exportcolumnsKey); len(str) > 0 { + exportColumns = strings.Split(str, ",") + } + + switch exportFormat { + case exportCSVFormat: + writeCSV(w, http.StatusOK, flows, exportColumns) + default: + writeError(w, http.StatusBadRequest, fmt.Sprintf("export format %q is not valid", exportFormat)) + } + } +} diff --git a/pkg/handler/flows.go b/pkg/handler/flows.go new file mode 100644 index 000000000..bc176198c --- /dev/null +++ b/pkg/handler/flows.go @@ -0,0 +1,136 @@ +package handler + +import ( + "errors" + "net/http" + "net/url" + "strconv" + "strings" + "time" + + "github.com/netobserv/network-observability-console-plugin/pkg/httpclient" + "github.com/netobserv/network-observability-console-plugin/pkg/loki" +) + +const ( + startTimeKey = "startTime" + endTimeKey = "endTime" + timeRangeKey = "timeRange" + limitKey = "limit" + reporterKey = "reporter" + filtersKey = "filters" +) + +type errorWithCode struct { + err error + code int +} + +type keyValues = []string + +// Example of raw filters (url-encoded): +// foo=a,b&bar=c|baz=d +func parseFilters(raw string) ([][]keyValues, error) { + var parsed [][]keyValues + decoded, err := url.QueryUnescape(raw) + if err != nil { + return nil, err + } + groups := strings.Split(decoded, "|") + for _, group := range groups { + var andFilters []keyValues + filters := strings.Split(group, "&") + for _, filter := range filters { + pair := strings.Split(filter, "=") + if len(pair) == 2 { + andFilters = append(andFilters, pair) + } + } + parsed = append(parsed, andFilters) + } + return parsed, nil +} + +func getStartTime(params url.Values) (string, error) { + start := params.Get(startTimeKey) + if len(start) == 0 { + tr := params.Get(timeRangeKey) + if len(tr) > 0 { + r, err := strconv.ParseInt(tr, 10, 64) + if err != nil { + return "", errors.New("Could not parse time range: " + err.Error()) + } + start = strconv.FormatInt(time.Now().Unix()-r, 10) + } + } + return start, nil +} + +func GetFlows(cfg loki.Config) func(w http.ResponseWriter, r *http.Request) { + lokiClient := newLokiClient(&cfg) + + return func(w http.ResponseWriter, r *http.Request) { + params := r.URL.Query() + hlog.Debugf("GetFlows query params: %s", params) + + flows, code, err := getFlows(cfg, lokiClient, params) + if err != nil { + writeError(w, code, err.Error()) + return + } + + writeRawJSON(w, http.StatusOK, flows) + } +} + +func getFlows(cfg loki.Config, client httpclient.HTTPClient, params url.Values) ([]byte, int, error) { + start, err := getStartTime(params) + if err != nil { + return nil, http.StatusBadRequest, err + } + end := params.Get(endTimeKey) + limit := params.Get(limitKey) + reporter := params.Get(reporterKey) + rawFilters := params.Get(filtersKey) + filterGroups, err := parseFilters(rawFilters) + if err != nil { + return nil, http.StatusBadRequest, err + } + + var rawJSON []byte + if len(filterGroups) > 1 { + // match any, and multiple filters => run in parallel then aggregate + var queries []string + for _, group := range filterGroups { + qb := loki.NewFlowQueryBuilder(&cfg, start, end, limit, reporter) + err := qb.Filters(group) + if err != nil { + return nil, http.StatusBadRequest, errors.New("Can't build query: " + err.Error()) + } + queries = append(queries, qb.Build()) + } + res, code, err := fetchParallel(client, queries) + if err != nil { + return nil, code, errors.New("Error while fetching flows from Loki: " + err.Error()) + } + rawJSON = res + } else { + // else, run all at once + qb := loki.NewFlowQueryBuilder(&cfg, start, end, limit, reporter) + if len(filterGroups) > 0 { + err := qb.Filters(filterGroups[0]) + if err != nil { + return nil, http.StatusBadRequest, err + } + } + query := qb.Build() + resp, code, err := executeLokiQuery(query, client) + if err != nil { + return nil, code, errors.New("Error while fetching flows from Loki: " + err.Error()) + } + rawJSON = resp + } + + hlog.Tracef("GetFlows raw response: %v", string(rawJSON)) + return rawJSON, http.StatusOK, nil +} diff --git a/pkg/handler/flows_test.go b/pkg/handler/flows_test.go new file mode 100644 index 000000000..6443334ea --- /dev/null +++ b/pkg/handler/flows_test.go @@ -0,0 +1,63 @@ +package handler + +import ( + "net/url" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestParseFilters(t *testing.T) { + // 2 groups + groups, err := parseFilters(url.QueryEscape("foo=a,b&bar=c|baz=d")) + require.NoError(t, err) + + assert.Len(t, groups, 2) + assert.Equal(t, [][]string{ + {"foo", "a,b"}, + {"bar", "c"}, + }, groups[0]) + assert.Equal(t, [][]string{ + {"baz", "d"}, + }, groups[1]) + + // Resource path + port, match all + groups, err = parseFilters(url.QueryEscape(`SrcK8S_Type="Pod"&SrcK8S_Namespace="default"&SrcK8S_Name="test"&SrcPort=8080`)) + require.NoError(t, err) + + assert.Len(t, groups, 1) + assert.Equal(t, [][]string{ + {"SrcK8S_Type", `"Pod"`}, + {"SrcK8S_Namespace", `"default"`}, + {"SrcK8S_Name", `"test"`}, + {"SrcPort", `8080`}, + }, groups[0]) + + // Resource path + port, match any + groups, err = parseFilters(url.QueryEscape(`SrcK8S_Type="Pod"&SrcK8S_Namespace="default"&SrcK8S_Name="test"|SrcPort=8080`)) + require.NoError(t, err) + + assert.Len(t, groups, 2) + assert.Equal(t, [][]string{ + {"SrcK8S_Type", `"Pod"`}, + {"SrcK8S_Namespace", `"default"`}, + {"SrcK8S_Name", `"test"`}, + }, groups[0]) + + assert.Equal(t, [][]string{ + {"SrcPort", `8080`}, + }, groups[1]) + + // Resource path + name, match all + groups, err = parseFilters(url.QueryEscape(`SrcK8S_Type="Pod"&SrcK8S_Namespace="default"&SrcK8S_Name="test"&SrcK8S_Name="nomatch"`)) + require.NoError(t, err) + + assert.Len(t, groups, 1) + assert.Equal(t, [][]string{ + {"SrcK8S_Type", `"Pod"`}, + {"SrcK8S_Namespace", `"default"`}, + {"SrcK8S_Name", `"test"`}, + {"SrcK8S_Name", `"nomatch"`}, + }, groups[0]) +} diff --git a/pkg/handler/loki.go b/pkg/handler/loki.go index 588b4c13a..6cb8ce066 100644 --- a/pkg/handler/loki.go +++ b/pkg/handler/loki.go @@ -5,31 +5,23 @@ import ( "errors" "fmt" "net/http" - "net/url" "strings" - "time" + "sync" "github.com/sirupsen/logrus" "github.com/netobserv/network-observability-console-plugin/pkg/httpclient" "github.com/netobserv/network-observability-console-plugin/pkg/loki" + "github.com/netobserv/network-observability-console-plugin/pkg/model" ) var hlog = logrus.WithField("module", "handler") const ( - exportCSVFormat = "csv" lokiOrgIDHeader = "X-Scope-OrgID" ) -type LokiConfig struct { - URL *url.URL - Timeout time.Duration - TenantID string - Labels []string -} - -func newLokiClient(cfg *LokiConfig) httpclient.HTTPClient { +func newLokiClient(cfg *loki.Config) httpclient.HTTPClient { var headers map[string][]string if cfg.TenantID != "" { headers = map[string][]string{ @@ -40,45 +32,6 @@ func newLokiClient(cfg *LokiConfig) httpclient.HTTPClient { return httpclient.NewHTTPClient(cfg.Timeout, headers) } -func GetFlows(cfg LokiConfig, allowExport bool) func(w http.ResponseWriter, r *http.Request) { - lokiClient := newLokiClient(&cfg) - - // TODO: improve search mecanism: - // - better way to make difference between labels and values - // - don't always use regex (port number for example) - // - manage range (check RANGE_SPLIT_CHAR on front side) - return func(w http.ResponseWriter, r *http.Request) { - params := r.URL.Query() - hlog.Debugf("GetFlows query params: %s", params) - - //allow export only on specific endpoints - queryBuilder := loki.NewQuery(cfg.URL.String(), cfg.Labels, allowExport) - if err := queryBuilder.AddParams(params); err != nil { - writeError(w, http.StatusBadRequest, err.Error()) - return - } - - resp, code, err := executeFlowQuery(queryBuilder, lokiClient) - if err != nil { - writeError(w, code, "Loki query failed: "+err.Error()) - return - } - - hlog.Tracef("GetFlows raw response: %s", resp) - if allowExport { - switch f := queryBuilder.ExportFormat(); f { - case exportCSVFormat: - writeCSV(w, http.StatusOK, resp, queryBuilder.ExportColumns()) - default: - writeError(w, http.StatusServiceUnavailable, - fmt.Sprintf("export format %q is not valid", f)) - } - } else { - writeRawJSON(w, http.StatusOK, resp) - } - } -} - /* loki query will fail if spaces or quotes are not encoded * we can't use url.QueryEscape or url.Values here since Loki doesn't manage encoded parenthesis */ @@ -101,23 +54,10 @@ func getLokiError(resp []byte, code int) string { return fmt.Sprintf("Error from Loki (code: %d): %s", code, message) } -func executeFlowQuery(queryBuilder *loki.Query, lokiClient httpclient.HTTPClient) ([]byte, int, error) { - queryBuilder, err := queryBuilder.PrepareToSubmit() - if err != nil { - return nil, http.StatusBadRequest, err - } - - flowsURL, err := queryBuilder.URLQuery() - if err != nil { - return nil, http.StatusBadRequest, err - } - return executeLokiQuery(flowsURL, lokiClient) -} - func executeLokiQuery(flowsURL string, lokiClient httpclient.HTTPClient) ([]byte, int, error) { hlog.Debugf("executeLokiQuery URL: %s", flowsURL) - resp, code, err := lokiClient.Get(EncodeQuery(flowsURL)) + resp, code, err := lokiClient.Get(flowsURL) if err != nil { return nil, http.StatusServiceUnavailable, err } @@ -127,3 +67,66 @@ func executeLokiQuery(flowsURL string, lokiClient httpclient.HTTPClient) ([]byte } return resp, http.StatusOK, nil } + +func fetchParallel(lokiClient httpclient.HTTPClient, queries []string) ([]byte, int, error) { + // Run queries in parallel, then aggregate them + resChan := make(chan model.QueryResponse, len(queries)) + errChan := make(chan errorWithCode, len(queries)) + var wg sync.WaitGroup + wg.Add(len(queries)) + + for _, q := range queries { + go func(query string) { + defer wg.Done() + resp, code, err := executeLokiQuery(query, lokiClient) + if err != nil { + errChan <- errorWithCode{err: err, code: code} + } else { + var qr model.QueryResponse + err := json.Unmarshal(resp, &qr) + if err != nil { + errChan <- errorWithCode{err: err, code: http.StatusInternalServerError} + } else { + resChan <- qr + } + } + }(q) + } + + wg.Wait() + close(resChan) + close(errChan) + + for errWithCode := range errChan { + return nil, errWithCode.code, errWithCode.err + } + + // Aggregate results + first := true + var resp model.QueryResponse + merger := loki.NewStreamMerger() + for r := range resChan { + if streams, ok := r.Data.Result.(model.Streams); ok { + merger.Add(streams) + if first { + first = false + resp = r + } + } else { + return nil, http.StatusInternalServerError, fmt.Errorf("loki returned an unexpected type: %T", r.Data.Result) + } + } + + if first { + return []byte{}, http.StatusNoContent, nil + } + + // Encode back to json + resp.Data.Result = merger.GetStreams() + encoded, err := json.Marshal(resp) + if err != nil { + return nil, http.StatusInternalServerError, err + } + + return encoded, http.StatusOK, nil +} diff --git a/pkg/handler/resources.go b/pkg/handler/resources.go index f23f63dee..d977e7478 100644 --- a/pkg/handler/resources.go +++ b/pkg/handler/resources.go @@ -16,28 +16,32 @@ import ( "github.com/netobserv/network-observability-console-plugin/pkg/utils" ) -func GetNamespaces(cfg LokiConfig) func(w http.ResponseWriter, r *http.Request) { +func GetNamespaces(cfg loki.Config) func(w http.ResponseWriter, r *http.Request) { lokiClient := newLokiClient(&cfg) return func(w http.ResponseWriter, r *http.Request) { + // Initialize values explicitely to avoid null json when emtpy + values := []string{} + // Fetch and merge values for SrcK8S_Namespace and DstK8S_Namespace - values, code, err := getLabelValues(&cfg, lokiClient, fields.SrcNamespace) + values1, code, err := getLabelValues(&cfg, lokiClient, fields.SrcNamespace) if err != nil { writeError(w, code, "Error while fetching label source namespace values from Loki: "+err.Error()) return } + values = append(values, values1...) values2, code, err := getLabelValues(&cfg, lokiClient, fields.DstNamespace) if err != nil { writeError(w, code, "Error while fetching label destination namespace values from Loki: "+err.Error()) return } - values = append(values, values2...) + writeJSON(w, http.StatusOK, utils.NonEmpty(utils.Dedup(values))) } } -func getLabelValues(cfg *LokiConfig, lokiClient httpclient.HTTPClient, label string) ([]string, int, error) { +func getLabelValues(cfg *loki.Config, lokiClient httpclient.HTTPClient, label string) ([]string, int, error) { baseURL := strings.TrimRight(cfg.URL.String(), "/") url := fmt.Sprintf("%s/loki/api/v1/label/%s/values", baseURL, label) hlog.Debugf("getLabelValues URL: %s", url) @@ -59,48 +63,55 @@ func getLabelValues(cfg *LokiConfig, lokiClient httpclient.HTTPClient, label str return lvr.Data, http.StatusOK, nil } -func GetNames(cfg LokiConfig) func(w http.ResponseWriter, r *http.Request) { +func GetNames(cfg loki.Config) func(w http.ResponseWriter, r *http.Request) { lokiClient := newLokiClient(&cfg) return func(w http.ResponseWriter, r *http.Request) { params := mux.Vars(r) namespace := params["namespace"] kind := params["kind"] - names, code, err := getNamesForPrefix(cfg, lokiClient, fields.Src, kind, namespace) + // Initialize names explicitely to avoid null json when emtpy + names := []string{} + + // TODO: parallelize + names1, code, err := getNamesForPrefix(cfg, lokiClient, fields.Src, kind, namespace) if err != nil { writeError(w, code, err.Error()) return } + names = append(names, names1...) + names2, code, err := getNamesForPrefix(cfg, lokiClient, fields.Dst, kind, namespace) if err != nil { writeError(w, code, err.Error()) return } - names = append(names, names2...) + writeJSON(w, http.StatusOK, utils.NonEmpty(utils.Dedup(names))) } } -func getNamesForPrefix(cfg LokiConfig, lokiClient httpclient.HTTPClient, prefix, kind, namespace string) ([]string, int, error) { - lokiParams := map[string][]string{ - prefix + fields.Namespace: {exact(namespace)}, - } +func getNamesForPrefix(cfg loki.Config, lokiClient httpclient.HTTPClient, prefix, kind, namespace string) ([]string, int, error) { + lokiParams := [][]string{{ + prefix + fields.Namespace, exact(namespace), + }} var fieldToExtract string if utils.IsOwnerKind(kind) { - lokiParams[prefix+fields.OwnerType] = []string{exact(kind)} + lokiParams = append(lokiParams, []string{prefix + fields.OwnerType, exact(kind)}) fieldToExtract = prefix + fields.OwnerName } else { - lokiParams[prefix+fields.Type] = []string{exact(kind)} + lokiParams = append(lokiParams, []string{prefix + fields.Type, exact(kind)}) fieldToExtract = prefix + fields.Name } - queryBuilder := loki.NewQuery(cfg.URL.String(), cfg.Labels, false) - if err := queryBuilder.AddParams(lokiParams); err != nil { + queryBuilder := loki.NewFlowQueryBuilderWithDefaults(&cfg) + if err := queryBuilder.Filters(lokiParams); err != nil { return nil, http.StatusBadRequest, err } - resp, code, err := executeFlowQuery(queryBuilder, lokiClient) + query := queryBuilder.Build() + resp, code, err := executeLokiQuery(query, lokiClient) if err != nil { return nil, code, errors.New("Loki query failed: " + err.Error()) } diff --git a/pkg/handler/response.go b/pkg/handler/response.go index aca0e63ff..753fc535b 100644 --- a/pkg/handler/response.go +++ b/pkg/handler/response.go @@ -44,11 +44,12 @@ func writeCSV(w http.ResponseWriter, code int, payload []byte, columns []string) return } - datas, err := csvdata.GetCSVData(&qr, columns) + data, err := csvdata.GetCSVData(&qr, columns) if err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } + hlog.Tracef("CSV data: %v", data) t := time.Now() //output file would be 'export-stdLongYear-stdZeroMonth-stdZeroDay-stdHour-stdZeroMinute.csv' @@ -57,7 +58,7 @@ func writeCSV(w http.ResponseWriter, code int, payload []byte, columns []string) w.Header().Set("Transfer-Encoding", "chunked") w.WriteHeader(code) writer := csv.NewWriter(w) - for _, row := range datas { + for _, row := range data { //write csv row err := writer.Write(row) if err != nil { diff --git a/pkg/handler/topology.go b/pkg/handler/topology.go index 108eff261..0f536c9f1 100644 --- a/pkg/handler/topology.go +++ b/pkg/handler/topology.go @@ -1,42 +1,89 @@ package handler import ( + "errors" "net/http" + "net/url" + "github.com/netobserv/network-observability-console-plugin/pkg/httpclient" "github.com/netobserv/network-observability-console-plugin/pkg/loki" ) -func GetTopology(cfg LokiConfig) func(w http.ResponseWriter, r *http.Request) { +func GetTopology(cfg loki.Config) func(w http.ResponseWriter, r *http.Request) { lokiClient := newLokiClient(&cfg) return func(w http.ResponseWriter, r *http.Request) { - params := r.URL.Query() - hlog.Debugf("GetTopology query params: %s", params) - - queryBuilder := loki.NewTopologyQuery(cfg.URL.String(), cfg.Labels) - if err := queryBuilder.AddParams(params); err != nil { - writeError(w, http.StatusBadRequest, err.Error()) + flows, code, err := getTopologyFlows(cfg, lokiClient, r.URL.Query()) + if err != nil { + writeError(w, code, err.Error()) return } - queryBuilder, err := queryBuilder.PrepareToSubmit() + writeRawJSON(w, http.StatusOK, flows) + } +} + +func getTopologyFlows(cfg loki.Config, client httpclient.HTTPClient, params url.Values) ([]byte, int, error) { + hlog.Debugf("GetTopology query params: %s", params) + + start, err := getStartTime(params) + if err != nil { + return nil, http.StatusBadRequest, err + } + end := params.Get(endTimeKey) + limit := params.Get(limitKey) + reporter := params.Get(reporterKey) + rawFilters := params.Get(filtersKey) + filterGroups, err := parseFilters(rawFilters) + if err != nil { + return nil, http.StatusBadRequest, err + } + + var rawJSON []byte + if len(filterGroups) > 1 { + // match any, and multiple filters => run in parallel then aggregate + var queries []string + for _, group := range filterGroups { + query, code, err := buildTopologyQuery(&cfg, group, start, end, limit, reporter) + if err != nil { + return nil, code, errors.New("Can't build query: " + err.Error()) + } + queries = append(queries, query) + } + res, code, err := fetchParallel(client, queries) if err != nil { - writeError(w, http.StatusBadRequest, err.Error()) - return + return nil, code, errors.New("Error while fetching flows from Loki: " + err.Error()) } - - flowsURL, err := queryBuilder.URLQuery() + rawJSON = res + } else { + // else, run all at once + var filters [][]string + if len(filterGroups) > 0 { + filters = filterGroups[0] + } + query, code, err := buildTopologyQuery(&cfg, filters, start, end, limit, reporter) if err != nil { - writeError(w, http.StatusBadRequest, err.Error()) - return + return nil, code, err } - resp, code, err := executeLokiQuery(flowsURL, lokiClient) + resp, code, err := executeLokiQuery(query, client) if err != nil { - writeError(w, code, "Loki query failed: "+err.Error()) - return + return nil, code, errors.New("Error while fetching flows from Loki: " + err.Error()) } + rawJSON = resp + } + + hlog.Tracef("GetTopology raw response: %v", rawJSON) + return rawJSON, http.StatusOK, nil +} - hlog.Tracef("GetFlows raw response: %s", resp) - writeRawJSON(w, http.StatusOK, resp) +func buildTopologyQuery(cfg *loki.Config, filters [][]string, start, end, limit, reporter string) (string, int, error) { + qb, err := loki.NewTopologyQuery(cfg, start, end, limit, reporter) + if err != nil { + return "", http.StatusBadRequest, err + } + err = qb.Filters(filters) + if err != nil { + return "", http.StatusBadRequest, err } + return EncodeQuery(qb.Build()), http.StatusOK, nil } diff --git a/pkg/loki/config.go b/pkg/loki/config.go new file mode 100644 index 000000000..0d9798f98 --- /dev/null +++ b/pkg/loki/config.go @@ -0,0 +1,29 @@ +package loki + +import ( + "net/url" + "time" + + "github.com/netobserv/network-observability-console-plugin/pkg/utils" +) + +type Config struct { + URL *url.URL + Timeout time.Duration + TenantID string + Labels map[string]struct{} +} + +func NewConfig(url *url.URL, timeout time.Duration, tenantID string, labels []string) Config { + return Config{ + URL: url, + Timeout: timeout, + TenantID: tenantID, + Labels: utils.GetMapInterface(labels), + } +} + +func (c *Config) IsLabel(key string) bool { + _, isLabel := c.Labels[key] + return isLabel +} diff --git a/pkg/loki/convert.go b/pkg/loki/convert.go deleted file mode 100644 index 939b19e50..000000000 --- a/pkg/loki/convert.go +++ /dev/null @@ -1,87 +0,0 @@ -package loki - -import ( - "regexp" - "strings" - - "github.com/sirupsen/logrus" -) - -var clog = logrus.WithField("component", "loki.convertToAnyMatch") - -// convertToAnyMatch returns a new query with all the matchers and filters -// of the receiver query. The returned query would match if only one of the -// filters/attributes/selectors match. -func (q *Query) convertToAnyMatch() *Query { - // if the input query only has zero or one criterion, or it only has Label filters, - // we return a copy by changing the Label Joiner - if len(q.lineFilters)+len(q.labelFilters)+len(q.streamSelector) <= 1 || - len(q.streamSelector)+len(q.lineFilters) == 0 { - // return a copy of the input query, just changing the label joiner - cp := *q - cp.labelJoiner = joinOr - return &cp - } - - out := Query{ - baseURL: q.baseURL, - urlParams: q.urlParams, - labelJoiner: joinOr, - specialAttrs: q.specialAttrs, - } - // if the input query only has line filters, we merge them into a single - // regexp and return it - if len(q.labelFilters)+len(q.streamSelector) == 0 { - out.lineFilters = []string{ - "(" + strings.Join(q.lineFilters, ")|(") + ")", - } - return &out - } - // otherwise, we move all the arguments to Label filters to be checked after JSON conversion - out.labelFilters = append(q.streamSelector, q.labelFilters...) - for _, lf := range q.lineFilters { - label, ok := lineToLabelFilter(lf) - if !ok { - clog.WithField("lineFilter", lf). - Warningf("line filter can't be parsed as json attribute. Ignoring it") - continue - } - out.labelFilters = append(out.labelFilters, label) - } - return &out -} - -// jsonField captures the fields of any Json argument. -// Second capture: name of the field. -// Third capture: JSON string value or empty -// Fourth capture: JSON number value or empty -var jsonField = regexp.MustCompile(`([\w.-]*)":(?:(?:"((?:(?:\^")|[^"])*)"?)|(\d+))$`) - -// lineToLabelFilter extracts the JSON field name and value from a line filter and converts -// it to a labelFilter -func lineToLabelFilter(lf string) (labelFilter, bool) { - submatch := jsonField.FindStringSubmatch(lf) - if len(submatch) == 0 { - return labelFilter{}, false - } - if len(submatch[3]) > 0 { - return labelFilter{ - key: submatch[1], - matcher: labelEqual, - value: submatch[3], - valueType: typeNumber, - }, true - } - - v := submatch[2] - if !strings.HasSuffix(v, `"`) && !strings.HasSuffix(v, ".*") { - v = v + ".*" - } - - return labelFilter{ - key: submatch[1], - matcher: labelMatches, - value: v, - valueType: typeRegex, - }, true -} diff --git a/pkg/loki/filter.go b/pkg/loki/filter.go index a74407828..2ad9f9d29 100644 --- a/pkg/loki/filter.go +++ b/pkg/loki/filter.go @@ -2,7 +2,6 @@ package loki import ( "fmt" - "strconv" "strings" ) @@ -31,16 +30,16 @@ type labelFilter struct { valueType valueType } -func stringLabelFilter(labelKey string, matcher labelMatcher, value string) labelFilter { +func stringLabelFilter(labelKey string, value string) labelFilter { return labelFilter{ key: labelKey, - matcher: matcher, + matcher: labelEqual, value: value, valueType: typeString, } } -func regexLabelFilter(labelKey string, matcher labelMatcher, value string) labelFilter { +func regexLabelFilter(labelKey string, value string) labelFilter { return labelFilter{ key: labelKey, matcher: labelMatches, @@ -49,15 +48,6 @@ func regexLabelFilter(labelKey string, matcher labelMatcher, value string) label } } -func intLabelFilter(labelKey string, value int) labelFilter { - return labelFilter{ - key: labelKey, - matcher: labelEqual, - value: strconv.Itoa(value), - valueType: typeNumber, - } -} - func ipLabelFilter(labelKey, cidr string) labelFilter { return labelFilter{ key: labelKey, diff --git a/pkg/loki/flow_query.go b/pkg/loki/flow_query.go new file mode 100644 index 000000000..0e7c2b4ee --- /dev/null +++ b/pkg/loki/flow_query.go @@ -0,0 +1,247 @@ +// Package loki provides functionalities for interacting with Loki +package loki + +import ( + "fmt" + "regexp" + "strings" + + "github.com/netobserv/network-observability-console-plugin/pkg/model/fields" + "github.com/netobserv/network-observability-console-plugin/pkg/utils/constants" +) + +const ( + startParam = "start" + endParam = "end" + limitParam = "limit" + queryRangePath = "/loki/api/v1/query_range?query=" + jsonOrJoiner = "+or+" +) + +// can contains only alphanumeric / '-' / '_' / '.' / ',' / '"' / '*' / ':' / '/' characteres +var filterRegexpValidation = regexp.MustCompile(`^[\w-_.,\"*:/]*$`) + +// remove quotes and replace * by regex any +var valueReplacer = strings.NewReplacer(`*`, `.*`, `"`, "") + +// FlowQueryBuilder stores a state to build a LogQL query +type FlowQueryBuilder struct { + config *Config + startTime string + endTime string + limit string + labelFilters []labelFilter + lineFilters []string + jsonFilters [][]labelFilter +} + +func NewFlowQueryBuilder(cfg *Config, start, end, limit, reporter string) *FlowQueryBuilder { + // Always use app stream selector, which will apply whichever matching criteria (any or all) + labelFilters := []labelFilter{ + stringLabelFilter(constants.AppLabel, constants.AppLabelValue), + } + if reporter == constants.ReporterSource { + labelFilters = append(labelFilters, stringLabelFilter(fields.FlowDirection, "1")) + } else if reporter == constants.ReporterDestination { + labelFilters = append(labelFilters, stringLabelFilter(fields.FlowDirection, "0")) + } + return &FlowQueryBuilder{ + config: cfg, + startTime: start, + endTime: end, + limit: limit, + labelFilters: labelFilters, + } +} + +func NewFlowQueryBuilderWithDefaults(cfg *Config) *FlowQueryBuilder { + return NewFlowQueryBuilder(cfg, "", "", "", constants.ReporterBoth) +} + +func (q *FlowQueryBuilder) Filters(filters [][]string) error { + for _, filter := range filters { + if err := q.AddFilter(filter[0], filter[1]); err != nil { + return err + } + } + return nil +} + +func (q *FlowQueryBuilder) AddFilter(key, joinedValues string) error { + if !filterRegexpValidation.MatchString(joinedValues) { + return fmt.Errorf("unauthorized sign in flows request: %s", joinedValues) + } + + values := strings.Split(joinedValues, ",") + + // Stream selector labels + if q.config.IsLabel(key) { + if len(values) == 1 && isExactMatch(values[0]) { + q.addExactMatchSingleLabel(key, trimExactMatch(values[0])) + } else { + q.addLabelRegex(key, values) + } + } else if fields.IsIP(key) { + q.addIPFilters(key, values) + } else { + q.addLineFilters(key, values) + } + + return nil +} + +func (q *FlowQueryBuilder) addExactMatchSingleLabel(key string, value string) { + q.labelFilters = append(q.labelFilters, stringLabelFilter(key, value)) +} + +func (q *FlowQueryBuilder) addLabelRegex(key string, values []string) { + regexStr := strings.Builder{} + for i, value := range values { + if i > 0 { + regexStr.WriteByte('|') + } + //match the beginning of string if quoted without a star + //and case insensitive if no quotes + if !strings.HasPrefix(value, `"`) { + regexStr.WriteString("(?i).*") + } else if !strings.HasPrefix(value, `"*`) { + regexStr.WriteString("^") + } + //inject value with regex + regexStr.WriteString(valueReplacer.Replace(value)) + //match the end of string if quoted without a star + if !strings.HasSuffix(value, `"`) { + regexStr.WriteString(".*") + } else if !strings.HasSuffix(value, `*"`) { + regexStr.WriteString("$") + } + } + + if regexStr.Len() > 0 { + q.labelFilters = append(q.labelFilters, regexLabelFilter(key, regexStr.String())) + } +} + +func (q *FlowQueryBuilder) addLineFilters(key string, values []string) { + regexStr := strings.Builder{} + for i, value := range values { + if i > 0 { + regexStr.WriteByte('|') + } + //match end of KEY + regex VALUE: + //if numeric, KEY":VALUE + //if string KEY":"VALUE" + //ie 'Port' key will match both 'SrcPort":"XXX"' and 'DstPort":"XXX" + //VALUE can be quoted for exact match or contains * to inject regex any + regexStr.WriteString(key) + regexStr.WriteString(`":`) + if fields.IsNumeric(key) { + regexStr.WriteString(value) + } else { + regexStr.WriteString(`"`) + // match start any if not quoted + // and case insensitive + if !strings.HasPrefix(value, `"`) { + regexStr.WriteString("(?i).*") + } + //inject value with regex + regexStr.WriteString(valueReplacer.Replace(value)) + // match end any if not quoted + if !strings.HasSuffix(value, `"`) { + regexStr.WriteString(".*") + } + regexStr.WriteString(`"`) + } + } + + if regexStr.Len() > 0 { + q.lineFilters = append(q.lineFilters, regexStr.String()) + } +} + +// addIPFilters assumes that we are searching for that IP addresses as part +// of the log line (not in the stream selector labels) +func (q *FlowQueryBuilder) addIPFilters(key string, values []string) { + var filtersPerKey []labelFilter + for _, value := range values { + filtersPerKey = append(filtersPerKey, ipLabelFilter(key, value)) + } + q.jsonFilters = append(q.jsonFilters, filtersPerKey) +} + +func (q *FlowQueryBuilder) createStringBuilderURL() *strings.Builder { + sb := strings.Builder{} + sb.WriteString(strings.TrimRight(q.config.URL.String(), "/")) + sb.WriteString(queryRangePath) + return &sb +} + +func (q *FlowQueryBuilder) appendLabels(sb *strings.Builder) { + sb.WriteString("{") + for i, ss := range q.labelFilters { + if i > 0 { + sb.WriteByte(',') + } + ss.writeInto(sb) + } + sb.WriteByte('}') +} + +func (q *FlowQueryBuilder) appendLineFilters(sb *strings.Builder) { + for _, lf := range q.lineFilters { + sb.WriteString("|~`") + sb.WriteString(lf) + sb.WriteByte('`') + } +} + +func (q *FlowQueryBuilder) appendJSON(sb *strings.Builder, forceAppend bool) { + if forceAppend || len(q.jsonFilters) > 0 { + sb.WriteString("|json") + for _, lfPerKey := range q.jsonFilters { + sb.WriteByte('|') + for i, lf := range lfPerKey { + if i > 0 { + sb.WriteString(jsonOrJoiner) + } + lf.writeInto(sb) + } + } + } +} + +func (q *FlowQueryBuilder) appendQueryParams(sb *strings.Builder) { + if len(q.startTime) > 0 { + appendQueryParam(sb, startParam, q.startTime) + } + if len(q.endTime) > 0 { + appendQueryParam(sb, endParam, q.endTime) + } + if len(q.limit) > 0 { + appendQueryParam(sb, limitParam, q.limit) + } +} + +func (q *FlowQueryBuilder) Build() string { + sb := q.createStringBuilderURL() + q.appendLabels(sb) + q.appendLineFilters(sb) + q.appendJSON(sb, false) + q.appendQueryParams(sb) + return sb.String() +} + +func appendQueryParam(sb *strings.Builder, key, value string) { + sb.WriteByte('&') + sb.WriteString(key) + sb.WriteByte('=') + sb.WriteString(value) +} + +func isExactMatch(value string) bool { + return strings.HasPrefix(value, `"`) && strings.HasSuffix(value, `"`) +} + +func trimExactMatch(value string) string { + return strings.TrimPrefix(strings.TrimSuffix(value, `"`), `"`) +} diff --git a/pkg/loki/merger.go b/pkg/loki/merger.go new file mode 100644 index 000000000..c4eeb2f36 --- /dev/null +++ b/pkg/loki/merger.go @@ -0,0 +1,85 @@ +package loki + +import ( + "sort" + "strings" + + "github.com/netobserv/network-observability-console-plugin/pkg/model" +) + +type StreamMerger struct { + index map[string]indexedStream + merged model.Streams +} + +func NewStreamMerger() StreamMerger { + return StreamMerger{ + index: map[string]indexedStream{}, + merged: model.Streams{}, + } +} + +type indexedStream struct { + stream model.Stream + entries map[string]interface{} + index int +} + +func uniqueStream(s *model.Stream) string { + var labels []string + for lbl := range s.Labels { + labels = append(labels, lbl) + } + sort.Strings(labels) + sb := strings.Builder{} + for _, lbl := range labels { + sb.WriteString(lbl) + sb.WriteByte('=') + sb.WriteString(s.Labels[lbl]) + sb.WriteByte(',') + } + return sb.String() +} + +func uniqueEntry(e *model.Entry) string { + return e.Timestamp.String() + e.Line +} + +func (m *StreamMerger) Add(from model.Streams) model.Streams { + for _, stream := range from { + lkey := uniqueStream(&stream) + idxStream, streamExists := m.index[lkey] + if !streamExists { + // Stream doesn't exist => create new index + idxStream = indexedStream{ + stream: stream, + entries: map[string]interface{}{}, + index: len(m.index), + } + } + // Merge content (entries) + for _, e := range stream.Entries { + ekey := uniqueEntry(&e) + if _, entryExists := idxStream.entries[ekey]; !entryExists { + // Add entry to the existing stream, and mark it as existing in idxStream.entries + idxStream.entries[ekey] = nil + if streamExists { + idxStream.stream.Entries = append(m.index[lkey].stream.Entries, e) + } + } // Else: entry found => ignore duplicate + } + // Add or overwrite index + m.index[lkey] = idxStream + if !streamExists { + // Stream doesn't exist => append it + m.merged = append(m.merged, idxStream.stream) + } else { + m.merged[idxStream.index] = idxStream.stream + } + } + return m.merged +} + +func (m *StreamMerger) GetStreams() model.Streams { + return m.merged +} diff --git a/pkg/loki/merger_test.go b/pkg/loki/merger_test.go new file mode 100644 index 000000000..6150c27f0 --- /dev/null +++ b/pkg/loki/merger_test.go @@ -0,0 +1,111 @@ +package loki + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/netobserv/network-observability-console-plugin/pkg/model" +) + +func TestMerge(t *testing.T) { + now := time.Now() + merger := NewStreamMerger() + baseline := model.Stream{ + Labels: map[string]string{ + "foo": "bar", + }, + Entries: []model.Entry{{ + Timestamp: now, + Line: "{key: value}", + }}, + } + merger.Add(model.Streams{baseline}) + + // Different label, different line => no dedup + merged := merger.Add(model.Streams{{ + Labels: map[string]string{ + "foo": "bar", + "foo2": "bar2", + }, + Entries: baseline.Entries, + }, { + Labels: baseline.Labels, + Entries: []model.Entry{{ + Timestamp: now, + Line: "{key2: value2}", + }}, + }}) + assert.Len(t, merged, 2) + assert.Len(t, merged[0].Entries, 2) + assert.Len(t, merged[1].Entries, 1) + + // Same labels in different order => no dedup + merged = merger.Add(model.Streams{{ + Labels: map[string]string{ + "foo2": "bar2", + "foo": "bar", + }, + Entries: baseline.Entries, + }, { + Labels: map[string]string{ + "foo2": "bar2", + "foo": "bar", + }, + Entries: baseline.Entries, + }, { + Labels: map[string]string{ + "foo": "bar", + "foo2": "bar2", + }, + Entries: baseline.Entries, + }}) + assert.Len(t, merged, 2) + assert.Len(t, merged[0].Entries, 2) + assert.Len(t, merged[1].Entries, 1) + + // Different timestamp => no dedup + merged = merger.Add(model.Streams{{ + Labels: baseline.Labels, + Entries: []model.Entry{{ + Timestamp: now.Add(time.Hour), + Line: "{key: value}", + }}, + }}) + assert.Len(t, merged, 2) + assert.Len(t, merged[0].Entries, 3) + assert.Len(t, merged[1].Entries, 1) + + // some dedup + merged = merger.Add(model.Streams{{ + // changed line => no dedup + Labels: baseline.Labels, + Entries: []model.Entry{{ + Timestamp: now, + Line: "{key: value3}", + }}, + }, { + // changed line => no dedup + Labels: baseline.Labels, + Entries: []model.Entry{{ + Timestamp: now, + Line: "{}", + }}, + }, { + // same as previously modified timestamp => must be ignored + Labels: baseline.Labels, + Entries: []model.Entry{{ + Timestamp: now.Add(time.Hour), + Line: "{key: value}", + }}, + }, + // baseline itself => must be ignored + baseline, + }) + + // Different timestamp => no dedup + assert.Len(t, merged, 2) + assert.Len(t, merged[0].Entries, 5) + assert.Len(t, merged[1].Entries, 1) +} diff --git a/pkg/loki/query.go b/pkg/loki/query.go deleted file mode 100644 index 3e7fdd764..000000000 --- a/pkg/loki/query.go +++ /dev/null @@ -1,505 +0,0 @@ -// Package loki provides functionalities for interacting with Loki -package loki - -import ( - "errors" - "fmt" - "regexp" - "strconv" - "strings" - "time" - - "github.com/sirupsen/logrus" - - "github.com/netobserv/network-observability-console-plugin/pkg/model/fields" - "github.com/netobserv/network-observability-console-plugin/pkg/utils" -) - -const ( - startTimeKey = "startTime" - endTimeTimeKey = "endTime" - timeRangeKey = "timeRange" - limitKey = "limit" - exportFormatKey = "format" - columnsKey = "columns" - startParam = "start" - endParam = "end" - limitParam = "limit" - matchParam = "match" - flowDirParam = "FlowDirection" - anyMatchValue = "any" - queryRangePath = "/loki/api/v1/query_range?query=" -) - -var qlog = logrus.WithField("component", "loki.query") - -// can contains only alphanumeric / '-' / '_' / '.' / ',' / '"' / '*' / ':' / '/' characteres -var filterRegexpValidation = regexp.MustCompile(`^[\w-_.,\"*:/]*$`) - -// remove quotes and replace * by regex any -var valueReplacer = strings.NewReplacer(`*`, `.*`, `"`, "") - -type LabelJoiner string - -const ( - joinAnd = LabelJoiner("+and+") - joinOr = LabelJoiner("+or+") - joinPipeAnd = LabelJoiner("|") -) - -// Query for a LogQL HTTP petition -// The HTTP body of the query is composed by: -// {streamSelector}|lineFilters|json|labelFilters -type Query struct { - // urlParams for the HTTP call - baseURL string - urlParams [][2]string - labelMap map[string]struct{} - streamSelector []labelFilter - lineFilters []string - labelFilters []labelFilter - currentGroup *string - groupedLabelFilters map[string][]labelFilter - labelJoiner LabelJoiner - // Attributes with a special meaning that need to be processed independently - specialAttrs map[string]string - export *Export -} - -type Export struct { - format string - columns []string -} - -func NewQuery(baseURL string, labels []string, export bool) *Query { - var exp *Export - if export { - exp = &Export{} - } - return &Query{ - baseURL: baseURL, - specialAttrs: map[string]string{}, - labelJoiner: joinPipeAnd, - export: exp, - labelMap: utils.GetMapInterface(labels), - groupedLabelFilters: map[string][]labelFilter{}, - } -} - -func (q *Query) AddParams(params map[string][]string) error { - for key, values := range params { - if len(values) == 0 { - // Silently ignore - continue - } - - // Note: empty string allowed - if err := q.AddParam(key, values[0]); err != nil { - return err - } - } - return nil -} - -func (q *Query) AddParam(key, value string) error { - if !filterRegexpValidation.MatchString(value) { - return fmt.Errorf("unauthorized sign in flows request: %s", value) - } - switch key { - case exportFormatKey: - return q.addParamFormat(value) - case columnsKey: - return q.addParamColumns(value) - case startTimeKey: - q.addURLParam(startParam, value) - case endTimeTimeKey: - q.addURLParam(endParam, value) - case timeRangeKey: - return q.addParamTime(value) - case limitKey: - q.addURLParam(limitParam, value) - // Attributes that have a special meaning and need to be treated apart - case matchParam, flowDirParam: - q.specialAttrs[key] = value - // IP filter labels - case fields.DstAddr, fields.SrcAddr, fields.DstHostIP, fields.SrcHostIP: - q.processIPFilters(key, strings.Split(value, ",")) - case fields.K8SObject, fields.SrcK8SObject, fields.DstK8SObject, fields.K8SOwnerObject, fields.SrcK8SOwnerObject, fields.DstK8SOwnerObject: - return q.processK8SObjectFilter(key, strings.Split(value, ",")) - case fields.AddrPort, fields.SrcAddrPort, fields.DstAddrPort: - q.processAddressPortFilter(key, strings.Split(value, ",")) - default: - return q.addParamDefault(key, value) - } - return nil -} - -func (q *Query) addParamFormat(value string) error { - if q.export != nil { - q.export.format = value - } else { - return fmt.Errorf("export format is not allowed for this endpoint") - } - return nil -} - -func (q *Query) addParamColumns(value string) error { - if q.export != nil { - values := strings.Split(value, ",") - q.export.columns = values - } else { - return fmt.Errorf("export columns are not allowed for this endpoint") - } - return nil -} - -func (q *Query) addParamTime(value string) error { - r, err := strconv.ParseInt(value, 10, 64) - if err != nil { - return err - } - q.addURLParam(startParam, strconv.FormatInt(time.Now().Unix()-r, 10)) - return nil -} - -func (q *Query) addParamDefault(key, value string) error { - // Stream selector labels - if _, ok := q.labelMap[key]; ok { - q.processStreamSelector(key, strings.Split(value, ",")) - } else { - srcKey, dstKey := fields.ToSrcDst(key) - if _, ok := q.labelMap[srcKey]; ok { - if _, ok := q.labelMap[dstKey]; !ok { - qlog.WithField("label", key). - Warningf("can't run common label filter as Src field is defined as a label, but Dst is not. Ignoring it") - } else { - q.processCommonLabelFilter(key, strings.Split(value, ",")) - } - } else { - return q.processLineFilters(key, strings.Split(value, ",")) - } - } - return nil -} - -func (q *Query) URLQuery() (string, error) { - if len(q.streamSelector) == 0 { - return "", errors.New("there is no stream selector. At least one label matcher is needed") - } - - endpoint, mainPart, jsonPart, params := q.urlQueryParts() - if len(jsonPart) > 0 { - return endpoint + mainPart + "|json|" + jsonPart + params, nil - } - return endpoint + mainPart + params, nil -} - -func (q *Query) urlQueryParts() (string, string, string, string) { - endpointSb := strings.Builder{} - querySb := strings.Builder{} - jsonSb := strings.Builder{} - paramSb := strings.Builder{} - - endpointSb.WriteString(strings.TrimRight(q.baseURL, "/")) - endpointSb.WriteString(queryRangePath) - querySb.WriteByte('{') - for i, ss := range q.streamSelector { - if i > 0 { - querySb.WriteByte(',') - } - ss.writeInto(&querySb) - } - querySb.WriteByte('}') - for _, lf := range q.lineFilters { - querySb.WriteString("|~`") - querySb.WriteString(lf) - querySb.WriteByte('`') - } - if len(q.labelFilters) > 0 || len(q.groupedLabelFilters) > 0 { - if q.labelJoiner == "" { - panic("Label Joiner can't be empty") - } - q.WriteLabelFilter(&jsonSb, &q.labelFilters, q.labelJoiner) - i := 0 - for _, glf := range q.groupedLabelFilters { - if i > 0 { - jsonSb.WriteString(string(q.labelJoiner)) - } - //group with parenthesis - jsonSb.WriteByte('(') - //each group member must match - q.WriteLabelFilter(&jsonSb, &glf, joinAnd) - jsonSb.WriteByte(')') - i++ - } - } - if len(q.urlParams) > 0 { - for _, p := range q.urlParams { - paramSb.WriteByte('&') - paramSb.WriteString(p[0]) - paramSb.WriteByte('=') - paramSb.WriteString(p[1]) - } - } - return endpointSb.String(), querySb.String(), jsonSb.String(), paramSb.String() -} - -func (q *Query) WriteLabelFilter(sb *strings.Builder, lfs *[]labelFilter, lj LabelJoiner) { - for i, lf := range *lfs { - if i > 0 { - sb.WriteString(string(lj)) - } - lf.writeInto(sb) - } -} - -// PrepareToSubmit returns a new Query that already handles the special behavior of some attributes -// that mustn't be used as part of a generic query. -func (q *Query) PrepareToSubmit() (*Query, error) { - var out *Query - // If match=any, it converts the query to a query that matches when only of the given - // attributes match - if match := q.specialAttrs[matchParam]; match == anyMatchValue { - out = q.convertToAnyMatch() - } else { - // copy receiver query - cp := *q - out = &cp - } - - // Append app stream selector, which will apply whichever matching criteria (any or all) - out.streamSelector = append(out.streamSelector, - stringLabelFilter("app", labelEqual, "netobserv-flowcollector")) - - // Filter by flow direction independently of the matching criteria (any or all) - if flowDir, ok := out.specialAttrs[flowDirParam]; ok { - out.streamSelector = append(out.streamSelector, - stringLabelFilter(flowDirParam, labelEqual, flowDir)) - } - return out, nil -} - -func (q *Query) addURLParam(key, val string) { - q.urlParams = append(q.urlParams, [2]string{key, val}) -} - -func (q *Query) processStreamSelector(key string, values []string) { - regexStr := strings.Builder{} - for i, value := range values { - if i > 0 { - regexStr.WriteByte('|') - } - //match the begining of string if quoted without a star - //and case insensitive if no quotes - if !strings.HasPrefix(value, `"`) { - regexStr.WriteString("(?i).*") - } else if !strings.HasPrefix(value, `"*`) { - regexStr.WriteString("^") - } - //inject value with regex - regexStr.WriteString(valueReplacer.Replace(value)) - //match the end of string if quoted without a star - if !strings.HasSuffix(value, `"`) { - regexStr.WriteString(".*") - } else if !strings.HasSuffix(value, `*"`) { - regexStr.WriteString("$") - } - } - - if regexStr.Len() > 0 { - if q.currentGroup == nil { - q.streamSelector = append(q.streamSelector, - stringLabelFilter(key, labelMatches, regexStr.String())) - } else { - q.groupedLabelFilters[*q.currentGroup] = append(q.groupedLabelFilters[*q.currentGroup], - stringLabelFilter(key, labelMatches, regexStr.String())) - } - } -} - -// filterIPInLine assumes that we are searching for that IP addresses as part -// of the log line (not in the stream selector labels) -func (q *Query) processIPFilters(key string, values []string) { - for _, value := range values { - if q.currentGroup == nil { - q.labelFilters = append(q.labelFilters, ipLabelFilter(key, value)) - } else { - q.groupedLabelFilters[*q.currentGroup] = append(q.groupedLabelFilters[*q.currentGroup], - ipLabelFilter(key, value)) - } - } -} - -func (q *Query) ExportFormat() string { - if q.export == nil { - return "" - } - return q.export.format -} - -func (q *Query) ExportColumns() []string { - if q.export == nil { - return nil - } - return q.export.columns -} - -func (q *Query) processLineFilters(key string, values []string) error { - regexStr := strings.Builder{} - for i, value := range values { - if i > 0 { - regexStr.WriteByte('|') - } - //match end of KEY + regex VALUE: - //if numeric, KEY":VALUE - //if string KEY":"VALUE" - //ie 'Port' key will match both 'SrcPort":"XXX"' and 'DstPort":"XXX" - //VALUE can be quoted for exact match or contains * to inject regex any - regexStr.WriteString(key) - regexStr.WriteString(`":`) - if isNumeric(key) { - regexStr.WriteString(value) - } else { - regexStr.WriteString(`"`) - // match start any if not quoted - // and case insensitive - if !strings.HasPrefix(value, `"`) { - regexStr.WriteString("(?i).*") - } - //inject value with regex - regexStr.WriteString(valueReplacer.Replace(value)) - // match end any if not quoted - if !strings.HasSuffix(value, `"`) { - regexStr.WriteString(".*") - } - regexStr.WriteString(`"`) - } - } - - if regexStr.Len() > 0 { - if q.currentGroup == nil { - q.lineFilters = append(q.lineFilters, regexStr.String()) - } else { - lf, ok := lineToLabelFilter(regexStr.String()) - if ok { - q.groupedLabelFilters[*q.currentGroup] = append(q.groupedLabelFilters[*q.currentGroup], lf) - } else { - qlog.WithField("lineFilter", lf). - Warningf("line filter can't be parsed as json attribute. Ignoring it") - } - } - } - return nil -} - -func (q *Query) processCommonLabelFilter(key string, values []string) { - srcKey, dstKey := fields.ToSrcDst(key) - for _, value := range values { - regexStr := strings.Builder{} - // match start any if not quoted - if !strings.HasPrefix(value, `"`) { - regexStr.WriteString(".*") - } - //inject value with regex - regexStr.WriteString(valueReplacer.Replace(value)) - // match end any if not quoted - if !strings.HasSuffix(value, `"`) { - regexStr.WriteString(".*") - } - // apply filter on both Src and Dst fields - if q.currentGroup == nil { - q.labelFilters = append(q.labelFilters, regexLabelFilter(srcKey, labelMatches, regexStr.String())) - q.labelFilters = append(q.labelFilters, regexLabelFilter(dstKey, labelMatches, regexStr.String())) - } else { - q.groupedLabelFilters[*q.currentGroup] = append(q.groupedLabelFilters[*q.currentGroup], regexLabelFilter(srcKey, labelMatches, regexStr.String())) - q.groupedLabelFilters[*q.currentGroup] = append(q.groupedLabelFilters[*q.currentGroup], regexLabelFilter(dstKey, labelMatches, regexStr.String())) - } - } -} - -func getPrefix(field string) string { - if strings.HasPrefix(field, fields.Src) { - return fields.Src - } else if strings.HasPrefix(field, fields.Dst) { - return fields.Dst - } - return "" -} - -func (q *Query) processK8SObjectFilter(key string, values []string) error { - prefix := getPrefix((key)) - for _, value := range values { - //expected value is Kind.Namespace.ObjectName - if strings.Contains(value, ".") { - splittedValue := strings.Split(value, ".") - if strings.Contains(key, "Owner") { - q.AddParamSrcDst(prefix, fields.OwnerType, splittedValue[0]) - q.AddParamSrcDst(prefix, fields.Namespace, splittedValue[1]) - q.AddParamSrcDst(prefix, fields.OwnerName, splittedValue[2]) - } else { - q.AddParamSrcDst(prefix, fields.Type, splittedValue[0]) - q.AddParamSrcDst(prefix, fields.Namespace, splittedValue[1]) - q.AddParamSrcDst(prefix, fields.Name, splittedValue[2]) - } - } else { - return fmt.Errorf("invalid kubeObject filter: %s", value) - } - } - return nil -} - -func (q *Query) processAddressPortFilter(key string, values []string) { - prefix := getPrefix((key)) - for _, value := range values { - //can either be ipaddress / port / ipaddress:port - if strings.Contains(value, ":") { - ipAndPort := strings.Split(value, ":") - q.AddParamSrcDst(prefix, fields.Addr, ipAndPort[0]) - q.AddParamSrcDst(prefix, fields.Port, ipAndPort[1]) - } else if strings.Contains(value, ".") { - q.AddParamSrcDst(prefix, fields.Addr, value) - } else if _, err := strconv.Atoi(value); err == nil { - q.AddParamSrcDst(prefix, fields.Port, value) - } - } -} - -func (q *Query) AddParamSrcDst(prefix, key, value string) { - if len(prefix) > 0 { - q.currentGroup = &prefix - err := q.AddParam(prefix+key, value) - if err != nil { - qlog.Error(err) - } - q.currentGroup = nil - } else { - srcPrefix := fields.Src - dstPrefix := fields.Dst - q.currentGroup = &srcPrefix - err := q.AddParam(srcPrefix+key, value) - if err != nil { - qlog.Error(err) - } - q.currentGroup = &dstPrefix - err = q.AddParam(dstPrefix+key, value) - if err != nil { - qlog.Error(err) - } - q.currentGroup = nil - } -} - -func isNumeric(v string) bool { - switch v { - case - fields.Port, - fields.SrcPort, - fields.DstPort, - fields.Packets, - fields.Proto, - fields.Bytes: - return true - default: - return false - } -} diff --git a/pkg/loki/query_test.go b/pkg/loki/query_test.go index aed5ae26a..9678b5006 100644 --- a/pkg/loki/query_test.go +++ b/pkg/loki/query_test.go @@ -1,91 +1,31 @@ package loki import ( + "net/url" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func TestQuery_ToURL_ConvertToAnyMatch(t *testing.T) { - type testCase struct { - title string - in Query - expect string - expectAny string - } - for _, tc := range []testCase{{ - title: "streamSelector only", - in: Query{streamSelector: []labelFilter{ - stringLabelFilter("app", labelEqual, "flows"), - stringLabelFilter("foo", labelMatches, ".*bar.*"), - stringLabelFilter("baz", labelMatches, ".*bae.*"), - }}, - expect: `/loki/api/v1/query_range?query={app="flows",foo=~".*bar.*",baz=~".*bae.*"}`, - expectAny: `/loki/api/v1/query_range?query={dont="fail"}|json|app="flows"+or+foo=~".*bar.*"+or+baz=~".*bae.*"`, - }, { - title: "streamSelector with line filters", - in: Query{ - streamSelector: []labelFilter{ - stringLabelFilter("app", labelEqual, "netobs"), - }, - lineFilters: []string{`"DstPort":1234`, `"Namespace":".*hiya`}, - }, - expect: "/loki/api/v1/query_range?query={app=\"netobs\"}|~`\"DstPort\":1234`|~`\"Namespace\":\".*hiya`", - expectAny: "/loki/api/v1/query_range?query={dont=\"fail\"}|json|app=\"netobs\"+or+DstPort=1234+or+Namespace=~`.*hiya.*`", - }, { - title: "streamSelector with label filters", - in: Query{ - streamSelector: []labelFilter{stringLabelFilter("app", labelEqual, "some-app")}, - labelJoiner: joinOr, - labelFilters: []labelFilter{ - stringLabelFilter("foo", labelMatches, "bar"), - intLabelFilter("port", 1234), - ipLabelFilter("SrcAddr", "123.0.0.0/16"), - }, - }, - expect: `/loki/api/v1/query_range?query={app="some-app"}|json|foo=~"bar"+or+port=1234+or+SrcAddr=ip("123.0.0.0/16")`, - expectAny: `/loki/api/v1/query_range?query={dont="fail"}|json|app="some-app"+or+foo=~"bar"+or+port=1234+or+SrcAddr=ip("123.0.0.0/16")`, - }, { - title: "streamSelector + line filters + label filters", - in: Query{ - streamSelector: []labelFilter{stringLabelFilter("app", labelEqual, "the-app")}, - labelJoiner: joinOr, - labelFilters: []labelFilter{stringLabelFilter("foo", labelMatches, "bar")}, - lineFilters: []string{`"DstPod":".*podaco"`}, - }, - expect: "/loki/api/v1/query_range?query={app=\"the-app\"}|~`\"DstPod\":\".*podaco\"`|json|foo=~\"bar\"", - expectAny: "/loki/api/v1/query_range?query={dont=\"fail\"}|json|app=\"the-app\"+or+foo=~\"bar\"+or+DstPod=~`.*podaco.*`", - }} { - t.Run(tc.title, func(t *testing.T) { - urlQuery, err := tc.in.URLQuery() - require.NoError(t, err) - assert.Equal(t, tc.expect, urlQuery) - - anyMatchQuery := tc.in.convertToAnyMatch() - // we need to have at least a stream selector, so we add a label for testing purposes - // (in production, we add the app label) - anyMatchQuery.streamSelector = append(anyMatchQuery.streamSelector, - stringLabelFilter("dont", labelEqual, "fail")) - anyMatchURL, err := anyMatchQuery.URLQuery() - require.NoError(t, err) - assert.Equal(t, tc.expectAny, anyMatchURL) - }) - } -} - -func TestQuery_AddURLParam(t *testing.T) { - query := Query{ - streamSelector: []labelFilter{stringLabelFilter("app", labelEqual, "the-app")}, - } - query.addURLParam("foo", "bar") - query.addURLParam("flis", "flas") - urlQuery, err := query.URLQuery() +func TestFlowQuery_AddLabelFilters(t *testing.T) { + lokiURL, err := url.Parse("/") require.NoError(t, err) - assert.Equal(t, `/loki/api/v1/query_range?query={app="the-app"}&foo=bar&flis=flas`, urlQuery) + cfg := NewConfig(lokiURL, time.Second, "", []string{"foo", "flis"}) + query := NewFlowQueryBuilderWithDefaults(&cfg) + err = query.AddFilter("foo", `"bar"`) + require.NoError(t, err) + err = query.AddFilter("flis", `"flas"`) + require.NoError(t, err) + urlQuery := query.Build() + assert.Equal(t, `/loki/api/v1/query_range?query={app="netobserv-flowcollector",foo="bar",flis="flas"}`, urlQuery) } func TestQuery_BackQuote_Error(t *testing.T) { - query := NewQuery("/", []string{"lab1", "lab2"}, false) - assert.Error(t, query.AddParam("key", "backquoted`val")) + lokiURL, err := url.Parse("/") + require.NoError(t, err) + cfg := NewConfig(lokiURL, time.Second, "", []string{"lab1", "lab2"}) + query := NewFlowQueryBuilderWithDefaults(&cfg) + assert.Error(t, query.AddFilter("key", "backquoted`val")) } diff --git a/pkg/loki/topology_query.go b/pkg/loki/topology_query.go index c20692f78..f79461a67 100644 --- a/pkg/loki/topology_query.go +++ b/pkg/loki/topology_query.go @@ -3,7 +3,8 @@ package loki import ( "fmt" "strconv" - "time" + + "github.com/netobserv/network-observability-console-plugin/pkg/model/fields" ) const ( @@ -14,130 +15,84 @@ const ( type Topology struct { limit string - startTime string - endTime string timeRange string function string dataField string } type TopologyQueryBuilder struct { - *Query + *FlowQueryBuilder topology *Topology } -func NewTopologyQuery(baseURL string, labels []string) *TopologyQueryBuilder { - return &TopologyQueryBuilder{ - Query: NewQuery(baseURL, labels, false), - topology: &Topology{}, - } -} - -func (q *TopologyQueryBuilder) AddParams(params map[string][]string) error { - for key, values := range params { - if len(values) == 0 { - // Silently ignore - continue - } - - // Note: empty string allowed - if err := q.AddParam(key, values[0]); err != nil { - return err - } - } - return nil -} - -func (q *TopologyQueryBuilder) AddParam(key, value string) error { - if !filterRegexpValidation.MatchString(value) { - return fmt.Errorf("unauthorized sign in flows request: %s", value) - } - switch key { - case startTimeKey: - q.topology.startTime = value - case endTimeTimeKey: - q.topology.endTime = value - case timeRangeKey: - start, err := timeRangeToStart(value) - if err != nil { - return err - } - q.topology.startTime = start - return q.addParamTime(value) - case limitKey: - q.topology.limit = value - default: - return q.Query.AddParam(key, value) - } - return nil -} - -func timeRangeToStart(value string) (string, error) { - r, err := strconv.ParseInt(value, 10, 64) - if err != nil { - return "", err - } - return strconv.FormatInt(time.Now().Unix()-r, 10), nil -} - -// PrepareToSubmit returns a new TopologyQueryBuilder that already handles the special behavior of some attributes -// that mustn't be used as part of a generic query. -func (q *TopologyQueryBuilder) PrepareToSubmit() (*TopologyQueryBuilder, error) { - newQuery, err := q.Query.PrepareToSubmit() - if err != nil { - return nil, err - } - return &TopologyQueryBuilder{ - Query: newQuery, - topology: q.topology, - }, nil -} - -func (q *TopologyQueryBuilder) URLQuery() (string, error) { - endpoint, mainPart, jsonPart, params := q.urlQueryParts() - err := q.setTopologyParams() - if err != nil { - return "", err - } - - if len(jsonPart) > 0 { - jsonPart = "|" + jsonPart +func NewTopologyQuery(cfg *Config, start, end, limit, reporter string) (*TopologyQueryBuilder, error) { + l := limit + if len(l) == 0 { + l = topologyDefaultLimit } - return endpoint + fmt.Sprintf(`topk(%s,sum by(%s) (%s(%s|json%s|unwrap %s|__error__=""[%ss])))%s&step=%ss`, - q.topology.limit, topologyMetrics, q.topology.function, mainPart, jsonPart, q.topology.dataField, q.topology.timeRange, params, q.topology.timeRange), nil -} -func (q *TopologyQueryBuilder) setTopologyParams() error { - if len(q.topology.timeRange) == 0 { + timeRange := topologyDefaultRange + if len(start) > 0 && len(end) > 0 { var startTime, endTime int64 var err error - for _, p := range q.urlParams { - switch p[0] { - case startParam: - startTime, err = strconv.ParseInt(p[1], 10, 64) - case endParam: - endTime, err = strconv.ParseInt(p[1], 10, 64) - } - if err != nil { - return fmt.Errorf("%s param should be int64", p[0]) - } + startTime, err = strconv.ParseInt(start, 10, 64) + if err != nil { + return nil, fmt.Errorf("can't parse start param: %s", start) + } + endTime, err = strconv.ParseInt(end, 10, 64) + if err != nil { + return nil, fmt.Errorf("can't parse end param: %s", end) } rng := endTime - startTime if rng > 0 { - q.topology.timeRange = strconv.FormatInt(rng, 10) - } else { - q.topology.timeRange = topologyDefaultRange + timeRange = strconv.FormatInt(rng, 10) } } - if len(q.topology.limit) == 0 { - q.topology.limit = topologyDefaultLimit - } - - //TODO: allow rate / sum_over_time / avg_over_time / max_over_time / min_over_time - q.topology.function = "sum_over_time" - //TODO: allow other values than bytes like Packets - q.topology.dataField = "Bytes" + return &TopologyQueryBuilder{ + FlowQueryBuilder: NewFlowQueryBuilder(cfg, start, end, limit, reporter), + topology: &Topology{ + timeRange: timeRange, + limit: l, + function: "sum_over_time", + dataField: fields.Bytes, + }, + }, nil +} - return nil +func (q *TopologyQueryBuilder) Build() string { + // Build something like: + // /?query= + // topk( + // , + // sum by() ( + // sum_over_time( + // {