Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NETOBSERV-1312: remove GC from ring buff mapper not needed #185

Merged
merged 1 commit into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,8 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*fl
}

alog.Debug("connecting flows' processing graph")
mapTracer := node.AsStart(f.mapTracer.TraceLoop(ctx, f.cfg.EnableGC))
rbTracer := node.AsStart(f.rbTracer.TraceLoop(ctx, f.cfg.EnableGC))
mapTracer := node.AsStart(f.mapTracer.TraceLoop(ctx, f.cfg.ForceGC))
rbTracer := node.AsStart(f.rbTracer.TraceLoop(ctx))

accounter := node.AsMiddle(f.accounter.Account,
node.ChannelBufferLen(f.cfg.BuffersLength))
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ type Config struct {
// This feature requires the flows agent to attach at both Ingress and Egress hookpoints.
// If both Ingress and Egress are not enabled then this feature will not be enabled even if set to true via env.
EnableRTT bool `env:"ENABLE_RTT" envDefault:"false"`
// EnableGC enables golang garbage collection run at the end of every map eviction, default is true
EnableGC bool `env:"ENABLE_GARBAGE_COLLECTION" envDefault:"true"`
// ForceGC enables forcing golang garbage collection run at the end of every map eviction, default is true
ForceGC bool `env:"FORCE_GARBAGE_COLLECTION" envDefault:"true"`
// EnablePktDrops enable Packet drops eBPF hook to account for dropped flows
EnablePktDrops bool `env:"ENABLE_PKT_DROPS" envDefault:"false"`
// EnableDNSTracking enable DNS tracking eBPF hook to track dns query/response flows
Expand Down
12 changes: 6 additions & 6 deletions pkg/flow/tracer_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ func (m *MapTracer) Flush() {
m.evictionCond.Broadcast()
}

func (m *MapTracer) TraceLoop(ctx context.Context, enableGC bool) node.StartFunc[[]*Record] {
func (m *MapTracer) TraceLoop(ctx context.Context, forceGC bool) node.StartFunc[[]*Record] {
return func(out chan<- []*Record) {
evictionTicker := time.NewTicker(m.evictionTimeout)
go m.evictionSynchronization(ctx, enableGC, out)
go m.evictionSynchronization(ctx, forceGC, out)
for {
select {
case <-ctx.Done():
Expand All @@ -68,7 +68,7 @@ func (m *MapTracer) TraceLoop(ctx context.Context, enableGC bool) node.StartFunc
// evictionSynchronization loop just waits for the evictionCond to happen
// and triggers the actual eviction. It makes sure that only one eviction
// is being triggered at the same time
func (m *MapTracer) evictionSynchronization(ctx context.Context, enableGC bool, out chan<- []*Record) {
func (m *MapTracer) evictionSynchronization(ctx context.Context, forceGC bool, out chan<- []*Record) {
// flow eviction loop. It just keeps waiting for eviction until someone triggers the
// evictionCond.Broadcast signal
for {
Expand All @@ -81,14 +81,14 @@ func (m *MapTracer) evictionSynchronization(ctx context.Context, enableGC bool,
return
default:
mtlog.Debug("evictionSynchronization signal received")
m.evictFlows(ctx, enableGC, out)
m.evictFlows(ctx, forceGC, out)
}
m.evictionCond.L.Unlock()

}
}

func (m *MapTracer) evictFlows(ctx context.Context, enableGC bool, forwardFlows chan<- []*Record) {
func (m *MapTracer) evictFlows(ctx context.Context, forceGC bool, forwardFlows chan<- []*Record) {
// it's important that this monotonic timer reports same or approximate values as kernel-side bpf_ktime_get_ns()
monotonicTimeNow := monotime.Now()
currentTime := time.Now()
Expand Down Expand Up @@ -121,7 +121,7 @@ func (m *MapTracer) evictFlows(ctx context.Context, enableGC bool, forwardFlows
forwardFlows <- forwardingFlows
}

if enableGC {
if forceGC {
runtime.GC()
}
mtlog.Debugf("%d flows evicted", len(forwardingFlows))
Expand Down
10 changes: 3 additions & 7 deletions pkg/flow/tracer_ringbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"errors"
"fmt"
"runtime"
"sync/atomic"
"syscall"
"time"
Expand Down Expand Up @@ -52,7 +51,7 @@ func NewRingBufTracer(
}
}

func (m *RingBufTracer) TraceLoop(ctx context.Context, enableGC bool) node.StartFunc[*RawRecord] {
func (m *RingBufTracer) TraceLoop(ctx context.Context) node.StartFunc[*RawRecord] {
return func(out chan<- *RawRecord) {
debugging := logrus.IsLevelEnabled(logrus.DebugLevel)
for {
Expand All @@ -61,7 +60,7 @@ func (m *RingBufTracer) TraceLoop(ctx context.Context, enableGC bool) node.Start
rtlog.Debug("exiting trace loop due to context cancellation")
return
default:
if err := m.listenAndForwardRingBuffer(debugging, enableGC, out); err != nil {
if err := m.listenAndForwardRingBuffer(debugging, out); err != nil {
if errors.Is(err, ringbuf.ErrClosed) {
rtlog.Debug("Received signal, exiting..")
return
Expand All @@ -74,7 +73,7 @@ func (m *RingBufTracer) TraceLoop(ctx context.Context, enableGC bool) node.Start
}
}

func (m *RingBufTracer) listenAndForwardRingBuffer(debugging, enableGC bool, forwardCh chan<- *RawRecord) error {
func (m *RingBufTracer) listenAndForwardRingBuffer(debugging bool, forwardCh chan<- *RawRecord) error {
event, err := m.ringBuffer.ReadRingBuf()
if err != nil {
return fmt.Errorf("reading from ring buffer: %w", err)
Expand All @@ -96,9 +95,6 @@ func (m *RingBufTracer) listenAndForwardRingBuffer(debugging, enableGC bool, for

// Will need to send it to accounter anyway to account regardless of complete/ongoing flow
forwardCh <- readFlow
if enableGC {
runtime.GC()
}
return nil
}

Expand Down