Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(turborepo) Add timeout to api client json methods #5350

Merged
merged 3 commits into from
Jun 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cli/internal/analytics/analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ type Client interface {
}

type Sink interface {
RecordAnalyticsEvents(events Events) error
RecordAnalyticsEvents(events Events, timeout time.Duration) error
}

type nullSink struct{}

func (n *nullSink) RecordAnalyticsEvents(events Events) error {
func (n *nullSink) RecordAnalyticsEvents(_ Events, _ time.Duration) error {
return nil
}

Expand Down Expand Up @@ -60,6 +60,7 @@ type worker struct {
const bufferThreshold = 10
const eventTimeout = 200 * time.Millisecond
const noTimeout = 24 * time.Hour
const requestTimeout = 10 * time.Second

func newWorker(ctx context.Context, ch <-chan EventPayload, sink Sink, logger hclog.Logger) *worker {
buffer := []EventPayload{}
Expand Down Expand Up @@ -154,7 +155,7 @@ func (w *worker) sendEvents(events []EventPayload) {
if err != nil {
w.logger.Debug("failed to encode cache usage analytics", "error", err)
}
err = w.sink.RecordAnalyticsEvents(payload)
err = w.sink.RecordAnalyticsEvents(payload, requestTimeout)
if err != nil {
w.logger.Debug("failed to record cache usage analytics", "error", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cli/internal/analytics/analytics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func newDummySink() *dummySink {
}
}

func (d *dummySink) RecordAnalyticsEvents(events Events) error {
func (d *dummySink) RecordAnalyticsEvents(events Events, _ time.Duration) error {
d.mu.Lock()
defer d.mu.Unlock()
// Make a copy in case a test is holding a copy too
Expand Down
5 changes: 3 additions & 2 deletions cli/internal/client/analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@ package client

import (
"encoding/json"
"time"
)

// RecordAnalyticsEvents is a specific method for POSTing events to Vercel
func (c *APIClient) RecordAnalyticsEvents(events []map[string]interface{}) error {
func (c *APIClient) RecordAnalyticsEvents(events []map[string]interface{}, timeout time.Duration) error {
body, err := json.Marshal(events)
if err != nil {
return err

}

// We don't care about the response here
if _, err := c.JSONPost("/v8/artifacts/events", body); err != nil {
if _, err := c.JSONPost("/v8/artifacts/events", body, timeout); err != nil {
return err
}

Expand Down
13 changes: 8 additions & 5 deletions cli/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,8 @@ func (c *APIClient) addTeamParam(params *url.Values) {
}

// JSONPatch sends a byte array (json.marshalled payload) to a given endpoint with PATCH
func (c *APIClient) JSONPatch(endpoint string, body []byte) ([]byte, error) {
resp, err := c.request(endpoint, http.MethodPatch, body)
func (c *APIClient) JSONPatch(endpoint string, body []byte, timeout time.Duration) ([]byte, error) {
resp, err := c.request(endpoint, http.MethodPatch, body, timeout)
if err != nil {
return nil, err
}
Expand All @@ -217,8 +217,8 @@ func (c *APIClient) JSONPatch(endpoint string, body []byte) ([]byte, error) {
}

// JSONPost sends a byte array (json.marshalled payload) to a given endpoint with POST
func (c *APIClient) JSONPost(endpoint string, body []byte) ([]byte, error) {
resp, err := c.request(endpoint, http.MethodPost, body)
func (c *APIClient) JSONPost(endpoint string, body []byte, timeout time.Duration) ([]byte, error) {
resp, err := c.request(endpoint, http.MethodPost, body, timeout)
if err != nil {
return nil, err
}
Expand All @@ -236,7 +236,7 @@ func (c *APIClient) JSONPost(endpoint string, body []byte) ([]byte, error) {
return rawResponse, nil
}

func (c *APIClient) request(endpoint string, method string, body []byte) (*http.Response, error) {
func (c *APIClient) request(endpoint string, method string, body []byte, timeout time.Duration) (*http.Response, error) {
if err := c.okToRequest(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -266,6 +266,9 @@ func (c *APIClient) request(endpoint string, method string, body []byte) (*http.
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
req.WithContext(ctx)

// Set headers
req.Header.Set("Content-Type", "application/json")
Expand Down
26 changes: 25 additions & 1 deletion cli/internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@ package client

import (
"bytes"
"context"
"encoding/json"
"errors"
"io/ioutil"
"net/http"
"net/http/httptest"
"reflect"
"testing"
"time"

"github.com/google/uuid"
"github.com/hashicorp/go-hclog"
"github.com/vercel/turbo/cli/internal/turbostate"
"github.com/vercel/turbo/cli/internal/util"
"gotest.tools/v3/assert"
)

func Test_sendToServer(t *testing.T) {
Expand Down Expand Up @@ -57,7 +60,8 @@ func Test_sendToServer(t *testing.T) {
},
}

apiClient.RecordAnalyticsEvents(events)
err = apiClient.RecordAnalyticsEvents(events, 10*time.Second)
assert.NilError(t, err, "RecordAnalyticsEvent")

body := <-ch

Expand Down Expand Up @@ -158,3 +162,23 @@ func Test_FetchWhenCachingDisabled(t *testing.T) {
t.Errorf("response got %v, want <nil>", resp)
}
}

func Test_Timeout(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
<-time.After(50 * time.Millisecond)
}))
defer ts.Close()

// Set up test expected values
apiClientConfig := turbostate.APIClientConfig{
TeamSlug: "my-team-slug",
APIURL: ts.URL,
Token: "my-token",
}
apiClient := NewClient(apiClientConfig, hclog.Default(), "v1")

_, err := apiClient.JSONPost("/", []byte{}, 1*time.Millisecond)
if !errors.Is(err, context.DeadlineExceeded) {
t.Errorf("JSONPost got %v, want DeadlineExceeded", err)
}
}
39 changes: 21 additions & 18 deletions cli/internal/runsummary/spaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"sync"
"time"

"github.com/mitchellh/cli"
"github.com/vercel/turbo/cli/internal/ci"
Expand All @@ -28,15 +29,16 @@ func (req *spaceRequest) error(msg string) error {
}

type spacesClient struct {
requests chan *spaceRequest
errors []error
api *client.APIClient
ui cli.Ui
run *spaceRun
runCreated chan struct{}
wg sync.WaitGroup
spaceID string
enabled bool
requests chan *spaceRequest
errors []error
api *client.APIClient
ui cli.Ui
run *spaceRun
runCreated chan struct{}
wg sync.WaitGroup
spaceID string
enabled bool
requestTimeout time.Duration
}

type spaceRun struct {
Expand All @@ -46,13 +48,14 @@ type spaceRun struct {

func newSpacesClient(spaceID string, api *client.APIClient, ui cli.Ui) *spacesClient {
c := &spacesClient{
api: api,
ui: ui,
spaceID: spaceID,
enabled: false, // Start with disabled
requests: make(chan *spaceRequest), // TODO: give this a size based on tasks
runCreated: make(chan struct{}, 1),
run: &spaceRun{},
api: api,
ui: ui,
spaceID: spaceID,
enabled: false, // Start with disabled
requests: make(chan *spaceRequest), // TODO: give this a size based on tasks
runCreated: make(chan struct{}, 1),
run: &spaceRun{},
requestTimeout: 10 * time.Second,
}

if spaceID == "" {
Expand Down Expand Up @@ -159,9 +162,9 @@ func (c *spacesClient) makeRequest(req *spaceRequest) {
var resp []byte
var reqErr error
if req.method == "POST" {
resp, reqErr = c.api.JSONPost(req.url, payload)
resp, reqErr = c.api.JSONPost(req.url, payload, c.requestTimeout)
} else if req.method == "PATCH" {
resp, reqErr = c.api.JSONPatch(req.url, payload)
resp, reqErr = c.api.JSONPatch(req.url, payload, c.requestTimeout)
} else {
c.errors = append(c.errors, req.error("Unsupported request method"))
}
Expand Down