From a215b77addb2d8008357def5d422c601a5a600e0 Mon Sep 17 00:00:00 2001 From: "k." Date: Thu, 3 Oct 2024 18:32:37 +0330 Subject: [PATCH] feat(log,metrics): add log and metrics modules. (#63) --- cmd/commands/run.go | 8 +++-- cmd/commands/utils.go | 4 +-- database/database.go | 3 ++ go.mod | 14 ++++++-- go.sum | 41 +++++++++++++++-------- metrics/metrics.go | 63 ++++++++++++++++++++++++++++++++++++ relay/relay.go | 10 +++++- server/http/server.go | 2 ++ server/websocket/server.go | 66 +++++++++++++++++++++++++++++++++++--- server/websocket/utils.go | 24 ++++++++++++++ 10 files changed, 209 insertions(+), 26 deletions(-) create mode 100644 metrics/metrics.go create mode 100644 server/websocket/utils.go diff --git a/cmd/commands/run.go b/cmd/commands/run.go index 13a7218..dafbd43 100644 --- a/cmd/commands/run.go +++ b/cmd/commands/run.go @@ -2,7 +2,7 @@ package commands import ( "errors" - "fmt" + "log" "os" "os/signal" "syscall" @@ -16,6 +16,8 @@ func HandleRun(args []string) { ExitOnError(errors.New("at least 1 arguments expected\nuse help command for more information")) } + log.Println("loading config...") + cfg, err := config.Load(args[2]) if err != nil { ExitOnError(err) @@ -33,13 +35,13 @@ func HandleRun(args []string) { select { case sig := <-sigChan: - fmt.Printf("Received signal: %s\nInitiating graceful shutdown...\n", sig.String()) //nolint + log.Printf("Received signal: %s\nInitiating graceful shutdown...\n", sig.String()) //nolint if err := r.Stop(); err != nil { ExitOnError(err) } case err := <-errCh: - fmt.Printf("Unexpected error: %v\nInitiating shutdown due to the error...\n", err) //nolint + log.Printf("Unexpected error: %v\nInitiating shutdown due to the error...\n", err) //nolint if err := r.Stop(); err != nil { ExitOnError(err) } diff --git a/cmd/commands/utils.go b/cmd/commands/utils.go index c32e113..b974b1d 100644 --- a/cmd/commands/utils.go +++ b/cmd/commands/utils.go @@ -1,11 +1,11 @@ package commands import ( - "fmt" + "log" "os" ) func ExitOnError(err error) { - fmt.Printf("immortal error: %s\n", err.Error()) //nolint + log.Printf("immortal error: %s\n", err.Error()) //nolint os.Exit(1) } diff --git a/database/database.go b/database/database.go index 0cfea1f..8f51611 100644 --- a/database/database.go +++ b/database/database.go @@ -2,6 +2,7 @@ package database import ( "context" + "log" "time" "go.mongodb.org/mongo-driver/mongo" @@ -47,5 +48,7 @@ func Connect(cfg Config) (*Database, error) { } func (db *Database) Stop() error { + log.Println("closing database connection...") + return db.Client.Disconnect(context.Background()) } diff --git a/go.mod b/go.mod index f868b61..f6e7f33 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/gorilla/websocket v1.5.3 github.com/joho/godotenv v1.5.1 github.com/mailru/easyjson v0.7.7 + github.com/prometheus/client_golang v1.20.4 github.com/stretchr/testify v1.9.0 github.com/tidwall/gjson v1.17.3 go.mongodb.org/mongo-driver v1.17.0 @@ -16,17 +17,23 @@ require ( ) require ( + github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.14.3 // indirect github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/crypto/blake256 v1.0.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/josharian/intern v1.0.0 // indirect - github.com/klauspost/compress v1.13.6 // indirect - github.com/kr/pretty v0.3.0 // indirect + github.com/klauspost/compress v1.17.9 // indirect + github.com/kr/text v0.2.0 // indirect github.com/montanaflynn/stats v0.7.1 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.55.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect @@ -35,6 +42,7 @@ require ( github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect golang.org/x/crypto v0.26.0 // indirect golang.org/x/sync v0.8.0 // indirect + golang.org/x/sys v0.23.0 // indirect golang.org/x/text v0.17.0 // indirect - gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect + google.golang.org/protobuf v1.34.2 // indirect ) diff --git a/go.sum b/go.sum index 8403de1..415dc18 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bits-and-blooms/bitset v1.10.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/bits-and-blooms/bitset v1.14.3 h1:Gd2c8lSNf9pKXom5JtD7AaKO8o7fGQ2LtFj1436qilA= github.com/bits-and-blooms/bitset v1.14.3/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= @@ -7,6 +9,8 @@ github.com/btcsuite/btcd/btcec/v2 v2.3.4 h1:3EJjcN70HCu/mwqlUsGK8GcNVyLVxFDlWurT github.com/btcsuite/btcd/btcec/v2 v2.3.4/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04= github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U= github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -26,23 +30,32 @@ github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= -github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc= -github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= -github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= -github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/prometheus/client_golang v1.20.4 h1:Tgh3Yr67PaOv/uTqloMsCEdeuFTatm5zIq5+qNN23vI= +github.com/prometheus/client_golang v1.20.4/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc= +github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tidwall/gjson v1.17.3 h1:bwWLZU7icoKRG+C+0PNwIKC6FCJO/Q3p2pZvuP0jN94= @@ -81,6 +94,8 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= +golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -93,10 +108,10 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/metrics/metrics.go b/metrics/metrics.go new file mode 100644 index 0000000..ca5aa29 --- /dev/null +++ b/metrics/metrics.go @@ -0,0 +1,63 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +type Metrics struct { + EventsTotal *prometheus.CounterVec + RequestsTotal *prometheus.CounterVec + MessagesTotal prometheus.Counter + Subscriptions prometheus.Gauge + Connections prometheus.Gauge + EventLaency prometheus.Histogram + RequestLatency prometheus.Histogram +} + +func New() *Metrics { + eventsT := promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "events_total", + Help: "number of events sent to the relay.", + }, []string{"status"}) + + reqsT := promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "requests_total", + Help: "number of REQ messages sent to relay.", + }, []string{"status"}) + + msgT := promauto.NewCounter(prometheus.CounterOpts{ + Name: "messages_total", + Help: "number of messages received.", + }) + + subs := promauto.NewGauge(prometheus.GaugeOpts{ + Name: "subscriptions", + Help: "number of open subscription.", + }) + + conns := promauto.NewGauge(prometheus.GaugeOpts{ + Name: "connections", + Help: "number of open websocket connections.", + }) + + eventL := promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "event_latency", + Help: "time needed to request to an EVENT message.", + }) + + reqL := promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "requset_latency", + Help: "time needed to request to a REQ message.", + }) + + return &Metrics{ + EventsTotal: eventsT, + Connections: conns, + MessagesTotal: msgT, + Subscriptions: subs, + RequestsTotal: reqsT, + EventLaency: eventL, + RequestLatency: reqL, + } +} diff --git a/relay/relay.go b/relay/relay.go index f20e811..ad4acc6 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -1,9 +1,12 @@ package relay import ( + "log" + "github.com/dezh-tech/immortal/config" "github.com/dezh-tech/immortal/database" "github.com/dezh-tech/immortal/handler" + "github.com/dezh-tech/immortal/metrics" "github.com/dezh-tech/immortal/server/http" "github.com/dezh-tech/immortal/server/websocket" ) @@ -30,7 +33,9 @@ func New(cfg *config.Config) (*Relay, error) { h := handler.New(db, cfg.Parameters.Handler) - ws, err := websocket.New(cfg.WebsocketServer, cfg.GetNIP11Documents(), h) + m := metrics.New() + + ws, err := websocket.New(cfg.WebsocketServer, cfg.GetNIP11Documents(), h, m) if err != nil { return nil, err } @@ -50,6 +55,7 @@ func New(cfg *config.Config) (*Relay, error) { // Start runs the relay and its children. func (r *Relay) Start() chan error { + log.Println("relay started successfully...") errCh := make(chan error, 2) go func() { @@ -69,6 +75,8 @@ func (r *Relay) Start() chan error { // Stop shutdowns the relay and its children gracefully. func (r *Relay) Stop() error { + log.Println("stopping relay...") + if err := r.websocketServer.Stop(); err != nil { return err } diff --git a/server/http/server.go b/server/http/server.go index e0a012e..d51f28b 100644 --- a/server/http/server.go +++ b/server/http/server.go @@ -8,6 +8,7 @@ import ( "github.com/dezh-tech/immortal/database" "github.com/gorilla/mux" + "github.com/prometheus/client_golang/prometheus/promhttp" ) type Server struct { @@ -25,6 +26,7 @@ func New(cfg Config, db *database.Database) (*Server, error) { } r.HandleFunc("/health", s.healthHandler).Methods("GET") + r.Handle("/metrics", promhttp.Handler()) return s, nil } diff --git a/server/websocket/server.go b/server/websocket/server.go index 07cc0ff..3c95fbd 100644 --- a/server/websocket/server.go +++ b/server/websocket/server.go @@ -3,6 +3,7 @@ package websocket import ( "encoding/json" "fmt" + "log" "net" "net/http" "os" @@ -11,6 +12,7 @@ import ( "github.com/bits-and-blooms/bloom/v3" "github.com/dezh-tech/immortal/handler" + "github.com/dezh-tech/immortal/metrics" "github.com/dezh-tech/immortal/types/filter" "github.com/dezh-tech/immortal/types/message" "github.com/dezh-tech/immortal/types/nip11" @@ -23,15 +25,19 @@ var upgrader = websocket.Upgrader{ // Server represents a websocket serer which keeps track of client connections and handle them. type Server struct { - knownEvents *bloom.BloomFilter + mu sync.RWMutex + config Config + knownEvents *bloom.BloomFilter conns map[*websocket.Conn]clientState - nip11Doc *nip11.RelayInformationDocument - mu sync.RWMutex handlers *handler.Handler + nip11Doc *nip11.RelayInformationDocument + metrics *metrics.Metrics } -func New(cfg Config, nip11Doc *nip11.RelayInformationDocument, h *handler.Handler) (*Server, error) { +func New(cfg Config, nip11Doc *nip11.RelayInformationDocument, + h *handler.Handler, m *metrics.Metrics, +) (*Server, error) { seb := bloom.NewWithEstimates(cfg.KnownBloomSize, 0.9) f, err := os.Open(cfg.BloomBackupPath) @@ -49,11 +55,14 @@ func New(cfg Config, nip11Doc *nip11.RelayInformationDocument, h *handler.Handle mu: sync.RWMutex{}, nip11Doc: nip11Doc, handlers: h, + metrics: m, }, nil } // Start starts a new server instance. func (s *Server) Start() error { + log.Println("websocket server started successfully...") + http.Handle("/", s) err := http.ListenAndServe(net.JoinHostPort(s.config.Bind, //nolint strconv.Itoa(int(s.config.Port))), nil) @@ -78,10 +87,15 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { } s.mu.Lock() + + log.Println("new websocket connection: ", conn.RemoteAddr().String()) + s.metrics.Connections.Inc() + s.conns[conn] = clientState{ subs: make(map[string]filter.Filters), RWMutex: &sync.RWMutex{}, } + s.mu.Unlock() s.readLoop(conn) @@ -94,7 +108,11 @@ func (s *Server) readLoop(conn *websocket.Conn) { if err != nil { // clean up closed connection. s.mu.Lock() + + s.metrics.Connections.Dec() + delete(s.conns, conn) + s.mu.Unlock() break @@ -107,6 +125,8 @@ func (s *Server) readLoop(conn *websocket.Conn) { continue } + s.metrics.MessagesTotal.Inc() + switch msg.Type() { case "REQ": go s.handleReq(conn, msg) @@ -124,11 +144,19 @@ func (s *Server) readLoop(conn *websocket.Conn) { func (s *Server) handleReq(conn *websocket.Conn, m message.Message) { s.mu.Lock() defer s.mu.Unlock() + defer measureLatency(s.metrics.RequestLatency)() + + status := success + defer func() { + s.metrics.RequestsTotal.WithLabelValues(status).Inc() + }() msg, ok := m.(*message.Req) if !ok { _ = conn.WriteMessage(1, message.MakeNotice("error: can't parse REQ message.")) + status = parseFail + return } @@ -136,6 +164,8 @@ func (s *Server) handleReq(conn *websocket.Conn, m message.Message) { _ = conn.WriteMessage(1, message.MakeNotice(fmt.Sprintf("error: max limit of filters is: %d", s.config.Limitation.MaxFilters))) + status = limitsFail + return } @@ -143,6 +173,8 @@ func (s *Server) handleReq(conn *websocket.Conn, m message.Message) { _ = conn.WriteMessage(1, message.MakeNotice(fmt.Sprintf("error: max limit of sub id is: %d", s.config.Limitation.MaxSubidLength))) + status = limitsFail + return } @@ -151,6 +183,8 @@ func (s *Server) handleReq(conn *websocket.Conn, m message.Message) { _ = conn.WriteMessage(1, message.MakeNotice(fmt.Sprintf("error: can't find connection %s", conn.RemoteAddr()))) + status = serverFail + return } @@ -158,12 +192,17 @@ func (s *Server) handleReq(conn *websocket.Conn, m message.Message) { _ = conn.WriteMessage(1, message.MakeNotice(fmt.Sprintf("error: max limit of subs is: %d", s.config.Limitation.MaxSubscriptions))) + status = limitsFail + return } res, err := s.handlers.HandleReq(msg.Filters) if err != nil { _ = conn.WriteMessage(1, message.MakeNotice(fmt.Sprintf("error: can't process REQ message: %s", err.Error()))) + status = databaseFail + + return } for _, e := range res { @@ -174,6 +213,7 @@ func (s *Server) handleReq(conn *websocket.Conn, m message.Message) { _ = conn.WriteMessage(1, message.MakeEOSE(msg.SubscriptionID)) client.Lock() + s.metrics.Subscriptions.Inc() client.subs[msg.SubscriptionID] = msg.Filters client.Unlock() } @@ -182,6 +222,12 @@ func (s *Server) handleReq(conn *websocket.Conn, m message.Message) { func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) { s.mu.Lock() defer s.mu.Unlock() + defer measureLatency(s.metrics.EventLaency)() + + status := success + defer func() { + s.metrics.EventsTotal.WithLabelValues(status).Inc() + }() msg, ok := m.(*message.Event) if !ok { @@ -191,6 +237,7 @@ func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) { ) _ = conn.WriteMessage(1, okm) + status = parseFail return } @@ -203,6 +250,8 @@ func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) { _ = conn.WriteMessage(1, okm) + status = limitsFail + return } @@ -223,6 +272,8 @@ func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) { _ = conn.WriteMessage(1, okm) + status = invalidFail + return } @@ -236,6 +287,8 @@ func (s *Server) handleEvent(conn *websocket.Conn, m message.Message) { _ = conn.WriteMessage(1, okm) + status = serverFail + return } @@ -278,6 +331,7 @@ func (s *Server) handleClose(conn *websocket.Conn, m message.Message) { } client.Lock() + s.metrics.Subscriptions.Dec() delete(client.subs, msg.String()) client.Unlock() } @@ -287,10 +341,13 @@ func (s *Server) Stop() error { s.mu.Lock() defer s.mu.Unlock() + log.Println("stopping websocket server...") + for wsConn, client := range s.conns { client.Lock() // close all subscriptions. for id := range client.subs { + s.metrics.Subscriptions.Dec() delete(client.subs, id) err := wsConn.WriteMessage(1, message.MakeClosed(id, "error: shutdown the relay.")) @@ -301,6 +358,7 @@ func (s *Server) Stop() error { } // close connection. + s.metrics.Connections.Dec() delete(s.conns, wsConn) err := wsConn.Close() if err != nil { diff --git a/server/websocket/utils.go b/server/websocket/utils.go new file mode 100644 index 0000000..c58cb99 --- /dev/null +++ b/server/websocket/utils.go @@ -0,0 +1,24 @@ +package websocket + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +const ( + success = "success" + databaseFail = "db_fail" + parseFail = "parse_fail" + limitsFail = "limits_fail" + serverFail = "server_fail" + invalidFail = "invalid_fail" +) + +func measureLatency(ht prometheus.Histogram) func() { + start := time.Now() + + return func() { + ht.Observe(time.Since(start).Seconds()) + } +}