Skip to content

Commit

Permalink
healthcheck: update rack and dc on healthcheck metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
karol-kokoszka committed Aug 6, 2024
1 parent a75df89 commit 3cdd20d
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 24 deletions.
15 changes: 10 additions & 5 deletions pkg/service/healthcheck/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ func (r Runner) Run(ctx context.Context, clusterID, taskID, runID uuid.UUID, pro

// runner bundles what one healthcheck probe needs; Service.Runner builds one
// per probe kind (cql, rest, alternator).
// NOTE(review): this listing is a rendered diff with the +/- markers stripped,
// so the pre-change and post-change forms of a changed field both appear below.
type runner struct {
logger log.Logger
// Pre-change field name; this commit renames it to configCache.
configCacher configcache.ConfigCacher
// Post-change field name. Run calls AvailableHosts on it and checkHosts
// calls Read on it for every host address.
configCache configcache.ConfigCacher
scyllaClient scyllaclient.ProviderFunc
timeout time.Duration
metrics *runnerMetrics
// Pre-change ping signature, without the node configuration argument.
ping func(ctx context.Context, clusterID uuid.UUID, host string, timeout time.Duration) (rtt time.Duration, err error)
// Post-change ping signature: checkHosts passes the NodeConfig it read from
// configCache as nodeConf, so the probe receives it as an argument.
ping func(ctx context.Context, clusterID uuid.UUID, host string, timeout time.Duration, nodeConf configcache.NodeConfig) (rtt time.Duration, err error)
pingAgent func(ctx context.Context, clusterID uuid.UUID, host string, timeout time.Duration) (rtt time.Duration, err error)
}

Expand All @@ -69,7 +69,7 @@ func (r runner) Run(ctx context.Context, clusterID, _, _ uuid.UUID, _ json.RawMe
// Enable interactive mode for fast backoff
ctx = scyllaclient.Interactive(ctx)

nodes, err := r.configCacher.AvailableHosts(ctx, clusterID)
nodes, err := r.configCache.AvailableHosts(ctx, clusterID)
if err != nil {
return err
}
Expand All @@ -81,12 +81,17 @@ func (r runner) Run(ctx context.Context, clusterID, _, _ uuid.UUID, _ json.RawMe

func (r runner) checkHosts(ctx context.Context, clusterID uuid.UUID, addresses []string) {
f := func(i int) error {
rtt := time.Duration(0)
ni, err := r.configCache.Read(clusterID, addresses[i])
if err == nil {
rtt, err = r.ping(ctx, clusterID, addresses[i], r.timeout, ni)
}
hl := prometheus.Labels{
clusterKey: clusterID.String(),
hostKey: addresses[i],
rackKey: ni.Rack,
dcKey: ni.Datacenter,
}

rtt, err := r.ping(ctx, clusterID, addresses[i], r.timeout)
if err != nil {
r.metrics.status.With(hl).Set(-1)
} else {
Expand Down
45 changes: 26 additions & 19 deletions pkg/service/healthcheck/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (s *Service) Runner() Runner {
return Runner{
cql: runner{
logger: s.logger.Named("CQL healthcheck"),
configCacher: s.configCache,
configCache: s.configCache,
scyllaClient: s.scyllaClient,
timeout: s.config.MaxTimeout,
metrics: &runnerMetrics{
Expand All @@ -69,7 +69,7 @@ func (s *Service) Runner() Runner {
},
rest: runner{
logger: s.logger.Named("REST healthcheck"),
configCacher: s.configCache,
configCache: s.configCache,
scyllaClient: s.scyllaClient,
timeout: s.config.MaxTimeout,
metrics: &runnerMetrics{
Expand All @@ -81,7 +81,7 @@ func (s *Service) Runner() Runner {
},
alternator: runner{
logger: s.logger.Named("Alternator healthcheck"),
configCacher: s.configCache,
configCache: s.configCache,
scyllaClient: s.scyllaClient,
timeout: s.config.MaxTimeout,
metrics: &runnerMetrics{
Expand Down Expand Up @@ -153,7 +153,12 @@ func (s *Service) parallelRESTPingFunc(ctx context.Context, clusterID uuid.UUID,
return nil
}

rtt, err := s.pingREST(ctx, clusterID, status[i].Addr, s.config.MaxTimeout)
rtt := time.Duration(0)
ni, err := s.configCache.Read(clusterID, status[i].Addr)
if err == nil {
rtt, err = s.pingREST(ctx, clusterID, status[i].Addr, s.config.MaxTimeout, ni)
}

o.RESTRtt = float64(rtt.Milliseconds())
if err != nil {
s.logger.Error(ctx, "REST ping failed",
Expand Down Expand Up @@ -194,7 +199,12 @@ func (s *Service) parallelCQLPingFunc(ctx context.Context, clusterID uuid.UUID,
return nil
}

rtt, err := s.pingCQL(ctx, clusterID, status[i].Addr, s.config.MaxTimeout)
rtt := time.Duration(0)
ni, err := s.configCache.Read(clusterID, status[i].Addr)
if err == nil {
rtt, err = s.pingCQL(ctx, clusterID, status[i].Addr, s.config.MaxTimeout, ni)
}

o.CQLRtt = float64(rtt.Milliseconds())
if err != nil {
s.logger.Error(ctx, "CQL ping failed",
Expand All @@ -220,9 +230,7 @@ func (s *Service) parallelCQLPingFunc(ctx context.Context, clusterID uuid.UUID,
o.CQLStatus = statusUp
}

ni, err := s.configCache.Read(clusterID, status[i].Addr)
if err != nil {
s.logger.Error(ctx, "Unable to fetch node information", "error", err)
if ni.NodeInfo == nil {
o.SSL = false
} else {
o.SSL = ni.CQLTLSConfig() != nil
Expand All @@ -245,7 +253,12 @@ func (s *Service) parallelAlternatorPingFunc(ctx context.Context, clusterID uuid
return nil
}

rtt, err := s.pingAlternator(ctx, clusterID, status[i].Addr, s.config.MaxTimeout)
rtt := time.Duration(0)
ni, err := s.configCache.Read(clusterID, status[i].Addr)
if err == nil {
rtt, err = s.pingAlternator(ctx, clusterID, status[i].Addr, s.config.MaxTimeout, ni)
}

if err != nil {
s.logger.Error(ctx, "Alternator ping failed",
"cluster_id", clusterID,
Expand Down Expand Up @@ -279,10 +292,8 @@ func (s *Service) parallelAlternatorPingFunc(ctx context.Context, clusterID uuid

// pingAlternator sends ping probe and returns RTT.
// When Alternator frontend is disabled, it returns 0 and nil error.
func (s *Service) pingAlternator(ctx context.Context, clusterID uuid.UUID, host string, timeout time.Duration) (rtt time.Duration, err error) {
ni, err := s.configCache.Read(clusterID, host)
// Proceed if we managed to get required information.
if err != nil && ni.NodeInfo == nil {
func (s *Service) pingAlternator(ctx context.Context, _ uuid.UUID, host string, timeout time.Duration, ni configcache.NodeConfig) (rtt time.Duration, err error) {
if ni.NodeInfo == nil {
return 0, errors.Wrap(err, "get node info")
}
if !ni.AlternatorEnabled() {
Expand Down Expand Up @@ -316,11 +327,7 @@ func (s *Service) decorateNodeStatus(status *NodeStatus, ni configcache.NodeConf
status.AgentVersion = ni.AgentVersion
}

func (s *Service) pingCQL(ctx context.Context, clusterID uuid.UUID, host string, timeout time.Duration) (rtt time.Duration, err error) {
ni, err := s.configCache.Read(clusterID, host)
if err != nil {
return 0, err
}
func (s *Service) pingCQL(ctx context.Context, clusterID uuid.UUID, host string, timeout time.Duration, ni configcache.NodeConfig) (rtt time.Duration, err error) {
// Try to connect directly to host address.
config := cqlping.Config{
Addr: ni.CQLAddr(host),
Expand All @@ -346,7 +353,7 @@ func (s *Service) pingCQL(ctx context.Context, clusterID uuid.UUID, host string,
return rtt, err
}

func (s *Service) pingREST(ctx context.Context, clusterID uuid.UUID, host string, timeout time.Duration) (time.Duration, error) {
func (s *Service) pingREST(ctx context.Context, clusterID uuid.UUID, host string, timeout time.Duration, _ configcache.NodeConfig) (time.Duration, error) {
client, err := s.scyllaClient(ctx, clusterID)
if err != nil {
return 0, errors.Wrapf(err, "get client for cluster with id %s", clusterID)
Expand Down

0 comments on commit 3cdd20d

Please sign in to comment.