From ef244dc5f8898a7b4d6f05b30b29b04e2e90bdf0 Mon Sep 17 00:00:00 2001 From: Chris Grindstaff Date: Tue, 19 Nov 2024 08:08:33 -0500 Subject: [PATCH] perf: reduce the memory footprint of REST collector (#3303) * perf: reduce the memory footprint of REST collector When the REST collector walks pagination links, it does so recursively. Each of these recursive calls retains the previous REST response on the stack, and when there are many recursive calls, a lot of memory can be used. Replace the recursive calls with iterative ones. * perf: reduce the memory footprint of REST collector When the REST collector walks pagination links, it does so recursively. Each of these recursive calls retains the previous REST response on the stack, and when there are many recursive calls, a lot of memory can be used. Replace the recursive calls with iterative ones. --- cmd/tools/rest/rest.go | 334 +++++++++++++++++++++-------------------- 1 file changed, 172 insertions(+), 162 deletions(-) diff --git a/cmd/tools/rest/rest.go b/cmd/tools/rest/rest.go index ff347ae06..75488c18c 100644 --- a/cmd/tools/rest/rest.go +++ b/cmd/tools/rest/rest.go @@ -347,39 +347,46 @@ func GetPollerAndAddr(pName string) (*conf.Poller, string, error) { // FetchForCli used for CLI only func FetchForCli(client *Client, href string, records *[]any, downloadAll bool, curls *[]string) error { - getRest, err := client.GetRest(href) - if err != nil { - return fmt.Errorf("error making request %w", err) - } - - pollerAuth, err := client.auth.GetPollerAuth() - if err != nil { - return err - } - *curls = append(*curls, fmt.Sprintf("curl --user %s --insecure '%s%s'", pollerAuth.Username, client.baseURL, href)) - isNonIterRestCall := false - value := gjson.GetBytes(getRest, "records") - if value.String() == "" { - isNonIterRestCall = true - } + var prevLink string + nextLink := href - if isNonIterRestCall { - contentJSON := `{"records":[]}` - response, err := sjson.SetRawBytes([]byte(contentJSON), "records.-1", getRest) + for { + getRest, err := client.GetRest(nextLink) if err != nil { - return fmt.Errorf("error setting record %w", err) + return fmt.Errorf("error making request %w", err) } - var page Pagination - err = json.Unmarshal(response, &page) + + pollerAuth, err := client.auth.GetPollerAuth() if err != nil { - return fmt.Errorf("error unmarshalling json %w", err) + return err } - *records = append(*records, page.Records...) - } else { - // extract returned records since paginated records need to be merged into a single list + *curls = append(*curls, fmt.Sprintf("curl --user %s --insecure '%s%s'", pollerAuth.Username, client.baseURL, nextLink)) + + isNonIterRestCall := false + value := gjson.GetBytes(getRest, "records") + if value.String() == "" { + isNonIterRestCall = true + } + + if isNonIterRestCall { + contentJSON := `{"records":[]}` + response, err := sjson.SetRawBytes([]byte(contentJSON), "records.-1", getRest) + if err != nil { + return fmt.Errorf("error setting record %w", err) + } + var page Pagination + err = json.Unmarshal(response, &page) + if err != nil { + return fmt.Errorf("error unmarshalling json %w", err) + } + *records = append(*records, page.Records...) + break + } + + // The pagination struct is used to pretty print the JSON output var page Pagination - err := json.Unmarshal(getRest, &page) + err = json.Unmarshal(getRest, &page) if err != nil { return fmt.Errorf("error unmarshalling json %w", err) } @@ -387,22 +394,22 @@ func FetchForCli(client *Client, href string, records *[]any, downloadAll bool, *records = append(*records, page.Records...) // If all results are desired and there is a next link, follow it - if downloadAll && page.Links != nil { - nextLink, _ := url.QueryUnescape(page.Links.Next.Href) - if nextLink != "" { - // strip leading slash - nextLink = strings.TrimPrefix(nextLink, "/") - if nextLink == href { - // if nextLink is the same as the previous link, no progress is being made, exit - return nil - } - err := FetchForCli(client, nextLink, records, downloadAll, curls) - if err != nil { - return err - } - } + next := "" + if page.Links != nil { + next = page.Links.Next.Href + } + + prevLink = nextLink + nextLink = next + // strip leading slash + nextLink = strings.TrimPrefix(nextLink, "/") + + if nextLink == "" || nextLink == prevLink || !downloadAll { + // no nextLink, nextLink is the same as the previous link, or not all records are desired, exit + break } } + return nil } @@ -464,45 +471,46 @@ func FetchAnalytics(client *Client, href string) ([]gjson.Result, gjson.Result, } func fetchAll(client *Client, href string, records *[]gjson.Result, headers ...map[string]string) error { - getRest, err := client.GetRest(href, headers...) - if err != nil { - return fmt.Errorf("error making request %w", err) - } - output := gjson.ParseBytes(getRest) - data := output.Get("records") - numRecords := output.Get("num_records") - next := output.Get("_links.next.href") + var prevLink string + nextLink := href - if !data.Exists() { - contentJSON := `{"records":[]}` - response, err := sjson.SetRawBytes([]byte(contentJSON), "records.-1", getRest) + for { + response, err := client.GetRest(nextLink, headers...) if err != nil { - return fmt.Errorf("error setting record %w", err) - } - value := gjson.GetBytes(response, "records") - *records = append(*records, value) - } else { - // extract returned records since paginated records need to be merged into a single lists - if numRecords.Exists() && numRecords.Int() > 0 { - *records = append(*records, data) + return fmt.Errorf("error making request %w", err) } - // If all results are desired and there is a next link, follow it - if next.Exists() { - nextLink := next.String() - if nextLink != "" { - if nextLink == href { - // nextLink is the same as the previous link, no progress is being made, exit - return nil - } - err := fetchAll(client, nextLink, records) - if err != nil { - return err - } + output := gjson.ParseBytes(response) + data := output.Get("records") + numRecords := output.Get("num_records") + next := output.Get("_links.next.href") + + if data.Exists() { + // extract returned records since paginated records need to be merged into a single lists + if numRecords.Int() > 0 { + *records = append(*records, data) } + + prevLink = nextLink + // If there is a next link, follow it + nextLink = next.String() + if nextLink == "" || nextLink == prevLink { + // no nextLink or nextLink is the same as the previous link, no progress is being made, exit + break + } + } else { + contentJSON := `{"records":[]}` + response, err := sjson.SetRawBytes([]byte(contentJSON), "records.-1", response) + if err != nil { + return fmt.Errorf("error setting record %w", err) + } + value := gjson.GetBytes(response, "records") + *records = append(*records, value) + break } } + return nil } @@ -544,90 +552,92 @@ func FetchSome(client *Client, href string, recordsWanted int, batchSize string) } func fetchLimit(client *Client, href string, records *[]gjson.Result, recordsWanted int) error { - getRest, err := client.GetRest(href) - if err != nil { - return fmt.Errorf("error making request %w", err) - } - output := gjson.ParseBytes(getRest) - data := output.Get("records") - numRecords := output.Get("num_records") - next := output.Get("_links.next.href") + var prevLink string + nextLink := href - if !data.Exists() { - contentJSON := `{"records":[]}` - response, err := sjson.SetRawBytes([]byte(contentJSON), "records.-1", getRest) + for { + getRest, err := client.GetRest(nextLink) if err != nil { - return fmt.Errorf("error setting record %w", err) + return fmt.Errorf("error making request %w", err) } - value := gjson.GetBytes(response, "records") - *records = append(*records, value) - } else { - // extract returned records since paginated records need to be merged into a single lists - if numRecords.Exists() && numRecords.Int() > 0 { - *records = append(*records, data) - if recordsWanted != -1 { - recordsWanted -= int(numRecords.Int()) - if recordsWanted <= 0 { - return nil + + output := gjson.ParseBytes(getRest) + data := output.Get("records") + numRecords := output.Get("num_records") + next := output.Get("_links.next.href") + + if data.Exists() { + // extract returned records since paginated records need to be merged into a single lists + if numRecords.Int() > 0 { + *records = append(*records, data) + + if recordsWanted != -1 { + recordsWanted -= int(numRecords.Int()) + if recordsWanted <= 0 { + return nil + } } } - } - // Follow the next link - if next.Exists() { - nextLink := next.String() - if nextLink != "" { - if nextLink == href { - // nextLink is the same as the previous link, no progress is being made, exit - return nil - } - err := fetchLimit(client, nextLink, records, recordsWanted) - if err != nil { - return err - } + prevLink = nextLink + nextLink = next.String() + + if nextLink == "" || nextLink == prevLink { + // no nextLink or nextLink is the same as the previous link, no progress is being made, exit + break } + // Follow the next link + } else { + contentJSON := `{"records":[]}` + response, err := sjson.SetRawBytes([]byte(contentJSON), "records.-1", getRest) + if err != nil { + return fmt.Errorf("error setting record %w", err) + } + value := gjson.GetBytes(response, "records") + *records = append(*records, value) + break } } + return nil } func fetchAnalytics(client *Client, href string, records *[]gjson.Result, analytics *gjson.Result, downloadAll bool, maxRecords int64) error { - getRest, err := client.GetRest(href) - if err != nil { - return fmt.Errorf("error making request %w", err) - } - - output := gjson.ParseBytes(getRest) - data := output.Get("records") - numRecords := output.Get("num_records") - next := output.Get("_links.next.href") - *analytics = output.Get("analytics") - - // extract returned records since paginated records need to be merged into a single lists - if numRecords.Exists() && numRecords.Int() > 0 { - *records = append(*records, data) - if !downloadAll { - maxRecords -= numRecords.Int() - if maxRecords <= 0 { - return nil - } + + var prevLink string + nextLink := href + + for { + getRest, err := client.GetRest(nextLink) + if err != nil { + return fmt.Errorf("error making request %w", err) } - } - // If all results are desired and there is a next link, follow it - if next.Exists() && downloadAll { - nextLink := next.String() - if nextLink != "" { - if nextLink == href { - // nextLink is same as previous link, no progress is being made, exit - return nil - } - err := fetchAnalytics(client, nextLink, records, analytics, downloadAll, maxRecords) - if err != nil { - return err + output := gjson.ParseBytes(getRest) + data := output.Get("records") + numRecords := output.Get("num_records") + next := output.Get("_links.next.href") + *analytics = output.Get("analytics") + + // extract returned records since paginated records need to be merged into a single lists + if numRecords.Int() > 0 { + *records = append(*records, data) + if !downloadAll { + maxRecords -= numRecords.Int() + if maxRecords <= 0 { + return nil + } } } + + prevLink = nextLink + nextLink = next.String() + + if nextLink == "" || nextLink == prevLink || !downloadAll { + // no nextLink, nextLink is the same as the previous link, or not all records are desired, exit + break + } } return nil @@ -635,36 +645,36 @@ func fetchAnalytics(client *Client, href string, records *[]gjson.Result, analyt // FetchRestPerfData This method is used in PerfRest collector. This method returns timestamp per batch func FetchRestPerfData(client *Client, href string, perfRecords *[]PerfRecord, headers ...map[string]string) error { - getRest, err := client.GetRest(href, headers...) - if err != nil { - return fmt.Errorf("error making request %w", err) - } - // extract returned records since paginated records need to be merged into a single list - output := gjson.ParseBytes(getRest) - data := output.Get("records") - numRecords := output.Get("num_records") - next := output.Get("_links.next.href") + var prevLink string + nextLink := href - if numRecords.Exists() && numRecords.Int() > 0 { - p := PerfRecord{Records: data, Timestamp: time.Now().UnixNano()} - *perfRecords = append(*perfRecords, p) - } + for { + response, err := client.GetRest(nextLink, headers...) + if err != nil { + return fmt.Errorf("error making request %w", err) + } - // If all results are desired and there is a next link, follow it - if next.Exists() { - nextLink := next.String() - if nextLink != "" { - if nextLink == href { - // nextLink is same as previous link, no progress is being made, exit - return nil - } - err := FetchRestPerfData(client, strings.Clone(next.String()), perfRecords) - if err != nil { - return err - } + // extract returned records since paginated records need to be merged into a single list + output := gjson.ParseBytes(response) + data := output.Get("records") + numRecords := output.Get("num_records") + next := output.Get("_links.next.href") + + if numRecords.Int() > 0 { + p := PerfRecord{Records: data, Timestamp: time.Now().UnixNano()} + *perfRecords = append(*perfRecords, p) + } + + prevLink = nextLink + nextLink = next.String() + + if nextLink == "" || nextLink == prevLink { + // no nextLink or nextLink is the same as the previous link, no progress is being made, exit + break } } + return nil }