Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: add prometheus exporter #1

Merged
merged 1 commit into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@

# Editor specifics
.vscode
__debug_bin
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ language: go
go:
- 1.16.x
- 1.17
- 1.20
install:
- go get -t ./...
- ./scripts/install-checks.sh
Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.17.0-alpine3.14 as builder
FROM golang:1.20.2-alpine3.14 as builder

LABEL maintainer="Samuel Jirenius <[email protected]>"

Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile.alpine
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.17.0-alpine3.14 as builder
FROM golang:1.20.2-alpine3.14 as builder

LABEL maintainer="Samuel Jirenius <[email protected]>"

Expand Down
22 changes: 18 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,14 +1,28 @@
module github.com/resgateio/resgate

go 1.13
go 1.20

require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/gorilla/websocket v1.4.2
github.com/jirenius/timerqueue v1.0.0
github.com/nats-io/nats-server/v2 v2.6.6 // indirect
github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc
github.com/posener/wstest v1.2.0
github.com/prometheus/client_golang v1.14.0
github.com/rs/xid v1.3.0
google.golang.org/protobuf v1.27.1 // indirect
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/nats-io/nats-server/v2 v2.6.6 // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
google.golang.org/protobuf v1.28.1 // indirect
)
455 changes: 446 additions & 9 deletions go.sum

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Server Options:
-w, --wspath <path> WebSocket path for clients (default: /)
-a, --apipath <path> Web resource path for clients (default: /api/)
-r, --reqtimeout <milliseconds> Timeout duration for NATS requests (default: 3000)
-m, --metricsport <port> HTTP port for prometheus metrics connections, disable if not set
-u, --headauth <method> Resource method for header authentication
--apiencoding <type> Encoding for web resources: json, jsonflat (default: json)
--putmethod <methodName> Call method name mapped to HTTP PUT requests
Expand Down Expand Up @@ -126,6 +127,7 @@ func (c *Config) Init(fs *flag.FlagSet, args []string) {
configFile string
port uint
headauth string
metricsport uint
addr string
natsRootCAs StringSlice
debugTrace bool
Expand All @@ -151,6 +153,8 @@ func (c *Config) Init(fs *flag.FlagSet, args []string) {
fs.StringVar(&c.APIPath, "apipath", "", "Web resource path for clients.")
fs.StringVar(&headauth, "u", "", "Resource method for header authentication.")
fs.StringVar(&headauth, "headauth", "", "Resource method for header authentication.")
fs.UintVar(&metricsport, "m", 0, "HTTP port for prometheus metrics connections, disable if not set.")
fs.UintVar(&metricsport, "metricsport", 0, "HTTP port for prometheus metrics connections, disable if not set.")
fs.BoolVar(&c.TLS, "tls", false, "Enable TLS for HTTP.")
fs.StringVar(&c.TLSCert, "tlscert", "", "HTTP server certificate file.")
fs.StringVar(&c.TLSKey, "tlskey", "", "Private key for HTTP server certificate.")
Expand Down Expand Up @@ -184,6 +188,15 @@ func (c *Config) Init(fs *flag.FlagSet, args []string) {
printAndDie(fmt.Sprintf(`Invalid port "%d": must be less than 65536`, port), true)
}

if metricsport != 0 {
if metricsport >= 1<<16 {
printAndDie(fmt.Sprintf(`Invalid metrics port "%d": must be less than 65536`, metricsport), true)
}
if metricsport == port {
printAndDie(fmt.Sprintf(`Invalid metrics port "%d": must be different from API port ("%d")`, metricsport, port), true)
}
}

if showHelp {
usage()
}
Expand Down Expand Up @@ -216,6 +229,9 @@ func (c *Config) Init(fs *flag.FlagSet, args []string) {
if port > 0 {
c.Port = uint16(port)
}
if metricsport > 0 {
c.MetricsPort = uint16(metricsport)
}

// Helper function to set string pointers to nil if empty.
setString := func(v string, s **string) {
Expand Down
49 changes: 49 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package metrics

import (
"regexp"

"github.com/prometheus/client_golang/prometheus"
)

var (
	// uUIDRegex matches a hexadecimal UUID in the 8-4-4-4-12 layout anywhere
	// inside a string. The (?i) flag makes the match case-insensitive, so
	// upper-case and mixed-case UUIDs are recognized as well; without it such
	// UUIDs would only be partially rewritten by iDRegex and every distinct
	// UUID would produce a distinct sanitized string.
	uUIDRegex = regexp.MustCompile(`(?i)[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}`)
	// iDRegex matches any run of one or more ASCII decimal digits.
	iDRegex = regexp.MustCompile(`[0-9]+`)
)

// Collectors exported by this package. All of them live in the "resgate"
// namespace; they only show up on a registry after RegisterMetrics is called.
//
// The spelling of the identifiers and of the "stablished_connections" metric
// name is kept as-is because other packages and scrapers refer to them; only
// the human-readable help text has been corrected.
var (
	// SubcriptionsCount is a gauge vector with the number of subscriptions
	// per sanitized resource name, partitioned by the "name" label.
	SubcriptionsCount = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "resgate",
		Subsystem: "cache",
		Name:      "subscriptions",
		Help:      "Number of subscriptions per sanitized name",
	}, []string{"name"})
	// NATSConnected is a gauge vector with the status of the NATS
	// connection, partitioned by the "host" label.
	NATSConnected = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "resgate",
		Subsystem: "nats",
		Name:      "connected",
		Help:      "Status of NATS connection",
	}, []string{"host"})
	// WSStablishedConnections is a gauge with the number of established
	// websocket connections.
	WSStablishedConnections = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "resgate",
		Subsystem: "ws",
		Name:      "stablished_connections",
		Help:      "Number of established websocket connections",
	})
)

// RegisterMetrics registers every collector defined in this package with the
// default prometheus registerer so that they can be populated and scraped.
//
// MustRegister panics when a registration fails, which includes registering
// the same collector a second time.
func RegisterMetrics() {
	prometheus.MustRegister(
		SubcriptionsCount,
		NATSConnected,
		WSStablishedConnections,
	)
}

// SanitizedString returns s with every match of uUIDRegex replaced by the
// literal "{uuid}" and, after that, every match of iDRegex replaced by the
// literal "{id}". UUIDs are handled first so that their digits are not
// rewritten individually by the second pass.
func SanitizedString(s string) string {
	withoutUUIDs := uUIDRegex.ReplaceAllString(s, "{uuid}")
	return iDRegex.ReplaceAllString(withoutUUIDs, "{id}")
}
4 changes: 4 additions & 0 deletions nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/jirenius/timerqueue"
nats "github.com/nats-io/nats.go"
"github.com/resgateio/resgate/logger"
"github.com/resgateio/resgate/metrics"
"github.com/resgateio/resgate/server/mq"
)

Expand Down Expand Up @@ -104,6 +105,8 @@ func (c *Client) Connect() error {
c.tq = timerqueue.New(c.onTimeout, c.RequestTimeout)
c.stopped = make(chan struct{})

metrics.NATSConnected.WithLabelValues(c.mq.ConnectedClusterName()).Set(1)

go c.listener(c.mqCh, c.stopped)

return nil
Expand Down Expand Up @@ -171,6 +174,7 @@ func (c *Client) onClose(conn *nats.Conn) {
if c.closeHandler != nil {
err := conn.LastError()
c.closeHandler(fmt.Errorf("lost NATS connection: %s", err))
metrics.NATSConnected.WithLabelValues(c.mq.ConnectedClusterName()).Set(0)
}
}

Expand Down
3 changes: 3 additions & 0 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Config struct {
Port uint16 `json:"port"`
WSPath string `json:"wsPath"`
APIPath string `json:"apiPath"`
MetricsPort uint16 `json:"metricsPort"`
APIEncoding string `json:"apiEncoding"`
HeaderAuth *string `json:"headerAuth"`
AllowOrigin *string `json:"allowOrigin"`
Expand All @@ -38,6 +39,7 @@ type Config struct {

scheme string
netAddr string
metricsNetAddr string
headerAuthRID string
headerAuthAction string
allowOrigin []string
Expand Down Expand Up @@ -102,6 +104,7 @@ func (c *Config) prepare() error {
} else {
c.netAddr = DefaultAddr
}
c.metricsNetAddr = c.netAddr + fmt.Sprintf(":%d", c.MetricsPort)
c.netAddr += fmt.Sprintf(":%d", c.Port)

if c.HeaderAuth != nil {
Expand Down
77 changes: 77 additions & 0 deletions server/metricsServer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package server

import (
"context"
"net"
"net/http"
"time"

"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/resgateio/resgate/metrics"
)

// initMetricsServer is currently a no-op. It is invoked from NewService
// alongside the other init* steps; all of the actual metrics server setup
// happens in startMetricsServer.
func (s *Service) initMetricsServer() {
}

// startMetricsServer sets up the prometheus metrics HTTP server and serves it
// from a separate goroutine. It does nothing when no metrics port is
// configured (MetricsPort == 0).
//
// The package metrics are registered, a handler is mounted on /metrics and the
// listener is opened synchronously on cfg.metricsNetAddr. If listening fails,
// the error is logged and the service carries on without a metrics endpoint.
// If serving later fails for any reason other than a regular shutdown, the
// whole service is stopped with that error.
func (s *Service) startMetricsServer() {
	if s.cfg.MetricsPort == 0 {
		return
	}

	// NOTE(review): RegisterMetrics uses prometheus.MustRegister, which panics
	// on duplicate registration — confirm this path is never reached twice in
	// the same process (e.g. on a service restart).
	metrics.RegisterMetrics()

	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.Handler())

	ln, err := net.Listen("tcp", s.cfg.metricsNetAddr)
	if err != nil {
		s.Errorf("Metrics server can't listen on %s: %s", s.cfg.metricsNetAddr, err)
		return
	}

	srv := &http.Server{
		Handler: mux,
		// Bound the time a client may take to send its request headers so
		// that stalled connections cannot pile up on the metrics port.
		ReadHeaderTimeout: 5 * time.Second,
	}
	s.m = srv

	s.Logf("Metrics endpoint listening on %s://%s", s.cfg.scheme, s.cfg.metricsNetAddr)

	go func() {
		// Use the local srv instead of s.m: stopMetricsServer resets s.m to
		// nil, which may happen before this goroutine gets to run.
		var err error
		if s.cfg.TLS {
			err = srv.ServeTLS(ln, s.cfg.TLSCert, s.cfg.TLSKey)
		} else {
			err = srv.Serve(ln)
		}

		// Serve always returns a non-nil error; http.ErrServerClosed only
		// signals that Shutdown or Close was called and is not a failure.
		if err != nil && err != http.ErrServerClosed {
			s.Stop(err)
		}
	}()
}

// stopMetricsServer shuts down the metrics server, if one is running, while
// holding s.mu. In-flight requests get up to five seconds to finish; after
// that the remaining connections are closed.
func (s *Service) stopMetricsServer() {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.m == nil {
		return
	}

	s.Debugf("Stopping Metrics server...")

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	srv := s.m
	s.m = nil
	err := srv.Shutdown(ctx)

	switch {
	case ctx.Err() == context.DeadlineExceeded:
		// Shutdown merely stops waiting when the context expires; Close is
		// what actually drops the connections that are still open. Its error
		// is ignored on purpose: the server is being discarded either way.
		_ = srv.Close()
		s.Errorf("Metrics server forcefully stopped after timeout")
	case err != nil:
		s.Errorf("Metrics server stopped with error: %s", err)
	default:
		s.Debugf("Metrics server gracefully stopped")
	}
}
4 changes: 4 additions & 0 deletions server/rescache/eventSubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rescache
import (
"sync"

"github.com/resgateio/resgate/metrics"
"github.com/resgateio/resgate/server/codec"
"github.com/resgateio/resgate/server/mq"
"github.com/resgateio/resgate/server/reserr"
Expand Down Expand Up @@ -108,6 +109,7 @@ func (e *EventSubscription) addSubscriber(sub Subscriber, t *Throttle) {
// An error occurred during request
case stateError:
e.count--
metrics.SubcriptionsCount.WithLabelValues(metrics.SanitizedString(e.ResourceName)).Set(float64(e.count))
e.mu.Unlock()
defer e.mu.Lock()
sub.Loaded(nil, rs.err)
Expand Down Expand Up @@ -212,6 +214,7 @@ func (e *EventSubscription) addCount() {
e.cache.unsubQueue.Remove(e)
}
e.count++
metrics.SubcriptionsCount.WithLabelValues(metrics.SanitizedString(e.ResourceName)).Set(float64(e.count))
}

// removeCount decreases the subscription count, and puts the event subscription
Expand All @@ -221,6 +224,7 @@ func (e *EventSubscription) removeCount(n int64) {
if e.count == 0 && n != 0 {
e.cache.unsubQueue.Add(e)
}
metrics.SubcriptionsCount.WithLabelValues(metrics.SanitizedString(e.ResourceName)).Set(float64(e.count))
}

func (e *EventSubscription) enqueueEvent(subj string, payload []byte) {
Expand Down
3 changes: 3 additions & 0 deletions server/rescache/rescache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/jirenius/timerqueue"
"github.com/resgateio/resgate/logger"
"github.com/resgateio/resgate/metrics"
"github.com/resgateio/resgate/server/codec"
"github.com/resgateio/resgate/server/mq"
"github.com/resgateio/resgate/server/reserr"
Expand Down Expand Up @@ -229,6 +230,7 @@ func (c *Cache) AddConn(conn Conn) {
defer c.mu.Unlock()

c.conns[conn.CID()] = conn
metrics.WSStablishedConnections.Set(float64(len(c.conns)))
}

// RemoveConn removes a connection listening to events.
Expand All @@ -237,6 +239,7 @@ func (c *Cache) RemoveConn(conn Conn) {
defer c.mu.Unlock()

delete(c.conns, conn.CID())
metrics.WSStablishedConnections.Set(float64(len(c.conns)))
}

// getSubscription returns the existing eventSubscription after adding its count, or creates a new
Expand Down
7 changes: 7 additions & 0 deletions server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type Service struct {
enc APIEncoder
mimetype string

// metrics httpServer
m *http.Server

// wsListener/wsConn
upgrader websocket.Upgrader
conns map[string]*wsConn // Connections by wsConn Id's
Expand All @@ -45,6 +48,7 @@ func NewService(mq mq.Client, cfg Config) (*Service, error) {
if err := s.cfg.prepare(); err != nil {
return nil, err
}
s.initMetricsServer()
s.initHTTPServer()
s.initWSHandler()
s.initMQClient()
Expand Down Expand Up @@ -121,6 +125,8 @@ func (s *Service) start() error {
return err
}

s.startMetricsServer()

s.startHTTPServer()
s.Logf("Server ready")

Expand All @@ -142,6 +148,7 @@ func (s *Service) Stop(err error) {
}
s.Logf("Stopping server...")

s.stopMetricsServer()
s.stopWSHandler()
s.stopHTTPServer()
s.stopMQClient()
Expand Down