diff --git a/cmd/plugin-backend.go b/cmd/plugin-backend.go index b3466cd5f..069a231f0 100644 --- a/cmd/plugin-backend.go +++ b/cmd/plugin-backend.go @@ -10,7 +10,7 @@ import ( "github.com/sirupsen/logrus" - "github.com/netobserv/network-observability-console-plugin/pkg/handler" + "github.com/netobserv/network-observability-console-plugin/pkg/loki" "github.com/netobserv/network-observability-console-plugin/pkg/server" ) @@ -71,11 +71,6 @@ func main() { CORSAllowMethods: *corsMethods, CORSAllowHeaders: *corsHeaders, CORSMaxAge: *corsMaxAge, - Loki: handler.LokiConfig{ - URL: lURL, - Timeout: *lokiTimeout, - TenantID: *lokiTenantID, - Labels: strings.Split(lLabels, ","), - }, + Loki: loki.NewConfig(lURL, *lokiTimeout, *lokiTenantID, strings.Split(lLabels, ",")), }) } diff --git a/pkg/handler/flows.go b/pkg/handler/flows.go new file mode 100644 index 000000000..7c5185ee0 --- /dev/null +++ b/pkg/handler/flows.go @@ -0,0 +1,346 @@ +package handler + +import ( + "encoding/json" + "errors" + "fmt" + "net/http" + "net/url" + "strconv" + "strings" + "sync" + "time" + + "github.com/netobserv/network-observability-console-plugin/pkg/httpclient" + "github.com/netobserv/network-observability-console-plugin/pkg/loki" + "github.com/netobserv/network-observability-console-plugin/pkg/model" + "github.com/netobserv/network-observability-console-plugin/pkg/model/fields" + "github.com/netobserv/network-observability-console-plugin/pkg/utils" +) + +const ( + startTimeKey = "startTime" + endTimeKey = "endTime" + timeRangeKey = "timeRange" + limitKey = "limit" + matchKey = "match" + reporterKey = "reporter" + filtersKey = "filters" + anyMatchValue = "any" + exportFormatKey = "format" + exportcolumnsKey = "columns" +) + +type errorWithCode struct { + err error + code int +} + +// Example of raw filters: +// &filters=foo=a,b;bar=c +func parseFilters(raw string) map[string]string { + parsed := make(map[string]string) + list := strings.Split(raw, ";") + for _, filter := range list { + pair := strings.Split(filter, "=") + if len(pair) == 2 { + parsed[pair[0]] = pair[1] + } + } + return parsed +} + +// groupFilters creates groups of filters for fetching strategy, that depends on the match all/any param +// Filters in the same group are fetched in a single query +// Each group is fetched via its own query +func groupFilters(cfg *loki.Config, filters map[string]string, matchAny bool) ([]map[string]string, error) { + var groups []map[string]string + if matchAny { + // Every filter is a group and will be fetched independently + for k, v := range filters { + // Check for grouped K8S Resource fields (kind.namespace.name) + if fields.IsK8SResourcePath(k) { + // Although we are in "match any", kind/namespace/name filters have to be in a single group + // or two different groups if there's Src+Dst + pathGroup1, pathGroup2, err := expandK8SResourcePath(k, v) + if err != nil { + return nil, err + } + groups = append(groups, pathGroup1) + if pathGroup2 != nil { + groups = append(groups, pathGroup2) + } + continue + } + + // Check if this is a common filter Src+Dst that must be expanded in two filters + srcKey, dstKey := fields.ToSrcDst(k) + if cfg.IsLabel(srcKey) || cfg.IsLabel(dstKey) { + // Add them as separate filters (note: line filters support standing as a single/common filter) + groups = append(groups, map[string]string{srcKey: v}) + groups = append(groups, map[string]string{dstKey: v}) + } else { + groups = append(groups, map[string]string{k: v}) + } + } + } else { + // Match all => most filters are fetched in a single query, except when there's common Src+Dst filters + group1 := make(map[string]string) + group2 := make(map[string]string) + needSrcDstSplit := false + for k, v := range filters { + // Check for grouped K8S Resource fields (kind.namespace.name) + if fields.IsK8SResourcePath(k) { + pathGroup1, pathGroup2, err := expandK8SResourcePath(k, v) + if err != nil { + return nil, err + } + if pathGroup2 == nil { + // Merge pathGroup1 into both group1 (src) and group2 (dst) + utils.MergeMaps(group1, pathGroup1) + utils.MergeMaps(group2, pathGroup1) + } else { + // Merge first into group1 (src), second into group2 (dst) + utils.MergeMaps(group1, pathGroup1) + utils.MergeMaps(group2, pathGroup2) + needSrcDstSplit = true + } + continue + } + // Check if this is a common filter Src+Dst that must be expanded in two filters + srcKey, dstKey := fields.ToSrcDst(k) + if cfg.IsLabel(srcKey) || cfg.IsLabel(dstKey) { + // Add them as separate filters (note: line filters support standing as a single/common filter) + group1[srcKey] = v + group2[dstKey] = v + needSrcDstSplit = true + } else { + group1[k] = v + group2[k] = v + } + } + if needSrcDstSplit { + groups = []map[string]string{group1, group2} + } else { + // Simplest case, no split => just return the src filters (it's actually identical to dst filters) + groups = []map[string]string{group1} + } + } + return groups, nil +} + +// Expand K8SResourcePath "Kind.Namespace.ObjectName" into three filters, +// either in a single group or in two groups Src+Dst +func expandK8SResourcePath(key, value string) (map[string]string, map[string]string, error) { + prefix := fields.GetPrefix(key) + // Expected value is Kind.Namespace.ObjectName + parts := strings.Split(value, ".") + if len(parts) != 3 { + return nil, nil, fmt.Errorf("invalid resource path: %s=%s", key, value) + } + kind := parts[0] + ns := parts[1] + name := parts[2] + if prefix == "" { + groupSrc := createResourcePathFilter(key, fields.Src, kind, ns, name) + groupDst := createResourcePathFilter(key, fields.Dst, kind, ns, name) + return groupSrc, groupDst, nil + } + return createResourcePathFilter(key, prefix, kind, ns, name), nil, nil +} + +func createResourcePathFilter(key, prefix, kind, ns, name string) map[string]string { + if strings.Contains(key, "Owner") { + return map[string]string{ + prefix + fields.OwnerType: exact(kind), + prefix + fields.Namespace: exact(ns), + prefix + fields.OwnerName: exact(name), + } + } + return map[string]string{ + prefix + fields.Type: exact(kind), + prefix + fields.Namespace: exact(ns), + prefix + fields.Name: exact(name), + } +} + +func getStartTime(params url.Values) (string, error) { + start := params.Get(startTimeKey) + if len(start) == 0 { + tr := params.Get(timeRangeKey) + if len(tr) > 0 { + r, err := strconv.ParseInt(tr, 10, 64) + if err != nil { + return "", errors.New("Could not parse time range: " + err.Error()) + } + start = strconv.FormatInt(time.Now().Unix()-r, 10) + } + } + return start, nil +} + +func GetFlows2(cfg loki.Config) func(w http.ResponseWriter, r *http.Request) { + lokiClient := newLokiClient(&cfg) + + return func(w http.ResponseWriter, r *http.Request) { + params := r.URL.Query() + + flows, code, err := getFlows(cfg, lokiClient, params) + if err != nil { + writeError(w, code, err.Error()) + return + } + + writeRawJSON(w, http.StatusOK, flows) + } +} + +func ExportFlows(cfg loki.Config) func(w http.ResponseWriter, r *http.Request) { + lokiClient := newLokiClient(&cfg) + + return func(w http.ResponseWriter, r *http.Request) { + params := r.URL.Query() + + flows, code, err := getFlows(cfg, lokiClient, params) + if err != nil { + writeError(w, code, err.Error()) + return + } + + exportFormat := params.Get(exportFormatKey) + exportColumns := strings.Split(params.Get(exportcolumnsKey), ",") + + switch exportFormat { + case exportCSVFormat: + writeCSV(w, http.StatusOK, flows, exportColumns) + default: + writeError(w, http.StatusBadRequest, fmt.Sprintf("export format %q is not valid", exportFormat)) + } + } +} + +func getFlows(cfg loki.Config, client httpclient.HTTPClient, params url.Values) ([]byte, int, error) { + hlog.Debugf("getFlows query params: %s", params) + + start, err := getStartTime(params) + if err != nil { + return nil, http.StatusBadRequest, err + } + end := params.Get(endTimeKey) + limit := params.Get(limitKey) + matchAny := params.Get(matchKey) == anyMatchValue + reporter := params.Get(reporterKey) + rawFilters := params.Get(filtersKey) + filters := parseFilters(rawFilters) + grouped, err := groupFilters(&cfg, filters, matchAny) + if err != nil { + return nil, http.StatusBadRequest, err + } + + var rawJSON []byte + if len(grouped) > 1 { + // match any, and multiple filters => run in parallel then aggregate + res, code, err := fetchParallel(&cfg, client, grouped, start, end, limit, reporter) + if err != nil { + return nil, code, errors.New("Error while fetching flows from Loki: " + err.Error()) + } + rawJSON = res + } else { + // else, run all at once + qb := loki.NewFlowQueryBuilder(&cfg, start, end, limit, reporter) + if len(grouped) > 0 { + err := qb.Filters(grouped[0]) + if err != nil { + return nil, http.StatusBadRequest, err + } + } + query := qb.Build() + resp, code, err := fetchSingle(query, client) + if err != nil { + return nil, code, errors.New("Error while fetching flows from Loki: " + err.Error()) + } + rawJSON = resp + } + + hlog.Tracef("GetFlows raw response: %v", rawJSON) + return rawJSON, http.StatusOK, nil +} + +func fetchParallel(cfg *loki.Config, lokiClient httpclient.HTTPClient, groupedFilters []map[string]string, start, end, limit, reporter string) ([]byte, int, error) { + // Run queries in parallel, then aggregate them + resChan := make(chan model.QueryResponse, len(groupedFilters)) + errChan := make(chan errorWithCode, len(groupedFilters)) + var wg sync.WaitGroup + wg.Add(len(groupedFilters)) + + for _, group := range groupedFilters { + go func(filters map[string]string) { + defer wg.Done() + qb := loki.NewFlowQueryBuilder(cfg, start, end, limit, reporter) + err := qb.Filters(filters) + if err != nil { + errChan <- errorWithCode{err: err, code: http.StatusBadRequest} + return + } + query := qb.Build() + resp, code, err := fetchSingle(query, lokiClient) + if err != nil { + errChan <- errorWithCode{err: err, code: code} + } else { + var qr model.QueryResponse + err := json.Unmarshal(resp, &qr) + if err != nil { + errChan <- errorWithCode{err: err, code: http.StatusInternalServerError} + } else { + resChan <- qr + } + } + }(group) + } + + wg.Wait() + close(resChan) + close(errChan) + + for errWithCode := range errChan { + return nil, errWithCode.code, errWithCode.err + } + + // Aggregate results + var aggregated model.QueryResponse + var aggStreams model.Streams + for r := range resChan { + if streams, ok := r.Data.Result.(model.Streams); ok { + if len(aggStreams) == 0 { + aggStreams = streams + aggregated = r + } else { + aggStreams = append(aggStreams, streams...) + aggregated.Data.Result = aggStreams + } + } else { + return nil, http.StatusInternalServerError, fmt.Errorf("loki returned an unexpected type: %T", r.Data.Result) + } + } + + // Encode back to json + encoded, err := json.Marshal(aggregated) + if err != nil { + return nil, http.StatusInternalServerError, err + } + + return encoded, http.StatusOK, nil +} + +func fetchSingle(query string, lokiClient httpclient.HTTPClient) ([]byte, int, error) { + hlog.Debugf("GetFlows query: %s", query) + resp, code, err := lokiClient.Get(query) + if err != nil { + return nil, http.StatusServiceUnavailable, err + } + if code != http.StatusOK { + msg := getLokiError(resp, code) + return nil, http.StatusBadRequest, errors.New("Loki backend responded: " + msg) + } + return resp, http.StatusOK, nil +} diff --git a/pkg/handler/flows_test.go b/pkg/handler/flows_test.go new file mode 100644 index 000000000..b870cc70c --- /dev/null +++ b/pkg/handler/flows_test.go @@ -0,0 +1,101 @@ +package handler + +import ( + "net/url" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/netobserv/network-observability-console-plugin/pkg/loki" +) + +func getConfig() loki.Config { + lokiURL, _ := url.Parse("/") + return loki.NewConfig( + lokiURL, + time.Second, + "", + []string{"SrcK8S_Namespace", "SrcK8S_OwnerName", "DstK8S_Namespace", "DstK8S_OwnerName", "FlowDirection"}, + ) +} + +func TestGroupK8SFilters(t *testing.T) { + config := getConfig() + + // Test match all + groups, err := groupFilters(&config, map[string]string{ + "SrcK8S_Object": "Pod.default.test", + "Port": "8080", + }, false) + require.NoError(t, err) + + assert.Len(t, groups, 1) + assert.Equal(t, map[string]string{ + "SrcK8S_Name": `"test"`, + "SrcK8S_Namespace": `"default"`, + "SrcK8S_Type": `"Pod"`, + "Port": "8080", + }, groups[0]) + + // Test match any + groups, err = groupFilters(&config, map[string]string{ + "SrcK8S_Object": "Pod.default.test", + "Port": "8080", + }, true) + require.NoError(t, err) + + assert.Len(t, groups, 2) + assert.Equal(t, map[string]string{ + "SrcK8S_Name": `"test"`, + "SrcK8S_Namespace": `"default"`, + "SrcK8S_Type": `"Pod"`, + }, groups[0]) + assert.Equal(t, map[string]string{ + "Port": "8080", + }, groups[1]) + + // Test Src+Dst match all + groups, err = groupFilters(&config, map[string]string{ + "K8S_Object": "Pod.default.test", + "Port": "8080", + }, false) + require.NoError(t, err) + + assert.Len(t, groups, 2) + assert.Equal(t, map[string]string{ + "SrcK8S_Name": `"test"`, + "SrcK8S_Namespace": `"default"`, + "SrcK8S_Type": `"Pod"`, + "Port": "8080", + }, groups[0]) + assert.Equal(t, map[string]string{ + "DstK8S_Name": `"test"`, + "DstK8S_Namespace": `"default"`, + "DstK8S_Type": `"Pod"`, + "Port": "8080", + }, groups[1]) + + // Test Src+Dst match any + groups, err = groupFilters(&config, map[string]string{ + "K8S_Object": "Pod.default.test", + "Port": "8080", + }, true) + require.NoError(t, err) + + assert.Len(t, groups, 3) + assert.Equal(t, map[string]string{ + "SrcK8S_Name": `"test"`, + "SrcK8S_Namespace": `"default"`, + "SrcK8S_Type": `"Pod"`, + }, groups[0]) + assert.Equal(t, map[string]string{ + "DstK8S_Name": `"test"`, + "DstK8S_Namespace": `"default"`, + "DstK8S_Type": `"Pod"`, + }, groups[1]) + assert.Equal(t, map[string]string{ + "Port": "8080", + }, groups[2]) +} diff --git a/pkg/handler/loki.go b/pkg/handler/loki.go index 588b4c13a..7f9df4b8b 100644 --- a/pkg/handler/loki.go +++ b/pkg/handler/loki.go @@ -5,9 +5,7 @@ import ( "errors" "fmt" "net/http" - "net/url" "strings" - "time" "github.com/sirupsen/logrus" @@ -22,14 +20,7 @@ const ( lokiOrgIDHeader = "X-Scope-OrgID" ) -type LokiConfig struct { - URL *url.URL - Timeout time.Duration - TenantID string - Labels []string -} - -func newLokiClient(cfg *LokiConfig) httpclient.HTTPClient { +func newLokiClient(cfg *loki.Config) httpclient.HTTPClient { var headers map[string][]string if cfg.TenantID != "" { headers = map[string][]string{ @@ -40,7 +31,7 @@ func newLokiClient(cfg *LokiConfig) httpclient.HTTPClient { return httpclient.NewHTTPClient(cfg.Timeout, headers) } -func GetFlows(cfg LokiConfig, allowExport bool) func(w http.ResponseWriter, r *http.Request) { +func GetFlows(cfg loki.Config, allowExport bool) func(w http.ResponseWriter, r *http.Request) { lokiClient := newLokiClient(&cfg) // TODO: improve search mecanism: @@ -52,7 +43,7 @@ func GetFlows(cfg LokiConfig, allowExport bool) func(w http.ResponseWriter, r *h hlog.Debugf("GetFlows query params: %s", params) //allow export only on specific endpoints - queryBuilder := loki.NewQuery(cfg.URL.String(), cfg.Labels, allowExport) + queryBuilder := loki.NewQuery(&cfg, allowExport) if err := queryBuilder.AddParams(params); err != nil { writeError(w, http.StatusBadRequest, err.Error()) return diff --git a/pkg/handler/resources.go b/pkg/handler/resources.go index f23f63dee..cdc5b702e 100644 --- a/pkg/handler/resources.go +++ b/pkg/handler/resources.go @@ -16,7 +16,7 @@ import ( "github.com/netobserv/network-observability-console-plugin/pkg/utils" ) -func GetNamespaces(cfg LokiConfig) func(w http.ResponseWriter, r *http.Request) { +func GetNamespaces(cfg loki.Config) func(w http.ResponseWriter, r *http.Request) { lokiClient := newLokiClient(&cfg) return func(w http.ResponseWriter, r *http.Request) { // Fetch and merge values for SrcK8S_Namespace and DstK8S_Namespace @@ -37,7 +37,7 @@ func GetNamespaces(cfg LokiConfig) func(w http.ResponseWriter, r *http.Request) } } -func getLabelValues(cfg *LokiConfig, lokiClient httpclient.HTTPClient, label string) ([]string, int, error) { +func getLabelValues(cfg *loki.Config, lokiClient httpclient.HTTPClient, label string) ([]string, int, error) { baseURL := strings.TrimRight(cfg.URL.String(), "/") url := fmt.Sprintf("%s/loki/api/v1/label/%s/values", baseURL, label) hlog.Debugf("getLabelValues URL: %s", url) @@ -59,7 +59,7 @@ func getLabelValues(cfg *LokiConfig, lokiClient httpclient.HTTPClient, label str return lvr.Data, http.StatusOK, nil } -func GetNames(cfg LokiConfig) func(w http.ResponseWriter, r *http.Request) { +func GetNames(cfg loki.Config) func(w http.ResponseWriter, r *http.Request) { lokiClient := newLokiClient(&cfg) return func(w http.ResponseWriter, r *http.Request) { params := mux.Vars(r) @@ -82,7 +82,7 @@ func GetNames(cfg LokiConfig) func(w http.ResponseWriter, r *http.Request) { } } -func getNamesForPrefix(cfg LokiConfig, lokiClient httpclient.HTTPClient, prefix, kind, namespace string) ([]string, int, error) { +func getNamesForPrefix(cfg loki.Config, lokiClient httpclient.HTTPClient, prefix, kind, namespace string) ([]string, int, error) { lokiParams := map[string][]string{ prefix + fields.Namespace: {exact(namespace)}, } @@ -95,7 +95,7 @@ func getNamesForPrefix(cfg LokiConfig, lokiClient httpclient.HTTPClient, prefix, fieldToExtract = prefix + fields.Name } - queryBuilder := loki.NewQuery(cfg.URL.String(), cfg.Labels, false) + queryBuilder := loki.NewQuery(&cfg, false) if err := queryBuilder.AddParams(lokiParams); err != nil { return nil, http.StatusBadRequest, err } diff --git a/pkg/handler/topology.go b/pkg/handler/topology.go index 108eff261..14e0b7b90 100644 --- a/pkg/handler/topology.go +++ b/pkg/handler/topology.go @@ -6,14 +6,14 @@ import ( "github.com/netobserv/network-observability-console-plugin/pkg/loki" ) -func GetTopology(cfg LokiConfig) func(w http.ResponseWriter, r *http.Request) { +func GetTopology(cfg loki.Config) func(w http.ResponseWriter, r *http.Request) { lokiClient := newLokiClient(&cfg) return func(w http.ResponseWriter, r *http.Request) { params := r.URL.Query() hlog.Debugf("GetTopology query params: %s", params) - queryBuilder := loki.NewTopologyQuery(cfg.URL.String(), cfg.Labels) + queryBuilder := loki.NewTopologyQuery(&cfg) if err := queryBuilder.AddParams(params); err != nil { writeError(w, http.StatusBadRequest, err.Error()) return diff --git a/pkg/loki/config.go b/pkg/loki/config.go new file mode 100644 index 000000000..0d9798f98 --- /dev/null +++ b/pkg/loki/config.go @@ -0,0 +1,29 @@ +package loki + +import ( + "net/url" + "time" + + "github.com/netobserv/network-observability-console-plugin/pkg/utils" +) + +type Config struct { + URL *url.URL + Timeout time.Duration + TenantID string + Labels map[string]struct{} +} + +func NewConfig(url *url.URL, timeout time.Duration, tenantID string, labels []string) Config { + return Config{ + URL: url, + Timeout: timeout, + TenantID: tenantID, + Labels: utils.GetMapInterface(labels), + } +} + +func (c *Config) IsLabel(key string) bool { + _, isLabel := c.Labels[key] + return isLabel +} diff --git a/pkg/loki/convert.go b/pkg/loki/convert.go index 939b19e50..1bf9bf408 100644 --- a/pkg/loki/convert.go +++ b/pkg/loki/convert.go @@ -24,7 +24,7 @@ func (q *Query) convertToAnyMatch() *Query { } out := Query{ - baseURL: q.baseURL, + config: q.config, urlParams: q.urlParams, labelJoiner: joinOr, specialAttrs: q.specialAttrs, diff --git a/pkg/loki/filter.go b/pkg/loki/filter.go index a74407828..f8a524016 100644 --- a/pkg/loki/filter.go +++ b/pkg/loki/filter.go @@ -31,16 +31,16 @@ type labelFilter struct { valueType valueType } -func stringLabelFilter(labelKey string, matcher labelMatcher, value string) labelFilter { +func stringLabelFilter(labelKey string, value string) labelFilter { return labelFilter{ key: labelKey, - matcher: matcher, + matcher: labelEqual, value: value, valueType: typeString, } } -func regexLabelFilter(labelKey string, matcher labelMatcher, value string) labelFilter { +func regexLabelFilter(labelKey string, value string) labelFilter { return labelFilter{ key: labelKey, matcher: labelMatches, diff --git a/pkg/loki/query.go b/pkg/loki/query.go index 3e7fdd764..0675c189a 100644 --- a/pkg/loki/query.go +++ b/pkg/loki/query.go @@ -12,7 +12,6 @@ import ( "github.com/sirupsen/logrus" "github.com/netobserv/network-observability-console-plugin/pkg/model/fields" - "github.com/netobserv/network-observability-console-plugin/pkg/utils" ) const ( @@ -52,9 +51,8 @@ const ( // {streamSelector}|lineFilters|json|labelFilters type Query struct { // urlParams for the HTTP call - baseURL string + config *Config urlParams [][2]string - labelMap map[string]struct{} streamSelector []labelFilter lineFilters []string labelFilters []labelFilter @@ -71,17 +69,16 @@ type Export struct { columns []string } -func NewQuery(baseURL string, labels []string, export bool) *Query { +func NewQuery(config *Config, export bool) *Query { var exp *Export if export { exp = &Export{} } return &Query{ - baseURL: baseURL, + config: config, specialAttrs: map[string]string{}, labelJoiner: joinPipeAnd, export: exp, - labelMap: utils.GetMapInterface(labels), groupedLabelFilters: map[string][]labelFilter{}, } } @@ -164,12 +161,12 @@ func (q *Query) addParamTime(value string) error { func (q *Query) addParamDefault(key, value string) error { // Stream selector labels - if _, ok := q.labelMap[key]; ok { + if q.config.IsLabel(key) { q.processStreamSelector(key, strings.Split(value, ",")) } else { srcKey, dstKey := fields.ToSrcDst(key) - if _, ok := q.labelMap[srcKey]; ok { - if _, ok := q.labelMap[dstKey]; !ok { + if q.config.IsLabel(srcKey) { + if !q.config.IsLabel(dstKey) { qlog.WithField("label", key). Warningf("can't run common label filter as Src field is defined as a label, but Dst is not. Ignoring it") } else { @@ -200,7 +197,7 @@ func (q *Query) urlQueryParts() (string, string, string, string) { jsonSb := strings.Builder{} paramSb := strings.Builder{} - endpointSb.WriteString(strings.TrimRight(q.baseURL, "/")) + endpointSb.WriteString(strings.TrimRight(q.config.URL.String(), "/")) endpointSb.WriteString(queryRangePath) querySb.WriteByte('{') for i, ss := range q.streamSelector { @@ -269,12 +266,12 @@ func (q *Query) PrepareToSubmit() (*Query, error) { // Append app stream selector, which will apply whichever matching criteria (any or all) out.streamSelector = append(out.streamSelector, - stringLabelFilter("app", labelEqual, "netobserv-flowcollector")) + stringLabelFilter("app", "netobserv-flowcollector")) // Filter by flow direction independently of the matching criteria (any or all) if flowDir, ok := out.specialAttrs[flowDirParam]; ok { out.streamSelector = append(out.streamSelector, - stringLabelFilter(flowDirParam, labelEqual, flowDir)) + stringLabelFilter(flowDirParam, flowDir)) } return out, nil } @@ -309,10 +306,10 @@ func (q *Query) processStreamSelector(key string, values []string) { if regexStr.Len() > 0 { if q.currentGroup == nil { q.streamSelector = append(q.streamSelector, - stringLabelFilter(key, labelMatches, regexStr.String())) + regexLabelFilter(key, regexStr.String())) } else { q.groupedLabelFilters[*q.currentGroup] = append(q.groupedLabelFilters[*q.currentGroup], - stringLabelFilter(key, labelMatches, regexStr.String())) + regexLabelFilter(key, regexStr.String())) } } } @@ -357,7 +354,7 @@ func (q *Query) processLineFilters(key string, values []string) error { //VALUE can be quoted for exact match or contains * to inject regex any regexStr.WriteString(key) regexStr.WriteString(`":`) - if isNumeric(key) { + if fields.IsNumeric(key) { regexStr.WriteString(value) } else { regexStr.WriteString(`"`) @@ -408,11 +405,11 @@ func (q *Query) processCommonLabelFilter(key string, values []string) { } // apply filter on both Src and Dst fields if q.currentGroup == nil { - q.labelFilters = append(q.labelFilters, regexLabelFilter(srcKey, labelMatches, regexStr.String())) - q.labelFilters = append(q.labelFilters, regexLabelFilter(dstKey, labelMatches, regexStr.String())) + q.labelFilters = append(q.labelFilters, regexLabelFilter(srcKey, regexStr.String())) + q.labelFilters = append(q.labelFilters, regexLabelFilter(dstKey, regexStr.String())) } else { - q.groupedLabelFilters[*q.currentGroup] = append(q.groupedLabelFilters[*q.currentGroup], regexLabelFilter(srcKey, labelMatches, regexStr.String())) - q.groupedLabelFilters[*q.currentGroup] = append(q.groupedLabelFilters[*q.currentGroup], regexLabelFilter(dstKey, labelMatches, regexStr.String())) + q.groupedLabelFilters[*q.currentGroup] = append(q.groupedLabelFilters[*q.currentGroup], regexLabelFilter(srcKey, regexStr.String())) + q.groupedLabelFilters[*q.currentGroup] = append(q.groupedLabelFilters[*q.currentGroup], regexLabelFilter(dstKey, regexStr.String())) } } } @@ -488,18 +485,3 @@ func (q *Query) AddParamSrcDst(prefix, key, value string) { q.currentGroup = nil } } - -func isNumeric(v string) bool { - switch v { - case - fields.Port, - fields.SrcPort, - fields.DstPort, - fields.Packets, - fields.Proto, - fields.Bytes: - return true - default: - return false - } -} diff --git a/pkg/loki/query2.go b/pkg/loki/query2.go new file mode 100644 index 000000000..018d3a70e --- /dev/null +++ b/pkg/loki/query2.go @@ -0,0 +1,204 @@ +// Package loki provides functionalities for interacting with Loki +package loki + +import ( + "fmt" + "strings" + + "github.com/netobserv/network-observability-console-plugin/pkg/model/fields" + "github.com/netobserv/network-observability-console-plugin/pkg/utils/constants" +) + +// FlowQueryBuilder stores a state to build a LogQL query +type FlowQueryBuilder struct { + config *Config + startTime string + endTime string + limit string + labelFilters []labelFilter + lineFilters []string + jsonFilters []labelFilter +} + +func NewFlowQueryBuilder(cfg *Config, start, end, limit, reporter string) *FlowQueryBuilder { + // Always use app stream selector, which will apply whichever matching criteria (any or all) + labelFilters := []labelFilter{ + stringLabelFilter(constants.AppLabel, constants.AppLabelValue), + } + if len(reporter) > 0 { + labelFilters = append(labelFilters, stringLabelFilter(fields.FlowDirection, reporter)) + } + return &FlowQueryBuilder{ + config: cfg, + startTime: start, + endTime: end, + limit: limit, + labelFilters: labelFilters, + } +} + +func (q *FlowQueryBuilder) Filters(filters map[string]string) error { + for key, values := range filters { + if err := q.AddFilter(key, values); err != nil { + return err + } + } + return nil +} + +func (q *FlowQueryBuilder) AddFilter(key, joinedValues string) error { + if !filterRegexpValidation.MatchString(joinedValues) { + return fmt.Errorf("unauthorized sign in flows request: %s", joinedValues) + } + + values := strings.Split(joinedValues, ",") + + // Stream selector labels + if q.config.IsLabel(key) { + if len(values) == 1 && isExactMatch(values[0]) { + q.addExactMatchSingleLabel(key, trimExactMatch(values[0])) + } else { + q.addLabelRegex(key, values) + } + } else if fields.IsIP(key) { + q.addIPFilters(key, values) + } else { + q.addLineFilters(key, values) + } + + return nil +} + +func (q *FlowQueryBuilder) addExactMatchSingleLabel(key string, value string) { + q.labelFilters = append(q.labelFilters, stringLabelFilter(key, value)) +} + +func (q *FlowQueryBuilder) addLabelRegex(key string, values []string) { + regexStr := strings.Builder{} + for i, value := range values { + if i > 0 { + regexStr.WriteByte('|') + } + //match the beginning of string if quoted without a star + //and case insensitive if no quotes + if !strings.HasPrefix(value, `"`) { + regexStr.WriteString("(?i).*") + } else if !strings.HasPrefix(value, `"*`) { + regexStr.WriteString("^") + } + //inject value with regex + regexStr.WriteString(valueReplacer.Replace(value)) + //match the end of string if quoted without a star + if !strings.HasSuffix(value, `"`) { + regexStr.WriteString(".*") + } else if !strings.HasSuffix(value, `*"`) { + regexStr.WriteString("$") + } + } + + if regexStr.Len() > 0 { + q.labelFilters = append(q.labelFilters, regexLabelFilter(key, regexStr.String())) + } +} + +func (q *FlowQueryBuilder) addLineFilters(key string, values []string) { + regexStr := strings.Builder{} + for i, value := range values { + if i > 0 { + regexStr.WriteByte('|') + } + //match end of KEY + regex VALUE: + //if numeric, KEY":VALUE + //if string KEY":"VALUE" + //ie 'Port' key will match both 'SrcPort":"XXX"' and 'DstPort":"XXX" + //VALUE can be quoted for exact match or contains * to inject regex any + regexStr.WriteString(key) + regexStr.WriteString(`":`) + if fields.IsNumeric(key) { + regexStr.WriteString(value) + } else { + regexStr.WriteString(`"`) + // match start any if not quoted + // and case insensitive + if !strings.HasPrefix(value, `"`) { + regexStr.WriteString("(?i).*") + } + //inject value with regex + regexStr.WriteString(valueReplacer.Replace(value)) + // match end any if not quoted + if !strings.HasSuffix(value, `"`) { + regexStr.WriteString(".*") + } + regexStr.WriteString(`"`) + } + } + + if regexStr.Len() > 0 { + q.lineFilters = append(q.lineFilters, regexStr.String()) + } +} + +// addIPFilters assumes that we are searching for that IP addresses as part +// of the log line (not in the stream selector labels) +func (q *FlowQueryBuilder) addIPFilters(key string, values []string) { + for _, value := range values { + q.jsonFilters = append(q.jsonFilters, ipLabelFilter(key, value)) + } +} + +func (q *FlowQueryBuilder) Build() string { + sb := strings.Builder{} + sb.WriteString(strings.TrimRight(q.config.URL.String(), "/")) + sb.WriteString(queryRangePath) + sb.WriteString("{") + for i, ss := range q.labelFilters { + if i > 0 { + sb.WriteByte(',') + } + ss.writeInto(&sb) + } + sb.WriteByte('}') + for _, lf := range q.lineFilters { + sb.WriteString("|~`") + sb.WriteString(lf) + sb.WriteByte('`') + } + if len(q.jsonFilters) > 0 { + sb.WriteString("|json|") + for i, lf := range q.jsonFilters { + if i > 0 { + sb.WriteByte('|') + } + lf.writeInto(&sb) + } + } + q.appendQueryParams(&sb) + return sb.String() +} + +func (q *FlowQueryBuilder) appendQueryParams(sb *strings.Builder) { + if len(q.startTime) > 0 { + appendQueryParam(sb, startParam, q.startTime) + } + if len(q.endTime) > 0 { + appendQueryParam(sb, endParam, q.endTime) + } + if len(q.limit) > 0 { + appendQueryParam(sb, limitParam, q.limit) + } +} + +func appendQueryParam(sb *strings.Builder, key, value string) { + sb.WriteByte('&') + sb.WriteString(key) + sb.WriteByte('=') + sb.WriteString(value) +} + +func isExactMatch(value string) bool { + return strings.HasPrefix(value, `"`) && strings.HasSuffix(value, `"`) +} + +func trimExactMatch(value string) string { + return strings.TrimPrefix(strings.TrimSuffix(value, `"`), `"`) +} diff --git a/pkg/loki/query_test.go b/pkg/loki/query_test.go index aed5ae26a..b68372eac 100644 --- a/pkg/loki/query_test.go +++ b/pkg/loki/query_test.go @@ -1,7 +1,9 @@ package loki import ( + "net/url" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -14,20 +16,24 @@ func TestQuery_ToURL_ConvertToAnyMatch(t *testing.T) { expect string expectAny string } + lokiURL, err := url.Parse("/") + require.NoError(t, err) + cfg := NewConfig(lokiURL, time.Second, "", []string{}) for _, tc := range []testCase{{ title: "streamSelector only", - in: Query{streamSelector: []labelFilter{ - stringLabelFilter("app", labelEqual, "flows"), - stringLabelFilter("foo", labelMatches, ".*bar.*"), - stringLabelFilter("baz", labelMatches, ".*bae.*"), + in: Query{config: &cfg, streamSelector: []labelFilter{ + stringLabelFilter("app", "flows"), + regexLabelFilter("foo", ".*bar.*"), + regexLabelFilter("baz", ".*bae.*"), }}, expect: `/loki/api/v1/query_range?query={app="flows",foo=~".*bar.*",baz=~".*bae.*"}`, expectAny: `/loki/api/v1/query_range?query={dont="fail"}|json|app="flows"+or+foo=~".*bar.*"+or+baz=~".*bae.*"`, }, { title: "streamSelector with line filters", in: Query{ + config: &cfg, streamSelector: []labelFilter{ - stringLabelFilter("app", labelEqual, "netobs"), + stringLabelFilter("app", "netobs"), }, lineFilters: []string{`"DstPort":1234`, `"Namespace":".*hiya`}, }, @@ -36,10 +42,11 @@ func TestQuery_ToURL_ConvertToAnyMatch(t *testing.T) { }, { title: "streamSelector with label filters", in: Query{ - streamSelector: []labelFilter{stringLabelFilter("app", labelEqual, "some-app")}, + config: &cfg, + streamSelector: []labelFilter{stringLabelFilter("app", "some-app")}, labelJoiner: joinOr, labelFilters: []labelFilter{ - stringLabelFilter("foo", labelMatches, "bar"), + regexLabelFilter("foo", "bar"), intLabelFilter("port", 1234), ipLabelFilter("SrcAddr", "123.0.0.0/16"), }, @@ -49,9 +56,10 @@ func TestQuery_ToURL_ConvertToAnyMatch(t *testing.T) { }, { title: "streamSelector + line filters + label filters", in: Query{ - streamSelector: []labelFilter{stringLabelFilter("app", labelEqual, "the-app")}, + config: &cfg, + streamSelector: []labelFilter{stringLabelFilter("app", "the-app")}, labelJoiner: joinOr, - labelFilters: []labelFilter{stringLabelFilter("foo", labelMatches, "bar")}, + labelFilters: []labelFilter{regexLabelFilter("foo", "bar")}, lineFilters: []string{`"DstPod":".*podaco"`}, }, expect: "/loki/api/v1/query_range?query={app=\"the-app\"}|~`\"DstPod\":\".*podaco\"`|json|foo=~\"bar\"", @@ -66,7 +74,7 @@ func TestQuery_ToURL_ConvertToAnyMatch(t *testing.T) { // we need to have at least a stream selector, so we add a label for testing purposes // (in production, we add the app label) anyMatchQuery.streamSelector = append(anyMatchQuery.streamSelector, - stringLabelFilter("dont", labelEqual, "fail")) + stringLabelFilter("dont", "fail")) anyMatchURL, err := anyMatchQuery.URLQuery() require.NoError(t, err) assert.Equal(t, tc.expectAny, anyMatchURL) @@ -75,8 +83,12 @@ func TestQuery_ToURL_ConvertToAnyMatch(t *testing.T) { } func TestQuery_AddURLParam(t *testing.T) { + lokiURL, err := url.Parse("/") + require.NoError(t, err) + cfg := NewConfig(lokiURL, time.Second, "", []string{}) query := Query{ - streamSelector: []labelFilter{stringLabelFilter("app", labelEqual, "the-app")}, + config: &cfg, + streamSelector: []labelFilter{stringLabelFilter("app", "the-app")}, } query.addURLParam("foo", "bar") query.addURLParam("flis", "flas") @@ -86,6 +98,9 @@ func TestQuery_AddURLParam(t *testing.T) { } func TestQuery_BackQuote_Error(t *testing.T) { - query := NewQuery("/", []string{"lab1", "lab2"}, false) + lokiURL, err := url.Parse("/") + require.NoError(t, err) + cfg := NewConfig(lokiURL, time.Second, "", []string{"lab1", "lab2"}) + query := NewQuery(&cfg, false) assert.Error(t, query.AddParam("key", "backquoted`val")) } diff --git a/pkg/loki/topology_query.go b/pkg/loki/topology_query.go index c20692f78..923771ab9 100644 --- a/pkg/loki/topology_query.go +++ b/pkg/loki/topology_query.go @@ -26,9 +26,9 @@ type TopologyQueryBuilder struct { topology *Topology } -func NewTopologyQuery(baseURL string, labels []string) *TopologyQueryBuilder { +func NewTopologyQuery(cfg *Config) *TopologyQueryBuilder { return &TopologyQueryBuilder{ - Query: NewQuery(baseURL, labels, false), + Query: NewQuery(cfg, false), topology: &Topology{}, } } diff --git a/pkg/model/fields/fields.go b/pkg/model/fields/fields.go index 14cba03f9..8a5849162 100644 --- a/pkg/model/fields/fields.go +++ b/pkg/model/fields/fields.go @@ -1,35 +1,40 @@ package fields +import ( + "strings" +) + const ( - Src = "Src" - Dst = "Dst" - Namespace = "K8S_Namespace" - SrcNamespace = Src + Namespace - DstNamespace = Dst + Namespace - OwnerType = "K8S_OwnerType" - SrcOwnerType = Src + OwnerType - DstOwnerType = Dst + OwnerType - OwnerName = "K8S_OwnerName" - SrcOwnerName = Src + OwnerName - DstOwnerName = Dst + OwnerName - Type = "K8S_Type" - SrcType = Src + Type - DstType = Dst + Type - Name = "K8S_Name" - SrcName = Src + Name - DstName = Dst + Name - Addr = "Addr" - SrcAddr = Src + Addr - DstAddr = Dst + Addr - Port = "Port" - SrcPort = Src + Port - DstPort = Dst + Port - HostIP = "K8S_HostIP" - SrcHostIP = Src + HostIP - DstHostIP = Dst + HostIP - K8SObject = "K8S_Object" - SrcK8SObject = Src + K8SObject - DstK8SObject = Dst + K8SObject + Src = "Src" + Dst = "Dst" + Namespace = "K8S_Namespace" + SrcNamespace = Src + Namespace + DstNamespace = Dst + Namespace + OwnerType = "K8S_OwnerType" + SrcOwnerType = Src + OwnerType + DstOwnerType = Dst + OwnerType + OwnerName = "K8S_OwnerName" + SrcOwnerName = Src + OwnerName + DstOwnerName = Dst + OwnerName + Type = "K8S_Type" + SrcType = Src + Type + DstType = Dst + Type + Name = "K8S_Name" + SrcName = Src + Name + DstName = Dst + Name + Addr = "Addr" + SrcAddr = Src + Addr + DstAddr = Dst + Addr + Port = "Port" + SrcPort = Src + Port + DstPort = Dst + Port + HostIP = "K8S_HostIP" + SrcHostIP = Src + HostIP + DstHostIP = Dst + HostIP + K8SObject = "K8S_Object" + SrcK8SObject = Src + K8SObject + DstK8SObject = Dst + K8SObject + // TODO: remove OwnerObject K8SOwnerObject = "K8S_OwnerObject" SrcK8SOwnerObject = Src + K8SOwnerObject DstK8SOwnerObject = Dst + K8SOwnerObject @@ -39,8 +44,73 @@ const ( Packets = "Packets" Proto = "Proto" Bytes = "Bytes" + FlowDirection = "FlowDirection" ) func ToSrcDst(key string) (string, string) { return Src + key, Dst + key } + +func IsNumeric(v string) bool { + switch v { + case + Port, + SrcPort, + DstPort, + Packets, + Proto, + Bytes: + return true + default: + return false + } +} + +func IsIP(f string) bool { + switch f { + case + DstAddr, + SrcAddr, + DstHostIP, + SrcHostIP: + return true + default: + return false + } +} + +func IsK8SResourcePath(f string) bool { + switch f { + case + SrcK8SObject, + SrcK8SOwnerObject, + DstK8SObject, + DstK8SOwnerObject, + K8SObject, + K8SOwnerObject: + return true + default: + return false + } +} + +func IsAddrPort(f string) bool { + switch f { + case + AddrPort, + SrcAddrPort, + DstAddrPort: + return true + default: + return false + } +} + +func GetPrefix(field string) string { + if strings.HasPrefix(field, Src) { + return Src + } else if strings.HasPrefix(field, Dst) { + return Dst + } + return "" +} diff --git a/pkg/server/flows_test.go b/pkg/server/flows_test.go new file mode 100644 index 000000000..8018f44b3 --- /dev/null +++ b/pkg/server/flows_test.go @@ -0,0 +1,314 @@ +package server + +import ( + "encoding/json" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/netobserv/network-observability-console-plugin/pkg/loki" + "github.com/netobserv/network-observability-console-plugin/pkg/model" +) + +func TestLokiFiltering2(t *testing.T) { + testCases := []struct { + inputPath string + // Either outputQueries or outputQueryParts should be defined + // Use outputQueries when multiple queries are expected (parallel queries for match any) + // Use outputQueryParts when single query is expected but the filters order isn't predictable + outputQueries []string + outputQueryParts []string + }{{ + inputPath: "?filters=SrcK8S_Name=test-pod", + outputQueries: []string{ + "?query={app=\"netobserv-flowcollector\"}|~`SrcK8S_Name\":\"(?i).*test-pod.*\"`", + }, + }, { + inputPath: "?filters=SrcK8S_Name=test-pod&match=any", + outputQueries: []string{ + "?query={app=\"netobserv-flowcollector\"}|~`SrcK8S_Name\":\"(?i).*test-pod.*\"`", + }, + }, { + inputPath: "?filters=Proto=6&match=all", + outputQueries: []string{ + "?query={app=\"netobserv-flowcollector\"}|~`Proto\":6`", + }, + }, { + inputPath: "?filters=Proto=6&match=any", + outputQueries: []string{ + "?query={app=\"netobserv-flowcollector\"}|~`Proto\":6`", + }, + }, { + inputPath: "?filters=" + url.QueryEscape("Proto=6;SrcK8S_Name=test"), + outputQueryParts: []string{ + "?query={app=\"netobserv-flowcollector\"}", + "|~`Proto\":6`", + "|~`SrcK8S_Name\":\"(?i).*test.*\"`", + }, + }, { + inputPath: "?filters=" + url.QueryEscape("Proto=6;SrcK8S_Name=test") + "&match=any", + outputQueries: []string{ + "?query={app=\"netobserv-flowcollector\"}|~`Proto\":6`", + "?query={app=\"netobserv-flowcollector\"}|~`SrcK8S_Name\":\"(?i).*test.*\"`", + }, + }, { + inputPath: "?filters=" + url.QueryEscape("Proto=6;SrcK8S_Name=test") + "&reporter=1&match=any", + outputQueries: []string{ + "?query={app=\"netobserv-flowcollector\",FlowDirection=\"1\"}|~`Proto\":6`", + "?query={app=\"netobserv-flowcollector\",FlowDirection=\"1\"}|~`SrcK8S_Name\":\"(?i).*test.*\"`", + }, + }, { + inputPath: "?filters=" + url.QueryEscape("SrcK8S_Namespace=test-namespace") + "&match=all", + outputQueries: []string{ + `?query={app="netobserv-flowcollector",SrcK8S_Namespace=~"(?i).*test-namespace.*"}`, + }, + }, { + inputPath: "?filters=" + url.QueryEscape("SrcK8S_Namespace=test-namespace") + "&match=any", + outputQueries: []string{ + `?query={app="netobserv-flowcollector",SrcK8S_Namespace=~"(?i).*test-namespace.*"}`, + }, + }, { + inputPath: "?filters=" + url.QueryEscape("SrcPort=8080;SrcAddr=10.128.0.1;SrcK8S_Namespace=default"), + outputQueries: []string{ + "?query={app=\"netobserv-flowcollector\",SrcK8S_Namespace=~\"(?i).*default.*\"}|~`SrcPort\":8080`|json|SrcAddr=ip(\"10.128.0.1\")", + }, + }, { + inputPath: "?filters=" + url.QueryEscape("SrcAddr=10.128.0.1;DstAddr=10.128.0.2"), + outputQueryParts: []string{ + "?query={app=\"netobserv-flowcollector\"}|json", + "|SrcAddr=ip(\"10.128.0.1\")", + "|DstAddr=ip(\"10.128.0.2\")", + }, + }, { + inputPath: "?filters=" + url.QueryEscape("SrcPort=8080;SrcAddr=10.128.0.1;SrcK8S_Namespace=default") + "&match=any", + outputQueries: []string{ + "?query={app=\"netobserv-flowcollector\",SrcK8S_Namespace=~\"(?i).*default.*\"}", + "?query={app=\"netobserv-flowcollector\"}|json|SrcAddr=ip(\"10.128.0.1\")", + "?query={app=\"netobserv-flowcollector\"}|~`SrcPort\":8080`", + }, + }, { + inputPath: "?filters=" + url.QueryEscape("SrcPort=8080;SrcAddr=10.128.0.1;SrcK8S_Namespace=default") + "&match=any&reporter=0", + outputQueries: []string{ + "?query={app=\"netobserv-flowcollector\",FlowDirection=\"0\",SrcK8S_Namespace=~\"(?i).*default.*\"}", + "?query={app=\"netobserv-flowcollector\",FlowDirection=\"0\"}|json|SrcAddr=ip(\"10.128.0.1\")", + "?query={app=\"netobserv-flowcollector\",FlowDirection=\"0\"}|~`SrcPort\":8080`", + }, + }, { + inputPath: "?startTime=1640991600&match=all", + outputQueries: []string{`?query={app="netobserv-flowcollector"}&start=1640991600`}, + }, { + inputPath: "?startTime=1640991600&match=any", + outputQueries: []string{`?query={app="netobserv-flowcollector"}&start=1640991600`}, + }, { + inputPath: "?endTime=1641160800", + outputQueries: []string{`?query={app="netobserv-flowcollector"}&end=1641160800`}, + }, { + inputPath: "?endTime=1641160800&match=any", + outputQueries: []string{`?query={app="netobserv-flowcollector"}&end=1641160800`}, + }, { + inputPath: "?startTime=1640991600&endTime=1641160800&match=all", + outputQueries: []string{`?query={app="netobserv-flowcollector"}&start=1640991600&end=1641160800`}, + }, { + inputPath: "?startTime=1640991600&endTime=1641160800&match=any", + outputQueries: []string{`?query={app="netobserv-flowcollector"}&start=1640991600&end=1641160800`}, + // }, { + // inputPath: "?timeRange=300000", + // outputQueries: []string{`?query={app="netobserv-flowcollector"}&start=${timeNow-300000}`}, + // }, { + // inputPath: "?timeRange=300000&match=any", + // outputQueries: []string{`?query={app="netobserv-flowcollector"}&start=${timeNow-300000}`}, + // }, { + // inputPath: "?timeRange=86400000&match=all", + // outputQueries: []string{`?query={app="netobserv-flowcollector"}&start=${timeNow-86400000}`}, + // }, { + // inputPath: "?timeRange=86400000&match=any", + // outputQueries: []string{`?query={app="netobserv-flowcollector"}&start=${timeNow-86400000}`}, + }, { + inputPath: "?filters=" + url.QueryEscape("SrcK8S_Namespace=\"exact-namespace\""), + outputQueries: []string{ + `?query={app="netobserv-flowcollector",SrcK8S_Namespace="exact-namespace"}`, + }, + // }, { + // inputPath: "?filters=" + url.QueryEscape("SrcK8S_Namespace=\"start-namespace*\""), + // outputQueries: []string{ + // `?query={app="netobserv-flowcollector",SrcK8S_Namespace=~"^start-namespace.*"}`, + // }, + // }, { + // inputPath: "?filters=" + url.QueryEscape("SrcK8S_Namespace=\"*end-namespace\""), + // outputQueries: []string{ + // `?query={app="netobserv-flowcollector",SrcK8S_Namespace=~".*end-namespace$"}`, + // }, + // }, { + // inputPath: "?filters=" + url.QueryEscape("SrcK8S_Namespace=\"mid-n*e\""), + // outputQueries: []string{ + // `?query={app="netobserv-flowcollector",SrcK8S_Namespace=~"^mid-n.*e$"}`, + // }, + }, { + inputPath: "?filters=" + url.QueryEscape("SrcK8S_Name=\"exact-pod\""), + outputQueries: []string{ + "?query={app=\"netobserv-flowcollector\"}|~`SrcK8S_Name\":\"exact-pod\"`", + }, + // }, { + // inputPath: "?filters=" + url.QueryEscape("SrcK8S_Name=\"start-pod*\""), + // outputQueries: []string{ + // "?query={app=\"netobserv-flowcollector\"}|~`SrcK8S_Name\":\"start-pod.*\"`", + // }, + // }, { + // inputPath: "?filters=" + url.QueryEscape("SrcK8S_Name=\"*end-pod\""), + // outputQueries: []string{ + // "?query={app=\"netobserv-flowcollector\"}|~`SrcK8S_Name\":\".*end-pod\"`", + // }, + // }, { + // inputPath: "?filters=" + url.QueryEscape("SrcK8S_Name=\"mid-*d\""), + // outputQueries: []string{ + // "?query={app=\"netobserv-flowcollector\"}|~`SrcK8S_Name\":\"mid-.*d\"`", + // }, + }, { + inputPath: "?filters=" + url.QueryEscape("Port=8080;K8S_Name=test"), + outputQueryParts: []string{ + "?query={app=\"netobserv-flowcollector\"}", + "|~`Port\":8080`", + "|~`K8S_Name\":\"(?i).*test.*\"`", + }, + }, { + inputPath: "?filters=" + url.QueryEscape("Port=8080;K8S_Name=test") + "&match=any", + outputQueries: []string{ + "?query={app=\"netobserv-flowcollector\"}|~`K8S_Name\":\"(?i).*test.*\"`", + "?query={app=\"netobserv-flowcollector\"}|~`Port\":8080`", + }, + }, { + inputPath: "?filters=" + url.QueryEscape("Port=8080;K8S_Namespace=test"), + outputQueries: []string{ + "?query={app=\"netobserv-flowcollector\",SrcK8S_Namespace=~\"(?i).*test.*\"}|~`Port\":8080`", + "?query={app=\"netobserv-flowcollector\",DstK8S_Namespace=~\"(?i).*test.*\"}|~`Port\":8080`", + }, + }, { + inputPath: "?filters=" + url.QueryEscape("Port=8080;K8S_Namespace=test") + "&match=any", + outputQueries: []string{ + "?query={app=\"netobserv-flowcollector\",SrcK8S_Namespace=~\"(?i).*test.*\"}", + "?query={app=\"netobserv-flowcollector\",DstK8S_Namespace=~\"(?i).*test.*\"}", + "?query={app=\"netobserv-flowcollector\"}|~`Port\":8080`", + }, + }, { + inputPath: "?filters=" + url.QueryEscape("Port=8080;SrcK8S_Object=Pod.default.test"), + outputQueryParts: []string{ + `?query={app="netobserv-flowcollector",SrcK8S_Namespace="default"}`, + "|~`SrcK8S_Type\":\"Pod\"`", + "|~`SrcK8S_Name\":\"test\"`", + "|~`Port\":8080`", + }, + }, { + inputPath: "?filters=" + url.QueryEscape("SrcK8S_Object=Pod.default.test") + "&match=any", + outputQueryParts: []string{ + `?query={app="netobserv-flowcollector",SrcK8S_Namespace="default"}`, + "|~`SrcK8S_Type\":\"Pod\"`", + "|~`SrcK8S_Name\":\"test\"`", + }, + }} + + numberQueriesExpected := 0 + for _, tc := range testCases { + if len(tc.outputQueries) > 0 { + numberQueriesExpected += len(tc.outputQueries) + } else { + numberQueriesExpected++ + } + } + + // GIVEN a Loki service + lokiMock := httpMock{} + emptyResponse, _ := json.Marshal(model.QueryResponse{ + Status: "", + Data: model.QueryResponseData{ + ResultType: model.ResultTypeStream, + Result: model.Streams{}, + }, + }) + lokiMock.On("ServeHTTP", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + _, _ = args.Get(0).(http.ResponseWriter).Write(emptyResponse) + }).Times(numberQueriesExpected) + lokiSvc := httptest.NewServer(&lokiMock) + defer lokiSvc.Close() + lokiURL, err := url.Parse(lokiSvc.URL) + require.NoError(t, err) + + // THAT is accessed behind the NOO console plugin backend + backendRoutes := setupRoutes(&Config{ + Loki: loki.NewConfig( + lokiURL, + time.Second, + "", + []string{"SrcK8S_Namespace", "SrcK8S_OwnerName", "DstK8S_Namespace", "DstK8S_OwnerName", "FlowDirection"}, + ), + }) + backendSvc := httptest.NewServer(backendRoutes) + defer backendSvc.Close() + + // timeNowArg := regexp.MustCompile(`\${timeNow-(\d+)}`) + nCall := 0 + + for _, tc := range testCases { + t.Run(tc.inputPath, func(t *testing.T) { + // WHEN the Loki flows endpoint is queried in the backend + // now := time.Now().Unix() + res, err := backendSvc.Client().Get(backendSvc.URL + "/api/loki/flows2" + tc.inputPath) + require.NoError(t, err) + body, err := ioutil.ReadAll(res.Body) + require.NoError(t, err) + require.Equalf(t, http.StatusOK, res.StatusCode, + "unexpected return %s: %s", res.Status, string(body)) + + // THEN each filter argument has been properly forwarded to Loki + var expectedURLs []string + for _, out := range tc.outputQueries { + expectedURLs = append(expectedURLs, "/loki/api/v1/query_range"+out) + } + + // TODO: restore time mocking / testing + if len(expectedURLs) > 0 { + for range expectedURLs { + requestURL := lokiMock.Calls[nCall].Arguments[1].(*http.Request).URL.String() + assert.Contains(t, expectedURLs, requestURL) + nCall++ + } + } else { + for i, part := range tc.outputQueryParts { + requestURL := lokiMock.Calls[nCall].Arguments[1].(*http.Request).URL.String() + if i == 0 { + // First part always includes URL + part = "/loki/api/v1/query_range" + part + } + assert.Contains(t, requestURL, part) + } + nCall++ + } + + // if subMatches := timeNowArg.FindStringSubmatch(tc.outputQueries[0]); len(subMatches) == 0 { + // assert.Contains(t, expectedURLs, requestURL) + // } else { + // // replace ${timeNow-} by time.Now()- for arguments where the + // // value is dynamically calculated via the non-mockable time.Now() function + // timeNowDiff, err := strconv.ParseInt(subMatches[1], 10, 64) + // require.NoError(t, err) + // // giving 1-second room to the start time to avoid flaky tests + // expectedStart := int(now - timeNowDiff) + // var possibleQueries []string + // for _, exp := range expectedURLs { + // possibleQueries = append(possibleQueries, []string{ + // timeNowArg.ReplaceAllString(exp, strconv.Itoa(expectedStart-1)), + // timeNowArg.ReplaceAllString(exp, strconv.Itoa(expectedStart)), + // timeNowArg.ReplaceAllString(exp, strconv.Itoa(expectedStart+1)), + // }...) + // } + // assert.Contains(t, possibleQueries, requestURL) + // } + }) + } +} diff --git a/pkg/server/routes.go b/pkg/server/routes.go index a544f0ef7..9ed672723 100644 --- a/pkg/server/routes.go +++ b/pkg/server/routes.go @@ -12,6 +12,7 @@ func setupRoutes(cfg *Config) *mux.Router { r := mux.NewRouter() r.HandleFunc("/api/status", handler.Status) r.HandleFunc("/api/loki/flows", handler.GetFlows(cfg.Loki, false)) + r.HandleFunc("/api/loki/flows2", handler.GetFlows2(cfg.Loki)) r.HandleFunc("/api/loki/export", handler.GetFlows(cfg.Loki, true)) r.HandleFunc("/api/loki/topology", handler.GetTopology(cfg.Loki)) r.HandleFunc("/api/resources/namespaces", handler.GetNamespaces(cfg.Loki)) diff --git a/pkg/server/server.go b/pkg/server/server.go index 5b08f995a..7a0c072da 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -8,7 +8,7 @@ import ( "github.com/sirupsen/logrus" - "github.com/netobserv/network-observability-console-plugin/pkg/handler" + "github.com/netobserv/network-observability-console-plugin/pkg/loki" ) var slog = logrus.WithField("module", "server") @@ -21,7 +21,7 @@ type Config struct { CORSAllowMethods string CORSAllowHeaders string CORSMaxAge string - Loki handler.LokiConfig + Loki loki.Config } func Start(cfg *Config) { diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 1e7d6cc87..20e1b222c 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/require" "github.com/netobserv/network-observability-console-plugin/pkg/handler" + "github.com/netobserv/network-observability-console-plugin/pkg/loki" ) const ( @@ -54,7 +55,7 @@ func TestServerRunning(t *testing.T) { go func() { Start(&Config{ - Loki: handler.LokiConfig{ + Loki: loki.Config{ URL: &url.URL{Scheme: "http", Host: "localhost:3100"}, }, Port: testPort, @@ -129,7 +130,7 @@ func TestSecureComm(t *testing.T) { CertFile: testServerCertFile, PrivateKeyFile: testServerKeyFile, Port: testPort, - Loki: handler.LokiConfig{ + Loki: loki.Config{ URL: &url.URL{Scheme: "http", Host: "localhost:3100"}, }, } @@ -200,7 +201,7 @@ func TestLokiConfiguration(t *testing.T) { // THAT is accessed behind the NOO console plugin backend backendRoutes := setupRoutes(&Config{ - Loki: handler.LokiConfig{ + Loki: loki.Config{ URL: lokiURL, Timeout: time.Second, }, @@ -251,7 +252,7 @@ func TestLokiConfiguration_MultiTenant(t *testing.T) { // GIVEN a NOO console plugin backend configured for Multi tenant mode backendRoutes := setupRoutes(&Config{ - Loki: handler.LokiConfig{ + Loki: loki.Config{ URL: lokiURL, Timeout: time.Second, TenantID: "my-organisation", @@ -438,11 +439,12 @@ func TestLokiFiltering(t *testing.T) { // THAT is accessed behind the NOO console plugin backend backendRoutes := setupRoutes(&Config{ - Loki: handler.LokiConfig{ - URL: lokiURL, - Timeout: time.Second, - Labels: []string{"SrcK8S_Namespace", "SrcK8S_OwnerName", "DstK8S_Namespace", "DstK8S_OwnerName", "FlowDirection"}, - }, + Loki: loki.NewConfig( + lokiURL, + time.Second, + "", + []string{"SrcK8S_Namespace", "SrcK8S_OwnerName", "DstK8S_Namespace", "DstK8S_OwnerName", "FlowDirection"}, + ), }) backendSvc := httptest.NewServer(backendRoutes) defer backendSvc.Close() diff --git a/pkg/utils/constants/constants.go b/pkg/utils/constants/constants.go new file mode 100644 index 000000000..461462b70 --- /dev/null +++ b/pkg/utils/constants/constants.go @@ -0,0 +1,6 @@ +package constants + +const ( + AppLabel = "app" + AppLabelValue = "netobserv-flowcollector" +) diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index a1234412a..80410964a 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -48,3 +48,9 @@ func NonEmpty(s []string) []string { } return nonempty } + +func MergeMaps(into, other map[string]string) { + for k, v := range other { + into[k] = v + } +}