Skip to content

Commit

Permalink
Merge branch 'main' of github.com:DataDog/datadog-agent into pgimalac…
Browse files Browse the repository at this point in the history
…/stop-exclude-checks-exec-time
  • Loading branch information
pgimalac committed Feb 6, 2025
2 parents 61c382e + 9751798 commit 1ec26e6
Show file tree
Hide file tree
Showing 20 changed files with 218 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ func (c *CheckWrapper) ConfigSource() string {
return c.inner.ConfigSource()
}

// Loader returns the name of the check loader
func (c *CheckWrapper) Loader() string {
return c.inner.Loader()
}

// IsTelemetryEnabled implements Check#IsTelemetryEnabled
func (c *CheckWrapper) IsTelemetryEnabled() bool {
return c.inner.IsTelemetryEnabled()
Expand Down
1 change: 1 addition & 0 deletions comp/core/agenttelemetry/impl/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ var defaultProfiles = `
- name: checks.execution_time
aggregate_tags:
- check_name
- check_loader
- name: pymem.inuse
schedule:
start_after: 30
Expand Down
3 changes: 3 additions & 0 deletions pkg/collector/check/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type Check interface {
Cancel()
// String provides a printable version of the check name
String() string
// Loader returns the name of the check loader
// This is used in tags so should match the tag value format constraints (eg. lowercase, no spaces)
Loader() string
// Configure configures the check
Configure(senderManger sender.SenderManager, integrationConfigDigest uint64, config, initConfig integration.Data, source string) error
// Interval returns the interval time for the check
Expand Down
4 changes: 4 additions & 0 deletions pkg/collector/check/check_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
// MockInfo is a mock for test using check.Info interface
type MockInfo struct {
Name string
LoaderName string
CheckID checkid.ID
Source string
InitConf string
Expand All @@ -35,6 +36,9 @@ func (m MockInfo) Version() string { return "" }
// ConfigSource returns the source of the check
func (m MockInfo) ConfigSource() string { return m.Source }

// Loader returns the name of the check loader
func (m MockInfo) Loader() string { return m.LoaderName }

// InitConfig returns the init_config of the check
func (m MockInfo) InitConfig() string { return m.InitConf }

Expand Down
8 changes: 6 additions & 2 deletions pkg/collector/check/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var (
tlmHistogramBuckets = telemetry.NewCounter("checks", "histogram_buckets",
[]string{"check_name"}, "Histogram buckets count")
tlmExecutionTime = telemetry.NewGauge("checks", "execution_time",
[]string{"check_name"}, "Check execution time")
[]string{"check_name", "check_loader"}, "Check execution time")
tlmCheckDelay = telemetry.NewGauge("checks",
"delay",
[]string{"check_name"},
Expand Down Expand Up @@ -100,6 +100,7 @@ type Stats struct {
CheckName string
CheckVersion string
CheckConfigSource string
CheckLoader string
CheckID checkid.ID
Interval time.Duration
// LongRunning is true if the check is a long running check
Expand Down Expand Up @@ -143,13 +144,16 @@ type StatsCheck interface {
Interval() time.Duration
// ConfigSource returns the configuration source of the check
ConfigSource() string
// Loader returns the name of the check loader
Loader() string
}

// NewStats returns a new check stats instance
func NewStats(c StatsCheck) *Stats {
stats := Stats{
CheckID: c.ID(),
CheckName: c.String(),
CheckLoader: c.Loader(),
CheckVersion: c.Version(),
CheckConfigSource: c.ConfigSource(),
Interval: c.Interval(),
Expand Down Expand Up @@ -185,7 +189,7 @@ func (cs *Stats) Add(t time.Duration, err error, warnings []error, metricStats S
cs.ExecutionTimes[cs.TotalRuns%uint64(len(cs.ExecutionTimes))] = tms
cs.TotalRuns++
if cs.Telemetry {
tlmExecutionTime.Set(float64(tms), cs.CheckName)
tlmExecutionTime.Set(float64(tms), cs.CheckName, cs.CheckLoader)
}
var totalExecutionTime int64
ringSize := cs.TotalRuns
Expand Down
35 changes: 20 additions & 15 deletions pkg/collector/check/stats/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,41 @@ import (

// Mock Check implementation used for testing
type mockCheck struct {
cfgSource string
id checkid.ID
stringVal string
version string
interval time.Duration
cfgSource string
loaderName string
id checkid.ID
stringVal string
version string
interval time.Duration
}

// Mock Check interface implementation
func (mc *mockCheck) ConfigSource() string { return mc.cfgSource }
func (mc *mockCheck) Loader() string { return mc.loaderName }
func (mc *mockCheck) ID() checkid.ID { return mc.id }
func (mc *mockCheck) String() string { return mc.stringVal }
func (mc *mockCheck) Version() string { return mc.version }
func (mc *mockCheck) Interval() time.Duration { return mc.interval }

func newMockCheck() StatsCheck {
return &mockCheck{
cfgSource: "checkConfigSrc",
id: "checkID",
stringVal: "checkString",
version: "checkVersion",
interval: 15 * time.Second,
cfgSource: "checkConfigSrc",
id: "checkID",
stringVal: "checkString",
loaderName: "mockLoader",
version: "checkVersion",
interval: 15 * time.Second,
}
}

func newMockCheckWithInterval(interval time.Duration) StatsCheck {
return &mockCheck{
cfgSource: "checkConfigSrc",
id: "checkID",
stringVal: "checkString",
version: "checkVersion",
interval: interval,
cfgSource: "checkConfigSrc",
id: "checkID",
stringVal: "checkString",
loaderName: "mockloader",
version: "checkVersion",
interval: interval,
}
}

Expand All @@ -71,6 +75,7 @@ func TestNewStats(t *testing.T) {

assert.Equal(t, stats.CheckID, checkid.ID("checkID"))
assert.Equal(t, stats.CheckName, "checkString")
assert.Equal(t, stats.CheckLoader, "mockLoader")
assert.Equal(t, stats.CheckVersion, "checkVersion")
assert.Equal(t, stats.CheckVersion, "checkVersion")
assert.Equal(t, stats.CheckConfigSource, "checkConfigSrc")
Expand Down
3 changes: 3 additions & 0 deletions pkg/collector/check/stub/stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ func (c *StubCheck) Version() string { return "" }
// ConfigSource returns the empty string
func (c *StubCheck) ConfigSource() string { return "" }

// Loader returns a stubbed loader name
func (*StubCheck) Loader() string { return "stub" }

// Stop is a noop
func (c *StubCheck) Stop() {}

Expand Down
4 changes: 4 additions & 0 deletions pkg/collector/corechecks/checkbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ func (c *CheckBase) ConfigSource() string {
return c.source
}

func (*CheckBase) Loader() string {
return GoCheckLoaderName
}

// InitConfig returns the init_config configuration for the check.
func (c *CheckBase) InitConfig() string {
return c.initConfig
Expand Down
7 changes: 7 additions & 0 deletions pkg/collector/corechecks/embed/apm/apm.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/collector/check"
checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id"
"github.com/DataDog/datadog-agent/pkg/collector/check/stats"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks/embed/common"
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
"github.com/DataDog/datadog-agent/pkg/config/utils"
Expand Down Expand Up @@ -70,6 +71,12 @@ func (c *APMCheck) ConfigSource() string {
return c.source
}

// Loader returns the check loader
func (*APMCheck) Loader() string {
// the apm check is scheduled by the Go loader
return corechecks.GoCheckLoaderName
}

// Run executes the check with retries
func (c *APMCheck) Run() error {
c.running.Store(true)
Expand Down
7 changes: 7 additions & 0 deletions pkg/collector/corechecks/embed/process/process_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/collector/check"
checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id"
"github.com/DataDog/datadog-agent/pkg/collector/check/stats"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks/embed/common"
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
"github.com/DataDog/datadog-agent/pkg/config/utils"
Expand Down Expand Up @@ -70,6 +71,12 @@ func (c *ProcessAgentCheck) ConfigSource() string {
return c.source
}

// Loader returns the check loader
func (*ProcessAgentCheck) Loader() string {
// the process check is scheduled by the Go loader
return corechecks.GoCheckLoaderName
}

// InitConfig returns the init configuration
func (c *ProcessAgentCheck) InitConfig() string {
return c.initConfig
Expand Down
7 changes: 5 additions & 2 deletions pkg/collector/corechecks/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ type CheckFactory func() check.Check
// Catalog keeps track of Go checks by name
var catalog = make(map[string]CheckFactory)

// GoCheckLoaderName is the name of the Go loader
const GoCheckLoaderName string = "core"

// RegisterCheck adds a check to the catalog
func RegisterCheck(name string, checkFactory option.Option[func() check.Check]) {
if v, ok := checkFactory.Get(); ok {
Expand All @@ -51,8 +54,8 @@ func NewGoCheckLoader() (*GoCheckLoader, error) {
}

// Name return returns Go loader name
func (gl *GoCheckLoader) Name() string {
return "core"
func (*GoCheckLoader) Name() string {
return GoCheckLoaderName
}

// Load returns a Go check
Expand Down
5 changes: 5 additions & 0 deletions pkg/collector/corechecks/longrunning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ func (m *mockLongRunningCheck) GetSender() (sender.Sender, error) {
return s.(sender.Sender), args.Error(1)
}

func (m *mockLongRunningCheck) Loader() string {
args := m.Called()
return args.String(0)
}

func newMockLongRunningCheck() *mockLongRunningCheck {
return &mockLongRunningCheck{
stopCh: make(chan struct{}),
Expand Down
16 changes: 11 additions & 5 deletions pkg/collector/corechecks/servicediscovery/module/impl_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
const (
pathServices = "/services"

heartbeatTime = 15 * time.Minute
// Use a low cache validity to ensure that we refresh information every time
// the check is run if needed. This is the same as cacheValidityNoRT in
// pkg/process/checks/container.go.
Expand Down Expand Up @@ -259,8 +258,15 @@ func (s *discovery) handleDebugEndpoint(w http.ResponseWriter, _ *http.Request)

// handleServers is the handler for the /services endpoint.
// Returns the list of currently running services.
func (s *discovery) handleServices(w http.ResponseWriter, _ *http.Request) {
services, err := s.getServices()
func (s *discovery) handleServices(w http.ResponseWriter, req *http.Request) {
params, err := parseParams(req.URL.Query())
if err != nil {
_ = log.Errorf("invalid params to /discovery%s: %v", pathServices, err)
w.WriteHeader(http.StatusBadRequest)
return
}

services, err := s.getServices(params)
if err != nil {
_ = log.Errorf("failed to handle /discovery%s: %v", pathServices, err)
w.WriteHeader(http.StatusInternalServerError)
Expand Down Expand Up @@ -846,7 +852,7 @@ func (s *discovery) handleStoppedServices(response *model.ServicesResponse, aliv
}

// getStatus returns the list of currently running services.
func (s *discovery) getServices() (*model.ServicesResponse, error) {
func (s *discovery) getServices(params params) (*model.ServicesResponse, error) {
s.mux.Lock()
defer s.mux.Unlock()

Expand Down Expand Up @@ -892,7 +898,7 @@ func (s *discovery) getServices() (*model.ServicesResponse, error) {
s.enrichContainerData(service, containersMap, pidToCid)

if _, ok := s.runningServices[pid]; ok {
if serviceHeartbeatTime := time.Unix(service.LastHeartbeat, 0); now.Sub(serviceHeartbeatTime).Truncate(time.Minute) >= heartbeatTime {
if serviceHeartbeatTime := time.Unix(service.LastHeartbeat, 0); now.Sub(serviceHeartbeatTime).Truncate(time.Minute) >= params.heartbeatTime {
service.LastHeartbeat = now.Unix()
response.HeartbeatServices = append(response.HeartbeatServices, *service)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,17 @@ func setupDiscoveryModule(t *testing.T) (string, *proccontainersmocks.MockContai
return srv.URL, mockContainerProvider, mTimeProvider
}

func getServices(t require.TestingT, url string) *model.ServicesResponse {
func getServicesWithParams(t require.TestingT, url string, params *params) *model.ServicesResponse {
location := url + "/" + string(config.DiscoveryModule) + pathServices
req, err := http.NewRequest(http.MethodGet, location, nil)
require.NoError(t, err)

if params != nil {
qp := req.URL.Query()
params.updateQuery(qp)
req.URL.RawQuery = qp.Encode()
}

resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
defer resp.Body.Close()
Expand All @@ -131,6 +137,10 @@ func getServices(t require.TestingT, url string) *model.ServicesResponse {
return res
}

func getServices(t require.TestingT, url string) *model.ServicesResponse {
return getServicesWithParams(t, url, nil)
}

func startTCPServer(t *testing.T, proto string, address string) (*os.File, *net.TCPAddr) {
listener, err := net.Listen(proto, address)
require.NoError(t, err)
Expand Down Expand Up @@ -1054,7 +1064,7 @@ func TestCache(t *testing.T) {
f.Close()

require.EventuallyWithT(t, func(collect *assert.CollectT) {
_, err = discovery.getServices()
_, err = discovery.getServices(defaultParams())
require.NoError(collect, err)

for _, cmd := range cmds {
Expand All @@ -1074,7 +1084,7 @@ func TestCache(t *testing.T) {
cmd.Wait()
}

_, err = discovery.getServices()
_, err = discovery.getServices(defaultParams())
require.NoError(t, err)

for _, cmd := range cmds {
Expand Down
55 changes: 55 additions & 0 deletions pkg/collector/corechecks/servicediscovery/module/params.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2025-present Datadog, Inc.

package module

import (
"net/url"
"strconv"
"time"
)

const (
heartbeatParam = "heartbeat"
heartbeatTime = 15 * time.Minute
)

type params struct {
heartbeatTime time.Duration
}

func defaultParams() params {
return params{
heartbeatTime: heartbeatTime,
}
}

func (params params) updateQuery(query url.Values) {
query.Set(heartbeatParam, strconv.Itoa(int(params.heartbeatTime.Seconds())))
}

func parseDuration(raw string) (time.Duration, error) {
val, err := strconv.Atoi(raw)
if err != nil {
return 0, err
}

return time.Duration(val) * time.Second, err
}

func parseParams(query url.Values) (params, error) {
params := defaultParams()

raw := query.Get(heartbeatParam)
if raw != "" {
heartbeat, err := parseDuration(raw)
if err != nil {
return params, err
}
params.heartbeatTime = heartbeat
}

return params, nil
}
Loading

0 comments on commit 1ec26e6

Please sign in to comment.