Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NETOBSERV-983: Change aggregation flow map to hashmap instead of a perCPU hashmap #118

Merged
merged 1 commit into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions bpf/flows.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ struct {
} direct_flows SEC(".maps");

// Key: the flow identifier. Value: the flow metrics for that identifier.
// The userspace will aggregate them into a single flow.
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_HASH);
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, flow_id);
__type(value, flow_metrics);
__uint(max_entries, 1 << 24);
__uint(map_flags, BPF_F_NO_PREALLOC);
} aggregated_flows SEC(".maps");

// Constant definitions, to be overridden by the invoker
Expand Down Expand Up @@ -260,11 +261,6 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
aggregate_flow->packets += 1;
aggregate_flow->bytes += skb->len;
aggregate_flow->end_mono_time_ts = current_time;
// it might happen that start_mono_time hasn't been set due to
// the way percpu hashmap deal with concurrent map entries
if (aggregate_flow->start_mono_time_ts == 0) {
aggregate_flow->start_mono_time_ts = current_time;
}
aggregate_flow->flags |= flags;
long ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY);
if (trace_messages && ret != 0) {
Expand Down
2 changes: 1 addition & 1 deletion docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ flowchart TD
E(ebpf.FlowFetcher) --> |"pushes via<br/>RingBuffer"| RB(flow.RingBufTracer)
style E fill:#990

E --> |"polls<br/>PerCPUHashMap"| M(flow.MapTracer)
E --> |"polls<br/>HashMap"| M(flow.MapTracer)
RB --> |chan *flow.Record| ACC(flow.Accounter)
RB -.-> |flushes| M
ACC --> |"chan []*flow.Record"| DD(flow.Deduper)
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type ebpfFlowFetcher interface {
io.Closer
Register(iface ifaces.Interface) error

LookupAndDeleteMap() map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics
LookupAndDeleteMap() map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics
ReadRingBuf() (ringbuf.Record, error)
}

Expand Down
74 changes: 15 additions & 59 deletions pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,6 @@ var (
DstPort: 456,
IfIndex: 3,
}
key1Dupe = ebpf.BpfFlowId{
SrcPort: 123,
DstPort: 456,
IfIndex: 4,
}

key2 = ebpf.BpfFlowId{
SrcPort: 333,
Expand All @@ -71,7 +66,7 @@ func TestFlowsAgent_Deduplication(t *testing.T) {
})

exported := export.Get(t, timeout)
assert.Len(t, exported, 2)
assert.Len(t, exported, 1)

receivedKeys := map[ebpf.BpfFlowId]struct{}{}

Expand All @@ -81,21 +76,11 @@ func TestFlowsAgent_Deduplication(t *testing.T) {
receivedKeys[f.Id] = struct{}{}
switch f.Id {
case key1:
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
assert.EqualValues(t, 3, f.Metrics.Packets)
assert.EqualValues(t, 44, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
assert.Equal(t, "foo", f.Interface)
key1Flows = append(key1Flows, f)
case key1Dupe:
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
assert.Equal(t, "bar", f.Interface)
key1Flows = append(key1Flows, f)
case key2:
assert.EqualValues(t, 7, f.Metrics.Packets)
assert.EqualValues(t, 33, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
}
}
assert.Lenf(t, key1Flows, 1, "only one flow should have been forwarded: %#v", key1Flows)
Expand All @@ -112,33 +97,22 @@ func TestFlowsAgent_DeduplicationJustMark(t *testing.T) {
exported := export.Get(t, timeout)
receivedKeys := map[ebpf.BpfFlowId]struct{}{}

assert.Len(t, exported, 3)
assert.Len(t, exported, 1)
duplicates := 0
for _, f := range exported {
require.NotContains(t, receivedKeys, f.Id)
receivedKeys[f.Id] = struct{}{}
switch f.Id {
case key1:
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
assert.EqualValues(t, 3, f.Metrics.Packets)
assert.EqualValues(t, 44, f.Metrics.Bytes)
if f.Duplicate {
duplicates++
}
assert.Equal(t, "foo", f.Interface)
case key1Dupe:
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
if f.Duplicate {
duplicates++
}
assert.Equal(t, "bar", f.Interface)
case key2:
assert.EqualValues(t, 7, f.Metrics.Packets)
assert.EqualValues(t, 33, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
}
}
assert.Equalf(t, 1, duplicates, "exported flows should have only one duplicate: %#v", exported)
assert.Equalf(t, 0, duplicates, "exported flows should have no duplicates: %#v", exported)
}

func TestFlowsAgent_Deduplication_None(t *testing.T) {
Expand All @@ -149,7 +123,7 @@ func TestFlowsAgent_Deduplication_None(t *testing.T) {
})

exported := export.Get(t, timeout)
assert.Len(t, exported, 3)
assert.Len(t, exported, 1)
receivedKeys := map[ebpf.BpfFlowId]struct{}{}

var key1Flows []*flow.Record
Expand All @@ -158,24 +132,14 @@ func TestFlowsAgent_Deduplication_None(t *testing.T) {
receivedKeys[f.Id] = struct{}{}
switch f.Id {
case key1:
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
assert.EqualValues(t, 3, f.Metrics.Packets)
assert.EqualValues(t, 44, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
assert.Equal(t, "foo", f.Interface)
key1Flows = append(key1Flows, f)
case key1Dupe:
assert.EqualValues(t, 4, f.Metrics.Packets)
assert.EqualValues(t, 66, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
assert.Equal(t, "bar", f.Interface)
key1Flows = append(key1Flows, f)
case key2:
assert.EqualValues(t, 7, f.Metrics.Packets)
assert.EqualValues(t, 33, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
}
}
assert.Lenf(t, key1Flows, 2, "both key1 flows should have been forwarded: %#v", key1Flows)
assert.Lenf(t, key1Flows, 1, "only one key1 flow should have been forwarded: %#v", key1Flows)
}

func TestFlowsAgent_Decoration(t *testing.T) {
Expand All @@ -185,7 +149,7 @@ func TestFlowsAgent_Decoration(t *testing.T) {
})

exported := export.Get(t, timeout)
assert.Len(t, exported, 3)
assert.Len(t, exported, 1)

// Tests that the decoration stage has been properly executed. It should
// add the interface name and the agent IP
Expand Down Expand Up @@ -219,18 +183,10 @@ func testAgent(t *testing.T, cfg *Config) *test.ExporterFake {
})

now := uint64(monotime.Now())
key1Metrics := []ebpf.BpfFlowMetrics{
{Packets: 3, Bytes: 44, StartMonoTimeTs: now + 1000, EndMonoTimeTs: now + 1_000_000_000},
{Packets: 1, Bytes: 22, StartMonoTimeTs: now, EndMonoTimeTs: now + 3000},
}
key2Metrics := []ebpf.BpfFlowMetrics{
{Packets: 7, Bytes: 33, StartMonoTimeTs: now, EndMonoTimeTs: now + 2_000_000_000},
}
key1Metrics := ebpf.BpfFlowMetrics{Packets: 3, Bytes: 44, StartMonoTimeTs: now + 1000, EndMonoTimeTs: now + 1_000_000_000}

ebpfTracer.AppendLookupResults(map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics{
key1: key1Metrics,
key1Dupe: key1Metrics,
key2: key2Metrics,
ebpfTracer.AppendLookupResults(map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics{
key1: key1Metrics,
})
return export
}
7 changes: 4 additions & 3 deletions pkg/ebpf/bpf_bpfeb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_bpfeb.o
Binary file not shown.
7 changes: 4 additions & 3 deletions pkg/ebpf/bpf_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_bpfel.o
Binary file not shown.
12 changes: 6 additions & 6 deletions pkg/ebpf/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,25 +308,25 @@ func (m *FlowFetcher) ReadRingBuf() (ringbuf.Record, error) {
// TODO: detect whether BatchLookupAndDelete is supported (Kernel>=5.6) and use it selectively
// Supported Lookup/Delete operations by kernel: https://github.com/iovisor/bcc/blob/master/docs/kernel-versions.md
// Race conditions here causes that some flows are lost in high-load scenarios
func (m *FlowFetcher) LookupAndDeleteMap() map[BpfFlowId][]BpfFlowMetrics {
func (m *FlowFetcher) LookupAndDeleteMap() map[BpfFlowId]BpfFlowMetrics {
flowMap := m.objects.AggregatedFlows

iterator := flowMap.Iterate()
flows := make(map[BpfFlowId][]BpfFlowMetrics, m.cacheMaxSize)
var flow = make(map[BpfFlowId]BpfFlowMetrics, m.cacheMaxSize)

id := BpfFlowId{}
var metrics []BpfFlowMetrics
var metric BpfFlowMetrics
// Changing Iterate+Delete by LookupAndDelete would prevent some possible race conditions
// TODO: detect whether LookupAndDelete is supported (Kernel>=4.20) and use it selectively
for iterator.Next(&id, &metrics) {
for iterator.Next(&id, &metric) {
if err := flowMap.Delete(id); err != nil {
log.WithError(err).WithField("flowId", id).
Warnf("couldn't delete flow entry")
}
// With a plain (non-perCPU) BPF hashmap, each key maps to a single metrics
// value, so the userspace re-join of per-CPU copies is no longer needed.
// TODO: instrument how often the same key reappears within one eviction cycle
flows[id] = append(flows[id], metrics...)
flow[id] = metric
}
return flows
return flow
}
4 changes: 1 addition & 3 deletions pkg/flow/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ func (c *Accounter) Account(in <-chan *RawRecord, out chan<- []*Record) {
alog.Debug("exiting account routine")
return
}
if stored, ok := c.entries[record.Id]; ok {
Accumulate(stored, &record.Metrics)
} else {
if _, ok := c.entries[record.Id]; !ok {
if len(c.entries) >= c.maxEntries {
evictingEntries := c.entries
c.entries = map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics{}
Expand Down
20 changes: 10 additions & 10 deletions pkg/flow/account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ func TestEvict_MaxEntries(t *testing.T) {
RawRecord: RawRecord{
Id: k1,
Metrics: ebpf.BpfFlowMetrics{
Bytes: 444, Packets: 2, StartMonoTimeTs: 123, EndMonoTimeTs: 789, Flags: 1,
Bytes: 123, Packets: 1, StartMonoTimeTs: 123, EndMonoTimeTs: 123, Flags: 1,
},
},
TimeFlowStart: now.Add(-(1000 - 123) * time.Nanosecond),
TimeFlowEnd: now.Add(-(1000 - 789) * time.Nanosecond),
TimeFlowEnd: now.Add(-(1000 - 123) * time.Nanosecond),
},
k2: {
RawRecord: RawRecord{
Expand Down Expand Up @@ -178,31 +178,31 @@ func TestEvict_Period(t *testing.T) {
RawRecord: RawRecord{
Id: k1,
Metrics: ebpf.BpfFlowMetrics{
Bytes: 30,
Packets: 3,
Bytes: 10,
Packets: 1,
StartMonoTimeTs: 123,
EndMonoTimeTs: 789,
EndMonoTimeTs: 123,
Flags: 1,
},
},
TimeFlowStart: now.Add(-1000 + 123),
TimeFlowEnd: now.Add(-1000 + 789),
TimeFlowEnd: now.Add(-1000 + 123),
}, *records[0])
records = receiveTimeout(t, evictor)
require.Len(t, records, 1)
assert.Equal(t, Record{
RawRecord: RawRecord{
Id: k1,
Metrics: ebpf.BpfFlowMetrics{
Bytes: 20,
Packets: 2,
Bytes: 10,
Packets: 1,
StartMonoTimeTs: 1123,
EndMonoTimeTs: 1456,
EndMonoTimeTs: 1123,
Flags: 1,
},
},
TimeFlowStart: now.Add(-1000 + 1123),
TimeFlowEnd: now.Add(-1000 + 1456),
TimeFlowEnd: now.Add(-1000 + 1123),
}, *records[0])

// no more flows are evicted
Expand Down
13 changes: 0 additions & 13 deletions pkg/flow/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,6 @@ func NewRecord(
}
}

func Accumulate(r *ebpf.BpfFlowMetrics, src *ebpf.BpfFlowMetrics) {
msherif1234 marked this conversation as resolved.
Show resolved Hide resolved
// time == 0 if the value has not been yet set
if r.StartMonoTimeTs == 0 || r.StartMonoTimeTs > src.StartMonoTimeTs {
r.StartMonoTimeTs = src.StartMonoTimeTs
}
if r.EndMonoTimeTs == 0 || r.EndMonoTimeTs < src.EndMonoTimeTs {
r.EndMonoTimeTs = src.EndMonoTimeTs
}
r.Bytes += src.Bytes
r.Packets += src.Packets
r.Flags |= src.Flags
}

// IP returns the net.IP equivalent object
func IP(ia IPAddr) net.IP {
return ia[:]
Expand Down
22 changes: 2 additions & 20 deletions pkg/flow/tracer_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type MapTracer struct {
}

type mapFetcher interface {
LookupAndDeleteMap() map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics
LookupAndDeleteMap() map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics
}

func NewMapTracer(fetcher mapFetcher, evictionTimeout time.Duration) *MapTracer {
Expand Down Expand Up @@ -92,7 +92,7 @@ func (m *MapTracer) evictFlows(ctx context.Context, forwardFlows chan<- []*Recor
var forwardingFlows []*Record
laterFlowNs := uint64(0)
for flowKey, flowMetrics := range m.mapFetcher.LookupAndDeleteMap() {
aggregatedMetrics := m.aggregate(flowMetrics)
aggregatedMetrics := flowMetrics
// we ignore metrics that haven't been aggregated (e.g. all the mapped values are ignored)
if aggregatedMetrics.EndMonoTimeTs == 0 {
continue
Expand All @@ -117,21 +117,3 @@ func (m *MapTracer) evictFlows(ctx context.Context, forwardFlows chan<- []*Recor
}
mtlog.Debugf("%d flows evicted", len(forwardingFlows))
}

func (m *MapTracer) aggregate(metrics []ebpf.BpfFlowMetrics) ebpf.BpfFlowMetrics {
if len(metrics) == 0 {
mtlog.Warn("invoked aggregate with no values")
return ebpf.BpfFlowMetrics{}
}
aggr := ebpf.BpfFlowMetrics{}
for _, mt := range metrics {
// eBPF hashmap values are not zeroed when the entry is removed. That causes that we
// might receive entries from previous collect-eviction timeslots.
// We need to check the flow time and discard old flows.
if mt.StartMonoTimeTs <= m.lastEvictionNs || mt.EndMonoTimeTs <= m.lastEvictionNs {
continue
}
Accumulate(&aggr, &mt)
}
return aggr
}
Loading