Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make analytics.Client more thread-safe #283

Merged
3 commits merged into from
Feb 23, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 50 additions & 53 deletions pkg/analytics/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,23 @@ import (
)

type (
Payload struct {
UserId string `json:"id"`
Event string `json:"event"`
Source []byte `json:"source,omitempty"`
Properties map[string]any `json:"properties"`
}

Client struct {
UserId string `json:"id"`
Event string `json:"event"`
Source []byte `json:"source,omitempty"`
Properties map[string]interface{} `json:"properties,omitempty"`
serverUrlOverride string
userId string
universalProperties map[string]any
}

ErrorHandler interface {
PrintErr(err error)
}

LogLevel string
)

Expand All @@ -37,98 +45,87 @@ const (
const datadogLogLevel = "_logLevel"
const datadogStatus = "status"

func NewClient(properties map[string]interface{}) *Client {
func NewClient() *Client {
local := GetOrCreateAnalyticsFile()

client := &Client{
Properties: properties,
}
client := &Client{}
client.universalProperties = make(map[string]any)

// These will get validated in AttachAuthorizations
client.UserId = local.Id
client.Properties["validated"] = false
client.userId = local.Id
client.universalProperties["validated"] = false

client.Properties["localId"] = local.Id
client.universalProperties["localId"] = local.Id
if runUuid, err := uuid.NewRandom(); err == nil {
client.Properties["runId"] = runUuid.String()
client.universalProperties["runId"] = runUuid.String()
}

return client
}

func (t *Client) AttachAuthorizations(claims *auth.KlothoClaims) {
t.Properties["localId"] = t.UserId
t.UserId = claims.Email
t.Properties["validated"] = claims.EmailVerified
t.universalProperties["localId"] = t.userId
t.userId = claims.Email
t.universalProperties["validated"] = claims.EmailVerified
}

func (t *Client) Info(event string) {
t.Properties[datadogLogLevel] = Info
t.track(event)
}

func (t *Client) Debug(event string) {
t.Properties[datadogLogLevel] = Debug
t.track(event)
t.Send(t.createPayload(Info, event))
}

func (t *Client) Warn(event string) {
t.Properties[datadogLogLevel] = Warn
t.Properties[datadogStatus] = Warn
t.track(event)
t.Send(t.createPayload(Warn, event))
}

func (t *Client) Error(event string) {
t.Properties[datadogLogLevel] = Error
t.Properties[datadogStatus] = Error
t.track(event)
t.Send(t.createPayload(Error, event))
}

func (t *Client) Panic(event string) {
t.Properties[datadogLogLevel] = Panic
// Using error since datadog does not support panic for the reserved status field
t.Properties[datadogStatus] = Error
t.track(event)
func (p *Payload) addError(err error) *Payload {
p.Properties["error"] = fmt.Sprintf("%+v", err)
return p
}

func (t *Client) AppendProperties(properties map[string]interface{}) {
for k, v := range properties {
t.Properties[k] = v
t.universalProperties[k] = v
}
}

func (t *Client) DeleteProperty(key string) {
delete(t.Properties, key)
}

func (t *Client) UploadSource(source *core.InputFiles) {
data, err := CompressFiles(source)
if err != nil {
zap.S().Warnf("Failed to upload debug bundle. %v", err)
return
}
srcClient := &Client{
UserId: t.UserId,
Properties: t.Properties,
Source: data,
}
srcClient.Info("klotho uploading")
p := t.createPayload(Info, "klotho uploading")
p.Source = data
t.Send(p)
}

func (t *Client) track(event string) {
t.Event = event
err := SendTrackingToServer(t)

if err != nil {
zap.L().Debug(fmt.Sprintf("Failed to send metrics info. %v", err))
func (t *Client) createPayload(level LogLevel, event string) *Payload {
p := &Payload{
UserId: t.userId,
Event: event,
Properties: make(map[string]any, len(t.universalProperties)+2),
}
for k, v := range t.universalProperties {
p.Properties[k] = v
}
p.Properties[datadogLogLevel] = level
if level == Panic {
p.Properties[datadogStatus] = Error // datadog doesn't support panic for the reserved status field
} else {
p.Properties[datadogStatus] = level
}
return p
}

// Hash hashes a value, using this analytic sender's UserId as a salt. It does not output anything or in any way modify the
// Hash hashes a value, using this analytic sender's userId as a salt. It does not output anything or in any way modify the
// sender's state.
func (t *Client) Hash(value any) string {
h := sha256.New()
h.Write([]byte(t.UserId)) // use this as a salt
h.Write([]byte(t.userId)) // use this as a salt
if json.NewEncoder(h).Encode(value) != nil {
return "unknown"
}
Expand All @@ -149,7 +146,7 @@ func (t *Client) PanicHandler(err *error, errHandler ErrorHandler) {
if _, hasStack := (*err).(interface{ StackTrace() errors.StackTrace }); !hasStack {
*err = errors.WithStack(*err)
}
t.Panic(rerr.Error())
t.Send(t.createPayload(Error, "ERROR").addError(rerr))
errHandler.PrintErr(*err)
}
}
154 changes: 151 additions & 3 deletions pkg/analytics/client_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
package analytics

import (
"encoding/json"
"fmt"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"net/http"
"net/http/httptest"
"testing"
)

Expand Down Expand Up @@ -56,14 +62,156 @@ func TestAnalytics_Hash(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
assert := assert.New(t)
analytics := &Client{
UserId: userId,
userId: userId,
}
actual := analytics.Hash(tt.given)
assert.Equal(tt.expect, actual)
})
}
}

type jsonConvertable struct {
Foo string `json:"foo"`
func TestAnalyticsSend(t *testing.T) {
cases := []struct {
name string
send func(client *Client)
expect []sentPayload
}{
{
name: "direct send at level info with properties",
send: func(c *Client) {
c.userId = "[email protected]"
c.AppendProperties(map[string]any{"property_1": "aaa"})
c.Info("hello world")
},
expect: []sentPayload{{
"id": "[email protected]",
"event": "hello world",
"properties": map[string]any{
"_logLevel": "info",
"status": "info",
"validated": false,
"property_1": "aaa",
},
}},
},
{
name: "send via logger with no fields",
send: func(c *Client) {
c.userId = "[email protected]"
logger := zap.New(c.NewFieldListener(zapcore.WarnLevel))
logger.Warn("my message")
},
expect: []sentPayload{{
"id": "[email protected]",
"event": "WARN",
"properties": map[string]any{
"_logLevel": "warn",
"status": "warn",
"validated": false,
},
}},
},
{
name: "send via logger",
send: func(c *Client) {
c.userId = "[email protected]"
logger := zap.New(c.NewFieldListener(zapcore.WarnLevel))
logger.Error("first message", zap.Error(fmt.Errorf("my error")))
logger.Warn("second message") // no error field on this one!
},
expect: []sentPayload{
{
"id": "[email protected]",
"event": "ERROR",
"properties": map[string]any{
"_logLevel": "error",
"status": "error",
"validated": false,
"error": "my error",
},
},
{
"id": "[email protected]",
"event": "WARN",
"properties": map[string]any{
"_logLevel": "warn",
"status": "warn",
"validated": false,
},
},
},
},
}
for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
assert := assert.New(t)

handler := interactions{assert: assert}
for range tt.expect {
handler.interactions = append(handler.interactions, nil)
}

server := httptest.NewServer(&handler)
defer server.Close()

client := NewClient()
client.serverUrlOverride = server.URL

tt.send(client)
for i, receivedPayload := range handler.interactions {
expect := tt.expect[i]
if assert.NotNil(receivedPayload) {
// for properties we can't control, just assert that they exist, and then delete them.
// this is so that we don't have to set them on the expected
if properties, ok := receivedPayload["properties"].(map[string]any); ok {
for _, opaqueProperty := range []string{"localId", "runId"} {
assert.NotEmpty(properties[opaqueProperty])
delete(properties, opaqueProperty)
}
}

assert.Equal(expect, receivedPayload)
}
}
})
}
}

type (
sentPayload map[string]any
jsonConvertable struct {
Foo string `json:"foo"`
}

interactions struct {
assert *assert.Assertions
count int
interactions []sentPayload
}
)

func (s *interactions) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
if s.count >= len(s.interactions) {
s.assert.Fail("no interactions left")
w.WriteHeader(http.StatusInternalServerError)
return
}
defer func() { s.count += 1 }()

decoder := json.NewDecoder(r.Body)
body := sentPayload{}
if err := decoder.Decode(&body); !s.assert.NoError(err) {
return
}
s.interactions[s.count] = body

if s.assert.Equal(http.MethodPost, r.Method) && s.assert.Equal("/analytics/track", r.URL.RequestURI()) {
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte("ok"))
s.assert.NoError(err)
} else {
s.assert.Fail("no interactions left")
w.WriteHeader(http.StatusInternalServerError)
}
}
Loading