Skip to content

Commit

Permalink
make analytics.Client more thread-safe (#283)
Browse files Browse the repository at this point in the history
It's still not fully thread-safe, since we depend on being able to
append properties to it in klothomain; but for individual sends, it is.

We do this by separating out the client from the payload. Each time we
send a new message, we create a new payload, populate it with default
stuff from the client, modify that paylod (which is thread-local) and
send it.

Also, add unit tests for this.
  • Loading branch information
Yuval Shavit authored Feb 23, 2023
1 parent 5861153 commit 5972792
Show file tree
Hide file tree
Showing 5 changed files with 261 additions and 87 deletions.
104 changes: 53 additions & 51 deletions pkg/analytics/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,23 @@ import (
)

type (
Payload struct {
UserId string `json:"id"`
Event string `json:"event"`
Source []byte `json:"source,omitempty"`
Properties map[string]any `json:"properties"`
}

Client struct {
UserId string `json:"id"`
Event string `json:"event"`
Source []byte `json:"source,omitempty"`
Properties map[string]interface{} `json:"properties,omitempty"`
serverUrlOverride string
userId string
universalProperties map[string]any
}

ErrorHandler interface {
PrintErr(err error)
}

LogLevel string
)

Expand All @@ -37,68 +45,54 @@ const (
const datadogLogLevel = "_logLevel"
const datadogStatus = "status"

func NewClient(properties map[string]interface{}) *Client {
func NewClient() *Client {
local := GetOrCreateAnalyticsFile()

client := &Client{
Properties: properties,
}
client := &Client{}
client.universalProperties = make(map[string]any)

// These will get validated in AttachAuthorizations
client.UserId = local.Id
client.Properties["validated"] = false
client.userId = local.Id
client.universalProperties["validated"] = false

client.Properties["localId"] = local.Id
client.universalProperties["localId"] = local.Id
if runUuid, err := uuid.NewRandom(); err == nil {
client.Properties["runId"] = runUuid.String()
client.universalProperties["runId"] = runUuid.String()
}

return client
}

func (t *Client) AttachAuthorizations(claims *auth.KlothoClaims) {
t.Properties["localId"] = t.UserId
t.UserId = claims.Email
t.Properties["validated"] = claims.EmailVerified
t.universalProperties["localId"] = t.userId
t.userId = claims.Email
t.universalProperties["validated"] = claims.EmailVerified
}

func (t *Client) Info(event string) {
t.Properties[datadogLogLevel] = Info
t.track(event)
}

func (t *Client) Debug(event string) {
t.Properties[datadogLogLevel] = Debug
t.track(event)
t.send(t.createPayload(Info, event))
}

func (t *Client) Warn(event string) {
t.Properties[datadogLogLevel] = Warn
t.Properties[datadogStatus] = Warn
t.track(event)
t.send(t.createPayload(Warn, event))
}

func (t *Client) Error(event string) {
t.Properties[datadogLogLevel] = Error
t.Properties[datadogStatus] = Error
t.track(event)
t.send(t.createPayload(Error, event))
}

func (t *Client) Panic(event string) {
t.Properties[datadogLogLevel] = Panic
// Using error since datadog does not support panic for the reserved status field
t.Properties[datadogStatus] = Error
t.track(event)
func (p *Payload) addError(err error) {
p.Properties["error"] = fmt.Sprintf("%+v", err)
}

func (t *Client) AppendProperties(properties map[string]interface{}) {
for k, v := range properties {
t.Properties[k] = v
t.AppendProperty(k, v)
}
}

func (t *Client) DeleteProperty(key string) {
delete(t.Properties, key)
func (t *Client) AppendProperty(key string, value any) {
t.universalProperties[key] = value
}

func (t *Client) UploadSource(source *core.InputFiles) {
Expand All @@ -107,28 +101,34 @@ func (t *Client) UploadSource(source *core.InputFiles) {
zap.S().Warnf("Failed to upload debug bundle. %v", err)
return
}
srcClient := &Client{
UserId: t.UserId,
Properties: t.Properties,
Source: data,
}
srcClient.Info("klotho uploading")
p := t.createPayload(Info, "klotho uploading")
p.Source = data
t.send(p)
}

func (t *Client) track(event string) {
t.Event = event
err := SendTrackingToServer(t)

if err != nil {
zap.L().Debug(fmt.Sprintf("Failed to send metrics info. %v", err))
func (t *Client) createPayload(level LogLevel, event string) Payload {
p := Payload{
UserId: t.userId,
Event: event,
Properties: make(map[string]any, len(t.universalProperties)+2),
}
for k, v := range t.universalProperties {
p.Properties[k] = v
}
p.Properties[datadogLogLevel] = level
if level == Panic {
p.Properties[datadogStatus] = Error // datadog doesn't support panic for the reserved status field
} else {
p.Properties[datadogStatus] = level
}
return p
}

// Hash hashes a value, using this analytic sender's UserId as a salt. It does not output anything or in any way modify the
// Hash hashes a value, using this analytic sender's userId as a salt. It does not output anything or in any way modify the
// sender's state.
func (t *Client) Hash(value any) string {
h := sha256.New()
h.Write([]byte(t.UserId)) // use this as a salt
h.Write([]byte(t.userId)) // use this as a salt
if json.NewEncoder(h).Encode(value) != nil {
return "unknown"
}
Expand All @@ -149,7 +149,9 @@ func (t *Client) PanicHandler(err *error, errHandler ErrorHandler) {
if _, hasStack := (*err).(interface{ StackTrace() errors.StackTrace }); !hasStack {
*err = errors.WithStack(*err)
}
t.Panic(rerr.Error())
p := t.createPayload(Error, "ERROR")
p.addError(rerr)
t.send(p)
errHandler.PrintErr(*err)
}
}
174 changes: 171 additions & 3 deletions pkg/analytics/client_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
package analytics

import (
"encoding/json"
"fmt"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"net/http"
"net/http/httptest"
"testing"
)

Expand Down Expand Up @@ -56,14 +62,176 @@ func TestAnalytics_Hash(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
assert := assert.New(t)
analytics := &Client{
UserId: userId,
userId: userId,
}
actual := analytics.Hash(tt.given)
assert.Equal(tt.expect, actual)
})
}
}

type jsonConvertable struct {
Foo string `json:"foo"`
func TestAnalyticsSend(t *testing.T) {
cases := []struct {
name string
send func(client *Client)
expect []sentPayload
}{
{
name: "direct send at level info with properties",
send: func(c *Client) {
c.userId = "[email protected]"
c.AppendProperty("property_1", "aaa")
c.Info("hello world")
},
expect: []sentPayload{{
"id": "[email protected]",
"event": "hello world",
"properties": map[string]any{
"_logLevel": "info",
"status": "info",
"validated": false,
"property_1": "aaa",
},
}},
},
{
name: "two sends have isolated state",
send: func(c *Client) {
c.userId = "[email protected]"

payload1 := c.createPayload(Warn, "message 1")
payload1.Properties["hello"] = "world"

payload2 := c.createPayload(Info, "message 2")
payload2.Properties["hello"] = "goodbye"

c.send(payload1)
c.send(payload2)
},
expect: []sentPayload{
{
"id": "[email protected]",
"event": "message 1",
"properties": map[string]any{
"_logLevel": "warn",
"status": "warn",
"validated": false,
"hello": "world",
},
},
{
"id": "[email protected]",
"event": "message 2",
"properties": map[string]any{
"_logLevel": "info",
"status": "info",
"validated": false,
"hello": "goodbye",
},
},
},
},
{
name: "send via logger",
send: func(c *Client) {
c.userId = "[email protected]"
logger := zap.New(c.NewFieldListener(zapcore.WarnLevel))
logger.Error("first message", zap.Error(fmt.Errorf("my error")))
logger.Warn("second message") // no error field on this one!
},
expect: []sentPayload{
{
"id": "[email protected]",
"event": "ERROR",
"properties": map[string]any{
"_logLevel": "error",
"status": "error",
"validated": false,
"error": "my error",
},
},
{
"id": "[email protected]",
"event": "WARN",
"properties": map[string]any{
"_logLevel": "warn",
"status": "warn",
"validated": false,
},
},
},
},
}
for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
assert := assert.New(t)

handler := interactions{assert: assert}
for range tt.expect {
handler.interactions = append(handler.interactions, nil)
}

server := httptest.NewServer(&handler)
defer server.Close()

client := NewClient()
client.serverUrlOverride = server.URL

tt.send(client)
for i, receivedPayload := range handler.interactions {
expect := tt.expect[i]
if assert.NotNil(receivedPayload) {
// for properties we can't control, just assert that they exist, and then delete them.
// this is so that we don't have to set them on the expected
if properties, ok := receivedPayload["properties"].(map[string]any); ok {
for _, opaqueProperty := range []string{"localId", "runId"} {
assert.NotEmpty(properties[opaqueProperty])
delete(properties, opaqueProperty)
}
}

assert.Equal(expect, receivedPayload)
}
}
})
}
}

type (
sentPayload map[string]any
jsonConvertable struct {
Foo string `json:"foo"`
}

interactions struct {
assert *assert.Assertions
count int
interactions []sentPayload
}
)

func (s *interactions) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
if s.count >= len(s.interactions) {
s.assert.Fail("no interactions left")
w.WriteHeader(http.StatusInternalServerError)
return
}
defer func() { s.count += 1 }()

decoder := json.NewDecoder(r.Body)
body := sentPayload{}
if err := decoder.Decode(&body); !s.assert.NoError(err) {
return
}
s.interactions[s.count] = body

if s.assert.Equal(http.MethodPost, r.Method) && s.assert.Equal("/analytics/track", r.URL.RequestURI()) {
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte("ok"))
s.assert.NoError(err)
} else {
s.assert.Fail("no interactions left")
w.WriteHeader(http.StatusInternalServerError)
}
}
Loading

0 comments on commit 5972792

Please sign in to comment.