diff --git a/pkg/analytics/client.go b/pkg/analytics/client.go index 8bdcff337..46c03cdb0 100644 --- a/pkg/analytics/client.go +++ b/pkg/analytics/client.go @@ -14,15 +14,23 @@ import ( ) type ( + Payload struct { + UserId string `json:"id"` + Event string `json:"event"` + Source []byte `json:"source,omitempty"` + Properties map[string]any `json:"properties"` + } + Client struct { - UserId string `json:"id"` - Event string `json:"event"` - Source []byte `json:"source,omitempty"` - Properties map[string]interface{} `json:"properties,omitempty"` + serverUrlOverride string + userId string + universalProperties map[string]any } + ErrorHandler interface { PrintErr(err error) } + LogLevel string ) @@ -37,68 +45,54 @@ const ( const datadogLogLevel = "_logLevel" const datadogStatus = "status" -func NewClient(properties map[string]interface{}) *Client { +func NewClient() *Client { local := GetOrCreateAnalyticsFile() - client := &Client{ - Properties: properties, - } + client := &Client{} + client.universalProperties = make(map[string]any) // These will get validated in AttachAuthorizations - client.UserId = local.Id - client.Properties["validated"] = false + client.userId = local.Id + client.universalProperties["validated"] = false - client.Properties["localId"] = local.Id + client.universalProperties["localId"] = local.Id if runUuid, err := uuid.NewRandom(); err == nil { - client.Properties["runId"] = runUuid.String() + client.universalProperties["runId"] = runUuid.String() } return client } func (t *Client) AttachAuthorizations(claims *auth.KlothoClaims) { - t.Properties["localId"] = t.UserId - t.UserId = claims.Email - t.Properties["validated"] = claims.EmailVerified + t.universalProperties["localId"] = t.userId + t.userId = claims.Email + t.universalProperties["validated"] = claims.EmailVerified } func (t *Client) Info(event string) { - t.Properties[datadogLogLevel] = Info - t.track(event) -} - -func (t *Client) Debug(event string) { - t.Properties[datadogLogLevel] = Debug - t.track(event) + t.send(t.createPayload(Info, event)) } func (t *Client) Warn(event string) { - t.Properties[datadogLogLevel] = Warn - t.Properties[datadogStatus] = Warn - t.track(event) + t.send(t.createPayload(Warn, event)) } func (t *Client) Error(event string) { - t.Properties[datadogLogLevel] = Error - t.Properties[datadogStatus] = Error - t.track(event) + t.send(t.createPayload(Error, event)) } -func (t *Client) Panic(event string) { - t.Properties[datadogLogLevel] = Panic - // Using error since datadog does not support panic for the reserved status field - t.Properties[datadogStatus] = Error - t.track(event) +func (p *Payload) addError(err error) { + p.Properties["error"] = fmt.Sprintf("%+v", err) } func (t *Client) AppendProperties(properties map[string]interface{}) { for k, v := range properties { - t.Properties[k] = v + t.AppendProperty(k, v) } } -func (t *Client) DeleteProperty(key string) { - delete(t.Properties, key) +func (t *Client) AppendProperty(key string, value any) { + t.universalProperties[key] = value } func (t *Client) UploadSource(source *core.InputFiles) { @@ -107,28 +101,34 @@ func (t *Client) UploadSource(source *core.InputFiles) { zap.S().Warnf("Failed to upload debug bundle. %v", err) return } - srcClient := &Client{ - UserId: t.UserId, - Properties: t.Properties, - Source: data, - } - srcClient.Info("klotho uploading") + p := t.createPayload(Info, "klotho uploading") + p.Source = data + t.send(p) } -func (t *Client) track(event string) { - t.Event = event - err := SendTrackingToServer(t) - - if err != nil { - zap.L().Debug(fmt.Sprintf("Failed to send metrics info. %v", err)) +func (t *Client) createPayload(level LogLevel, event string) Payload { + p := Payload{ + UserId: t.userId, + Event: event, + Properties: make(map[string]any, len(t.universalProperties)+2), + } + for k, v := range t.universalProperties { + p.Properties[k] = v + } + p.Properties[datadogLogLevel] = level + if level == Panic { + p.Properties[datadogStatus] = Error // datadog doesn't support panic for the reserved status field + } else { + p.Properties[datadogStatus] = level } + return p } -// Hash hashes a value, using this analytic sender's UserId as a salt. It does not output anything or in any way modify the +// Hash hashes a value, using this analytic sender's userId as a salt. It does not output anything or in any way modify the // sender's state. func (t *Client) Hash(value any) string { h := sha256.New() - h.Write([]byte(t.UserId)) // use this as a salt + h.Write([]byte(t.userId)) // use this as a salt if json.NewEncoder(h).Encode(value) != nil { return "unknown" } @@ -149,7 +149,9 @@ func (t *Client) PanicHandler(err *error, errHandler ErrorHandler) { if _, hasStack := (*err).(interface{ StackTrace() errors.StackTrace }); !hasStack { *err = errors.WithStack(*err) } - t.Panic(rerr.Error()) + p := t.createPayload(Error, "ERROR") + p.addError(rerr) + t.send(p) errHandler.PrintErr(*err) } } diff --git a/pkg/analytics/client_test.go b/pkg/analytics/client_test.go index 83e643605..4af0f2f7f 100644 --- a/pkg/analytics/client_test.go +++ b/pkg/analytics/client_test.go @@ -1,7 +1,13 @@ package analytics import ( + "encoding/json" + "fmt" "github.com/stretchr/testify/assert" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "net/http" + "net/http/httptest" "testing" ) @@ -56,7 +62,7 @@ func TestAnalytics_Hash(t *testing.T) { t.Run(tt.name, func(t *testing.T) { assert := assert.New(t) analytics := &Client{ - UserId: userId, + userId: userId, } actual := analytics.Hash(tt.given) assert.Equal(tt.expect, actual) @@ -64,6 +70,168 @@ func TestAnalytics_Hash(t *testing.T) { } } -type jsonConvertable struct { - Foo string `json:"foo"` +func TestAnalyticsSend(t *testing.T) { + cases := []struct { + name string + send func(client *Client) + expect []sentPayload + }{ + { + name: "direct send at level info with properties", + send: func(c *Client) { + c.userId = "my-user@klo.dev" + c.AppendProperty("property_1", "aaa") + c.Info("hello world") + }, + expect: []sentPayload{{ + "id": "my-user@klo.dev", + "event": "hello world", + "properties": map[string]any{ + "_logLevel": "info", + "status": "info", + "validated": false, + "property_1": "aaa", + }, + }}, + }, + { + name: "two sends have isolated state", + send: func(c *Client) { + c.userId = "my-user@klo.dev" + + payload1 := c.createPayload(Warn, "message 1") + payload1.Properties["hello"] = "world" + + payload2 := c.createPayload(Info, "message 2") + payload2.Properties["hello"] = "goodbye" + + c.send(payload1) + c.send(payload2) + }, + expect: []sentPayload{ + { + "id": "my-user@klo.dev", + "event": "message 1", + "properties": map[string]any{ + "_logLevel": "warn", + "status": "warn", + "validated": false, + "hello": "world", + }, + }, + { + "id": "my-user@klo.dev", + "event": "message 2", + "properties": map[string]any{ + "_logLevel": "info", + "status": "info", + "validated": false, + "hello": "goodbye", + }, + }, + }, + }, + { + name: "send via logger", + send: func(c *Client) { + c.userId = "my-user@klo.dev" + logger := zap.New(c.NewFieldListener(zapcore.WarnLevel)) + logger.Error("first message", zap.Error(fmt.Errorf("my error"))) + logger.Warn("second message") // no error field on this one! + }, + expect: []sentPayload{ + { + "id": "my-user@klo.dev", + "event": "ERROR", + "properties": map[string]any{ + "_logLevel": "error", + "status": "error", + "validated": false, + "error": "my error", + }, + }, + { + "id": "my-user@klo.dev", + "event": "WARN", + "properties": map[string]any{ + "_logLevel": "warn", + "status": "warn", + "validated": false, + }, + }, + }, + }, + } + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + assert := assert.New(t) + + handler := interactions{assert: assert} + for range tt.expect { + handler.interactions = append(handler.interactions, nil) + } + + server := httptest.NewServer(&handler) + defer server.Close() + + client := NewClient() + client.serverUrlOverride = server.URL + + tt.send(client) + for i, receivedPayload := range handler.interactions { + expect := tt.expect[i] + if assert.NotNil(receivedPayload) { + // for properties we can't control, just assert that they exist, and then delete them. + // this is so that we don't have to set them on the expected + if properties, ok := receivedPayload["properties"].(map[string]any); ok { + for _, opaqueProperty := range []string{"localId", "runId"} { + assert.NotEmpty(properties[opaqueProperty]) + delete(properties, opaqueProperty) + } + } + + assert.Equal(expect, receivedPayload) + } + } + }) + } +} + +type ( + sentPayload map[string]any + jsonConvertable struct { + Foo string `json:"foo"` + } + + interactions struct { + assert *assert.Assertions + count int + interactions []sentPayload + } +) + +func (s *interactions) ServeHTTP(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + if s.count >= len(s.interactions) { + s.assert.Fail("no interactions left") + w.WriteHeader(http.StatusInternalServerError) + return + } + defer func() { s.count += 1 }() + + decoder := json.NewDecoder(r.Body) + body := sentPayload{} + if err := decoder.Decode(&body); !s.assert.NoError(err) { + return + } + s.interactions[s.count] = body + + if s.assert.Equal(http.MethodPost, r.Method) && s.assert.Equal("/analytics/track", r.URL.RequestURI()) { + w.WriteHeader(http.StatusOK) + _, err := w.Write([]byte("ok")) + s.assert.NoError(err) + } else { + s.assert.Fail("no interactions left") + w.WriteHeader(http.StatusInternalServerError) + } } diff --git a/pkg/analytics/fields_listener.go b/pkg/analytics/fields_listener.go index fa96eaf26..83e2f11bc 100644 --- a/pkg/analytics/fields_listener.go +++ b/pkg/analytics/fields_listener.go @@ -45,38 +45,37 @@ func (fl *fieldListener) Write(entry zapcore.Entry, fields []zapcore.Field) erro allFields = append(allFields, fl.fields...) allFields = append(allFields, fields...) - message := entry.Level.CapitalString() + logLevel := Warn + switch entry.Level { + case zapcore.DebugLevel: + logLevel = Debug + case zapcore.InfoLevel: + logLevel = Info + case zapcore.WarnLevel: + logLevel = Warn + case zapcore.ErrorLevel: + logLevel = Error + case zapcore.DPanicLevel, zapcore.PanicLevel, zapcore.FatalLevel: + logLevel = Panic + } + + p := fl.client.createPayload(logLevel, entry.Level.CapitalString()) for k, v := range logging.SanitizeFields(allFields, fl.client.Hash) { - property := fmt.Sprintf("log.%s", k) - fl.client.Properties[property] = v - defer func() { fl.client.DeleteProperty(property) }() + p.Properties[fmt.Sprintf("log.%s", k)] = v } for _, f := range allFields { if f.Key == logging.EntryMessageField { - message += (" " + entry.Message) + p.Event += (" " + entry.Message) } if err, isError := f.Interface.(error); isError { - fl.client.Properties["error"] = fmt.Sprintf("%+v", err) - defer fl.client.DeleteProperty("error") + p.addError(err) } } - switch entry.Level { - case zapcore.DebugLevel: - fl.client.Debug(message) - case zapcore.InfoLevel: - fl.client.Info(message) - case zapcore.WarnLevel: - fl.client.Warn(message) - case zapcore.ErrorLevel: - fl.client.Error(message) - case zapcore.DPanicLevel, zapcore.PanicLevel, zapcore.FatalLevel: - fl.client.Panic(message) - default: - fl.client.Warn(message) // shouldn't happen, but just to be safe - } + fl.client.send(p) + return nil } diff --git a/pkg/analytics/tracking_utils.go b/pkg/analytics/tracking_utils.go index 41c06d8fe..712855ffb 100644 --- a/pkg/analytics/tracking_utils.go +++ b/pkg/analytics/tracking_utils.go @@ -5,6 +5,7 @@ import ( "bytes" "encoding/json" "fmt" + "go.uber.org/zap" "math" "net/http" "time" @@ -12,23 +13,26 @@ import ( "github.com/klothoplatform/klotho/pkg/core" ) -var kloServerUrl = "http://srv.klo.dev" +const kloServerUrl = "http://srv.klo.dev" type AnalyticsFile struct { Id string } -func SendTrackingToServer(bundle *Client) error { - postBody, _ := json.Marshal(bundle) +func (t *Client) send(payload Payload) { + postBody, _ := json.Marshal(payload) data := bytes.NewBuffer(postBody) - resp, err := http.Post(fmt.Sprintf("%v/analytics/track", kloServerUrl), "application/json", data) - if err != nil { - return err + url := t.serverUrlOverride + if url == "" { + url = kloServerUrl } + resp, err := http.Post(fmt.Sprintf("%s/analytics/track", url), "application/json", data) - defer resp.Body.Close() - - return nil + if err != nil { + zap.L().Debug(fmt.Sprintf("Failed to send metrics info. %v", err)) + return + } + resp.Body.Close() } func CompressFiles(input *core.InputFiles) ([]byte, error) { diff --git a/pkg/cli/klothomain.go b/pkg/cli/klothomain.go index 3dcf87c5c..725b46d9a 100644 --- a/pkg/cli/klothomain.go +++ b/pkg/cli/klothomain.go @@ -209,7 +209,8 @@ func (km KlothoMain) run(cmd *cobra.Command, args []string) (err error) { } // Set up analytics, and hook them up to the logs - analyticsClient := analytics.NewClient(map[string]any{ + analyticsClient := analytics.NewClient() + analyticsClient.AppendProperties(map[string]any{ "version": km.Version, "strict": cfg.strict, "edition": km.DefaultUpdateStream, @@ -255,7 +256,7 @@ func (km KlothoMain) run(cmd *cobra.Command, args []string) (err error) { defer analyticsClient.PanicHandler(&err, errHandler) updateStream := options.Update.Stream.OrDefault(km.DefaultUpdateStream) - analyticsClient.Properties["updateStream"] = updateStream + analyticsClient.AppendProperty("updateStream", updateStream) if cfg.version { var versionQualifier string @@ -267,7 +268,7 @@ func (km KlothoMain) run(cmd *cobra.Command, args []string) (err error) { } klothoName := "klotho" if km.VersionQualifier != "" { - analyticsClient.Properties[km.VersionQualifier] = true + analyticsClient.AppendProperty(km.VersionQualifier, true) } // if update is specified do the update in place