Skip to content

Commit

Permalink
in buffer batching with perf buffers for NPM (#31402)
Browse files Browse the repository at this point in the history
Co-authored-by: usamasaqib <[email protected]>
  • Loading branch information
brycekahle and usamasaqib authored Jan 6, 2025
1 parent 3a41780 commit 5001b19
Show file tree
Hide file tree
Showing 33 changed files with 1,328 additions and 574 deletions.
2 changes: 2 additions & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -456,10 +456,12 @@
/pkg/util/crio/ @DataDog/container-integrations
/pkg/util/docker/ @DataDog/container-integrations
/pkg/util/ecs/ @DataDog/container-integrations
/pkg/util/encoding/ @DataDog/ebpf-platform
/pkg/util/funcs/ @DataDog/ebpf-platform
/pkg/util/gpu/ @DataDog/container-platform
/pkg/util/kernel/ @DataDog/ebpf-platform
/pkg/util/safeelf/ @DataDog/ebpf-platform
/pkg/util/slices/ @DataDog/ebpf-platform
/pkg/util/ktime @DataDog/agent-security
/pkg/util/kubernetes/ @DataDog/container-integrations @DataDog/container-platform @DataDog/container-app
/pkg/util/podman/ @DataDog/container-integrations
Expand Down
4 changes: 4 additions & 0 deletions cmd/system-probe/config/adjust_npm.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ const (
func adjustNetwork(cfg model.Config) {
ebpflessEnabled := cfg.GetBool(netNS("enable_ebpfless"))

deprecateInt(cfg, spNS("closed_connection_flush_threshold"), netNS("closed_connection_flush_threshold"))
deprecateInt(cfg, spNS("closed_channel_size"), netNS("closed_channel_size"))
applyDefault(cfg, netNS("closed_channel_size"), 500)

limitMaxInt(cfg, spNS("max_conns_per_message"), maxConnsMessageBatchSize)

if cfg.GetBool(spNS("disable_tcp")) {
Expand Down
8 changes: 6 additions & 2 deletions pkg/config/setup/system_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,11 @@ func InitSystemProbeConfig(cfg pkgconfigmodel.Config) {
cfg.BindEnvAndSetDefault(join(spNS, "max_tracked_connections"), 65536)
cfg.BindEnv(join(spNS, "max_closed_connections_buffered"))
cfg.BindEnv(join(netNS, "max_failed_connections_buffered"))
cfg.BindEnvAndSetDefault(join(spNS, "closed_connection_flush_threshold"), 0)
cfg.BindEnvAndSetDefault(join(spNS, "closed_channel_size"), 500)
cfg.BindEnv(join(spNS, "closed_connection_flush_threshold"))
cfg.BindEnv(join(netNS, "closed_connection_flush_threshold"))
cfg.BindEnv(join(spNS, "closed_channel_size"))
cfg.BindEnv(join(netNS, "closed_channel_size"))
cfg.BindEnvAndSetDefault(join(netNS, "closed_buffer_wakeup_count"), 4)
cfg.BindEnvAndSetDefault(join(spNS, "max_connection_state_buffered"), 75000)

cfg.BindEnvAndSetDefault(join(spNS, "disable_dns_inspection"), false, "DD_DISABLE_DNS_INSPECTION")
Expand All @@ -212,6 +215,7 @@ func InitSystemProbeConfig(cfg pkgconfigmodel.Config) {
cfg.BindEnvAndSetDefault(join(spNS, "enable_conntrack_all_namespaces"), true, "DD_SYSTEM_PROBE_ENABLE_CONNTRACK_ALL_NAMESPACES")
cfg.BindEnvAndSetDefault(join(netNS, "enable_protocol_classification"), true, "DD_ENABLE_PROTOCOL_CLASSIFICATION")
cfg.BindEnvAndSetDefault(join(netNS, "enable_ringbuffers"), true, "DD_SYSTEM_PROBE_NETWORK_ENABLE_RINGBUFFERS")
cfg.BindEnvAndSetDefault(join(netNS, "enable_custom_batching"), false, "DD_SYSTEM_PROBE_NETWORK_ENABLE_CUSTOM_BATCHING")
cfg.BindEnvAndSetDefault(join(netNS, "enable_tcp_failed_connections"), true, "DD_SYSTEM_PROBE_NETWORK_ENABLE_FAILED_CONNS")
cfg.BindEnvAndSetDefault(join(netNS, "ignore_conntrack_init_failure"), false, "DD_SYSTEM_PROBE_NETWORK_IGNORE_CONNTRACK_INIT_FAILURE")
cfg.BindEnvAndSetDefault(join(netNS, "conntrack_init_timeout"), 10*time.Second)
Expand Down
13 changes: 13 additions & 0 deletions pkg/ebpf/c/bpf_helpers_custom.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,17 @@ unsigned long long load_half(void *skb,
unsigned long long load_word(void *skb,
unsigned long long off) asm("llvm.bpf.load.word");

// declare our own versions of these enums, because they don't exist on <5.8
enum {
DD_BPF_RB_NO_WAKEUP = 1,
DD_BPF_RB_FORCE_WAKEUP = 2,
};

enum {
DD_BPF_RB_AVAIL_DATA = 0,
DD_BPF_RB_RING_SIZE = 1,
DD_BPF_RB_CONS_POS = 2,
DD_BPF_RB_PROD_POS = 3,
};

#endif
20 changes: 20 additions & 0 deletions pkg/ebpf/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ type ModifierAfterInit interface {
AfterInit(*manager.Manager, names.ModuleName, *manager.Options) error
}

// ModifierPreStart is a sub-interface of Modifier that exposes an PreStart method
type ModifierPreStart interface {
Modifier

// PreStart is called before the ebpf.Manager.Start call
PreStart(*manager.Manager, names.ModuleName) error
}

// ModifierBeforeStop is a sub-interface of Modifier that exposes a BeforeStop method
type ModifierBeforeStop interface {
Modifier
Expand Down Expand Up @@ -168,3 +176,15 @@ func (m *Manager) Stop(cleanupType manager.MapCleanupType) error {

return errors.Join(errs, err)
}

// Start is a wrapper around ebpf-manager.Manager.Start
func (m *Manager) Start() error {
err := runModifiersOfType(m.EnabledModifiers, "PreStart", func(mod ModifierPreStart) error {
return mod.PreStart(m.Manager, m.Name)
})
if err != nil {
return err
}

return m.Manager.Start()
}
Loading

0 comments on commit 5001b19

Please sign in to comment.