Skip to content

Commit

Permalink
WIP: remove GC from ring buff mapper not needed (#185)
Browse files Browse the repository at this point in the history
Signed-off-by: msherif1234 <[email protected]>
  • Loading branch information
msherif1234 authored Sep 20, 2023
1 parent c8c9e6a commit 7d31bf6
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 17 deletions.
4 changes: 2 additions & 2 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,8 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*fl
}

alog.Debug("connecting flows' processing graph")
mapTracer := node.AsStart(f.mapTracer.TraceLoop(ctx, f.cfg.EnableGC))
rbTracer := node.AsStart(f.rbTracer.TraceLoop(ctx, f.cfg.EnableGC))
mapTracer := node.AsStart(f.mapTracer.TraceLoop(ctx, f.cfg.ForceGC))
rbTracer := node.AsStart(f.rbTracer.TraceLoop(ctx))

accounter := node.AsMiddle(f.accounter.Account,
node.ChannelBufferLen(f.cfg.BuffersLength))
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ type Config struct {
// This feature requires the flows agent to attach at both Ingress and Egress hookpoints.
// If both Ingress and Egress are not enabled then this feature will not be enabled even if set to true via env.
EnableRTT bool `env:"ENABLE_RTT" envDefault:"false"`
// EnableGC enables golang garbage collection run at the end of every map eviction, default is true
EnableGC bool `env:"ENABLE_GARBAGE_COLLECTION" envDefault:"true"`
// ForceGC enables forcing golang garbage collection run at the end of every map eviction, default is true
ForceGC bool `env:"FORCE_GARBAGE_COLLECTION" envDefault:"true"`
// EnablePktDrops enable Packet drops eBPF hook to account for dropped flows
EnablePktDrops bool `env:"ENABLE_PKT_DROPS" envDefault:"false"`
// EnableDNSTracking enable DNS tracking eBPF hook to track dns query/response flows
Expand Down
12 changes: 6 additions & 6 deletions pkg/flow/tracer_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ func (m *MapTracer) Flush() {
m.evictionCond.Broadcast()
}

func (m *MapTracer) TraceLoop(ctx context.Context, enableGC bool) node.StartFunc[[]*Record] {
func (m *MapTracer) TraceLoop(ctx context.Context, forceGC bool) node.StartFunc[[]*Record] {
return func(out chan<- []*Record) {
evictionTicker := time.NewTicker(m.evictionTimeout)
go m.evictionSynchronization(ctx, enableGC, out)
go m.evictionSynchronization(ctx, forceGC, out)
for {
select {
case <-ctx.Done():
Expand All @@ -68,7 +68,7 @@ func (m *MapTracer) TraceLoop(ctx context.Context, enableGC bool) node.StartFunc
// evictionSynchronization loop just waits for the evictionCond to happen
// and triggers the actual eviction. It makes sure that only one eviction
// is being triggered at the same time
func (m *MapTracer) evictionSynchronization(ctx context.Context, enableGC bool, out chan<- []*Record) {
func (m *MapTracer) evictionSynchronization(ctx context.Context, forceGC bool, out chan<- []*Record) {
// flow eviction loop. It just keeps waiting for eviction until someone triggers the
// evictionCond.Broadcast signal
for {
Expand All @@ -81,14 +81,14 @@ func (m *MapTracer) evictionSynchronization(ctx context.Context, enableGC bool,
return
default:
mtlog.Debug("evictionSynchronization signal received")
m.evictFlows(ctx, enableGC, out)
m.evictFlows(ctx, forceGC, out)
}
m.evictionCond.L.Unlock()

}
}

func (m *MapTracer) evictFlows(ctx context.Context, enableGC bool, forwardFlows chan<- []*Record) {
func (m *MapTracer) evictFlows(ctx context.Context, forceGC bool, forwardFlows chan<- []*Record) {
// it's important that this monotonic timer reports same or approximate values as kernel-side bpf_ktime_get_ns()
monotonicTimeNow := monotime.Now()
currentTime := time.Now()
Expand Down Expand Up @@ -121,7 +121,7 @@ func (m *MapTracer) evictFlows(ctx context.Context, enableGC bool, forwardFlows
forwardFlows <- forwardingFlows
}

if enableGC {
if forceGC {
runtime.GC()
}
mtlog.Debugf("%d flows evicted", len(forwardingFlows))
Expand Down
10 changes: 3 additions & 7 deletions pkg/flow/tracer_ringbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"errors"
"fmt"
"runtime"
"sync/atomic"
"syscall"
"time"
Expand Down Expand Up @@ -52,7 +51,7 @@ func NewRingBufTracer(
}
}

func (m *RingBufTracer) TraceLoop(ctx context.Context, enableGC bool) node.StartFunc[*RawRecord] {
func (m *RingBufTracer) TraceLoop(ctx context.Context) node.StartFunc[*RawRecord] {
return func(out chan<- *RawRecord) {
debugging := logrus.IsLevelEnabled(logrus.DebugLevel)
for {
Expand All @@ -61,7 +60,7 @@ func (m *RingBufTracer) TraceLoop(ctx context.Context, enableGC bool) node.Start
rtlog.Debug("exiting trace loop due to context cancellation")
return
default:
if err := m.listenAndForwardRingBuffer(debugging, enableGC, out); err != nil {
if err := m.listenAndForwardRingBuffer(debugging, out); err != nil {
if errors.Is(err, ringbuf.ErrClosed) {
rtlog.Debug("Received signal, exiting..")
return
Expand All @@ -74,7 +73,7 @@ func (m *RingBufTracer) TraceLoop(ctx context.Context, enableGC bool) node.Start
}
}

func (m *RingBufTracer) listenAndForwardRingBuffer(debugging, enableGC bool, forwardCh chan<- *RawRecord) error {
func (m *RingBufTracer) listenAndForwardRingBuffer(debugging bool, forwardCh chan<- *RawRecord) error {
event, err := m.ringBuffer.ReadRingBuf()
if err != nil {
return fmt.Errorf("reading from ring buffer: %w", err)
Expand All @@ -96,9 +95,6 @@ func (m *RingBufTracer) listenAndForwardRingBuffer(debugging, enableGC bool, for

// Will need to send it to accounter anyway to account regardless of complete/ongoing flow
forwardCh <- readFlow
if enableGC {
runtime.GC()
}
return nil
}

Expand Down

0 comments on commit 7d31bf6

Please sign in to comment.