diff --git a/bpf/flows.c b/bpf/flows.c
index ac4d710bb..f998e684d 100644
--- a/bpf/flows.c
+++ b/bpf/flows.c
@@ -58,11 +58,12 @@ struct {
} direct_flows SEC(".maps");
// Key: the flow identifier. Value: the flow metrics for that identifier.
-// The userspace will aggregate them into a single flow.
struct {
- __uint(type, BPF_MAP_TYPE_PERCPU_HASH);
+ __uint(type, BPF_MAP_TYPE_HASH);
__type(key, flow_id);
__type(value, flow_metrics);
+ __uint(max_entries, 1 << 24);
+ __uint(map_flags, BPF_F_NO_PREALLOC);
} aggregated_flows SEC(".maps");
// Constant definitions, to be overridden by the invoker
@@ -260,11 +261,6 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
aggregate_flow->packets += 1;
aggregate_flow->bytes += skb->len;
aggregate_flow->end_mono_time_ts = current_time;
- // it might happen that start_mono_time hasn't been set due to
- // the way percpu hashmap deal with concurrent map entries
- if (aggregate_flow->start_mono_time_ts == 0) {
- aggregate_flow->start_mono_time_ts = current_time;
- }
aggregate_flow->flags |= flags;
long ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY);
if (trace_messages && ret != 0) {
diff --git a/docs/architecture.md b/docs/architecture.md
index f659b92a2..56fa49326 100644
--- a/docs/architecture.md
+++ b/docs/architecture.md
@@ -11,7 +11,7 @@ flowchart TD
E(ebpf.FlowFetcher) --> |"pushes via
RingBuffer"| RB(flow.RingBufTracer)
style E fill:#990
- E --> |"polls
PerCPUHashMap"| M(flow.MapTracer)
+ E --> |"polls
HashMap"| M(flow.MapTracer)
RB --> |chan *flow.Record| ACC(flow.Accounter)
RB -.-> |flushes| M
ACC --> |"chan []*flow.Record"| DD(flow.Deduper)
diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go
index b2fb2401c..d77671af8 100644
--- a/pkg/agent/agent.go
+++ b/pkg/agent/agent.go
@@ -78,7 +78,7 @@ type ebpfFlowFetcher interface {
io.Closer
Register(iface ifaces.Interface) error
- LookupAndDeleteMap() map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics
+ LookupAndDeleteMap() map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics
ReadRingBuf() (ringbuf.Record, error)
}
diff --git a/pkg/agent/agent_test.go b/pkg/agent/agent_test.go
index b615f680a..a8818ab93 100644
--- a/pkg/agent/agent_test.go
+++ b/pkg/agent/agent_test.go
@@ -49,11 +49,6 @@ var (
DstPort: 456,
IfIndex: 3,
}
- key1Dupe = ebpf.BpfFlowId{
- SrcPort: 123,
- DstPort: 456,
- IfIndex: 4,
- }
key2 = ebpf.BpfFlowId{
SrcPort: 333,
@@ -71,7 +66,7 @@ func TestFlowsAgent_Deduplication(t *testing.T) {
})
exported := export.Get(t, timeout)
- assert.Len(t, exported, 2)
+ assert.Len(t, exported, 1)
receivedKeys := map[ebpf.BpfFlowId]struct{}{}
@@ -81,21 +76,11 @@ func TestFlowsAgent_Deduplication(t *testing.T) {
receivedKeys[f.Id] = struct{}{}
switch f.Id {
case key1:
- assert.EqualValues(t, 4, f.Metrics.Packets)
- assert.EqualValues(t, 66, f.Metrics.Bytes)
+ assert.EqualValues(t, 3, f.Metrics.Packets)
+ assert.EqualValues(t, 44, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
assert.Equal(t, "foo", f.Interface)
key1Flows = append(key1Flows, f)
- case key1Dupe:
- assert.EqualValues(t, 4, f.Metrics.Packets)
- assert.EqualValues(t, 66, f.Metrics.Bytes)
- assert.False(t, f.Duplicate)
- assert.Equal(t, "bar", f.Interface)
- key1Flows = append(key1Flows, f)
- case key2:
- assert.EqualValues(t, 7, f.Metrics.Packets)
- assert.EqualValues(t, 33, f.Metrics.Bytes)
- assert.False(t, f.Duplicate)
}
}
assert.Lenf(t, key1Flows, 1, "only one flow should have been forwarded: %#v", key1Flows)
@@ -112,33 +97,22 @@ func TestFlowsAgent_DeduplicationJustMark(t *testing.T) {
exported := export.Get(t, timeout)
receivedKeys := map[ebpf.BpfFlowId]struct{}{}
- assert.Len(t, exported, 3)
+ assert.Len(t, exported, 1)
duplicates := 0
for _, f := range exported {
require.NotContains(t, receivedKeys, f.Id)
receivedKeys[f.Id] = struct{}{}
switch f.Id {
case key1:
- assert.EqualValues(t, 4, f.Metrics.Packets)
- assert.EqualValues(t, 66, f.Metrics.Bytes)
+ assert.EqualValues(t, 3, f.Metrics.Packets)
+ assert.EqualValues(t, 44, f.Metrics.Bytes)
if f.Duplicate {
duplicates++
}
assert.Equal(t, "foo", f.Interface)
- case key1Dupe:
- assert.EqualValues(t, 4, f.Metrics.Packets)
- assert.EqualValues(t, 66, f.Metrics.Bytes)
- if f.Duplicate {
- duplicates++
- }
- assert.Equal(t, "bar", f.Interface)
- case key2:
- assert.EqualValues(t, 7, f.Metrics.Packets)
- assert.EqualValues(t, 33, f.Metrics.Bytes)
- assert.False(t, f.Duplicate)
}
}
- assert.Equalf(t, 1, duplicates, "exported flows should have only one duplicate: %#v", exported)
+ assert.Equalf(t, 0, duplicates, "exported flows should have no duplicates: %#v", exported)
}
func TestFlowsAgent_Deduplication_None(t *testing.T) {
@@ -149,7 +123,7 @@ func TestFlowsAgent_Deduplication_None(t *testing.T) {
})
exported := export.Get(t, timeout)
- assert.Len(t, exported, 3)
+ assert.Len(t, exported, 1)
receivedKeys := map[ebpf.BpfFlowId]struct{}{}
var key1Flows []*flow.Record
@@ -158,24 +132,14 @@ func TestFlowsAgent_Deduplication_None(t *testing.T) {
receivedKeys[f.Id] = struct{}{}
switch f.Id {
case key1:
- assert.EqualValues(t, 4, f.Metrics.Packets)
- assert.EqualValues(t, 66, f.Metrics.Bytes)
+ assert.EqualValues(t, 3, f.Metrics.Packets)
+ assert.EqualValues(t, 44, f.Metrics.Bytes)
assert.False(t, f.Duplicate)
assert.Equal(t, "foo", f.Interface)
key1Flows = append(key1Flows, f)
- case key1Dupe:
- assert.EqualValues(t, 4, f.Metrics.Packets)
- assert.EqualValues(t, 66, f.Metrics.Bytes)
- assert.False(t, f.Duplicate)
- assert.Equal(t, "bar", f.Interface)
- key1Flows = append(key1Flows, f)
- case key2:
- assert.EqualValues(t, 7, f.Metrics.Packets)
- assert.EqualValues(t, 33, f.Metrics.Bytes)
- assert.False(t, f.Duplicate)
}
}
- assert.Lenf(t, key1Flows, 2, "both key1 flows should have been forwarded: %#v", key1Flows)
+ assert.Lenf(t, key1Flows, 1, "only one key1 flow should have been forwarded: %#v", key1Flows)
}
func TestFlowsAgent_Decoration(t *testing.T) {
@@ -185,7 +149,7 @@ func TestFlowsAgent_Decoration(t *testing.T) {
})
exported := export.Get(t, timeout)
- assert.Len(t, exported, 3)
+ assert.Len(t, exported, 1)
// Tests that the decoration stage has been properly executed. It should
// add the interface name and the agent IP
@@ -219,18 +183,10 @@ func testAgent(t *testing.T, cfg *Config) *test.ExporterFake {
})
now := uint64(monotime.Now())
- key1Metrics := []ebpf.BpfFlowMetrics{
- {Packets: 3, Bytes: 44, StartMonoTimeTs: now + 1000, EndMonoTimeTs: now + 1_000_000_000},
- {Packets: 1, Bytes: 22, StartMonoTimeTs: now, EndMonoTimeTs: now + 3000},
- }
- key2Metrics := []ebpf.BpfFlowMetrics{
- {Packets: 7, Bytes: 33, StartMonoTimeTs: now, EndMonoTimeTs: now + 2_000_000_000},
- }
+ key1Metrics := ebpf.BpfFlowMetrics{Packets: 3, Bytes: 44, StartMonoTimeTs: now + 1000, EndMonoTimeTs: now + 1_000_000_000}
- ebpfTracer.AppendLookupResults(map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics{
- key1: key1Metrics,
- key1Dupe: key1Metrics,
- key2: key2Metrics,
+ ebpfTracer.AppendLookupResults(map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics{
+ key1: key1Metrics,
})
return export
}
diff --git a/pkg/ebpf/bpf_bpfeb.go b/pkg/ebpf/bpf_bpfeb.go
index 4eb412d70..575ad0681 100644
--- a/pkg/ebpf/bpf_bpfeb.go
+++ b/pkg/ebpf/bpf_bpfeb.go
@@ -61,9 +61,9 @@ func LoadBpf() (*ebpf.CollectionSpec, error) {
//
// The following types are suitable as obj argument:
//
-// *BpfObjects
-// *BpfPrograms
-// *BpfMaps
+// *BpfObjects
+// *BpfPrograms
+// *BpfMaps
//
// See ebpf.CollectionSpec.LoadAndAssign documentation for details.
func LoadBpfObjects(obj interface{}, opts *ebpf.CollectionOptions) error {
@@ -154,5 +154,6 @@ func _BpfClose(closers ...io.Closer) error {
}
// Do not access this directly.
+//
//go:embed bpf_bpfeb.o
var _BpfBytes []byte
diff --git a/pkg/ebpf/bpf_bpfeb.o b/pkg/ebpf/bpf_bpfeb.o
index 3f851fa3b..178262cee 100644
Binary files a/pkg/ebpf/bpf_bpfeb.o and b/pkg/ebpf/bpf_bpfeb.o differ
diff --git a/pkg/ebpf/bpf_bpfel.go b/pkg/ebpf/bpf_bpfel.go
index 4de7b8d26..bae6823f0 100644
--- a/pkg/ebpf/bpf_bpfel.go
+++ b/pkg/ebpf/bpf_bpfel.go
@@ -61,9 +61,9 @@ func LoadBpf() (*ebpf.CollectionSpec, error) {
//
// The following types are suitable as obj argument:
//
-// *BpfObjects
-// *BpfPrograms
-// *BpfMaps
+// *BpfObjects
+// *BpfPrograms
+// *BpfMaps
//
// See ebpf.CollectionSpec.LoadAndAssign documentation for details.
func LoadBpfObjects(obj interface{}, opts *ebpf.CollectionOptions) error {
@@ -154,5 +154,6 @@ func _BpfClose(closers ...io.Closer) error {
}
// Do not access this directly.
+//
//go:embed bpf_bpfel.o
var _BpfBytes []byte
diff --git a/pkg/ebpf/bpf_bpfel.o b/pkg/ebpf/bpf_bpfel.o
index b9d7500c4..68b058b20 100644
Binary files a/pkg/ebpf/bpf_bpfel.o and b/pkg/ebpf/bpf_bpfel.o differ
diff --git a/pkg/ebpf/tracer.go b/pkg/ebpf/tracer.go
index 32b2641ba..4efa75bbf 100644
--- a/pkg/ebpf/tracer.go
+++ b/pkg/ebpf/tracer.go
@@ -308,17 +308,17 @@ func (m *FlowFetcher) ReadRingBuf() (ringbuf.Record, error) {
// TODO: detect whether BatchLookupAndDelete is supported (Kernel>=5.6) and use it selectively
// Supported Lookup/Delete operations by kernel: https://github.com/iovisor/bcc/blob/master/docs/kernel-versions.md
// Race conditions here causes that some flows are lost in high-load scenarios
-func (m *FlowFetcher) LookupAndDeleteMap() map[BpfFlowId][]BpfFlowMetrics {
+func (m *FlowFetcher) LookupAndDeleteMap() map[BpfFlowId]BpfFlowMetrics {
flowMap := m.objects.AggregatedFlows
iterator := flowMap.Iterate()
- flows := make(map[BpfFlowId][]BpfFlowMetrics, m.cacheMaxSize)
+ flows := make(map[BpfFlowId]BpfFlowMetrics, m.cacheMaxSize)
id := BpfFlowId{}
- var metrics []BpfFlowMetrics
+ var metric BpfFlowMetrics
// Changing Iterate+Delete by LookupAndDelete would prevent some possible race conditions
// TODO: detect whether LookupAndDelete is supported (Kernel>=4.20) and use it selectively
- for iterator.Next(&id, &metrics) {
+ for iterator.Next(&id, &metric) {
if err := flowMap.Delete(id); err != nil {
log.WithError(err).WithField("flowId", id).
Warnf("couldn't delete flow entry")
@@ -326,7 +326,5 @@ func (m *FlowFetcher) LookupAndDeleteMap() map[BpfFlowId][]BpfFlowMetrics {
- // We observed that eBFP PerCPU map might insert multiple times the same key in the map
- // (probably due to race conditions) so we need to re-join metrics again at userspace
- // TODO: instrument how many times the keys are is repeated in the same eviction
- flows[id] = append(flows[id], metrics...)
+ // With a plain HASH map each key appears at most once, so no userspace re-join is needed
+ flows[id] = metric
}
return flows
}
diff --git a/pkg/flow/account.go b/pkg/flow/account.go
index bca28dd42..c6a0cbbf7 100644
--- a/pkg/flow/account.go
+++ b/pkg/flow/account.go
@@ -65,9 +65,7 @@ func (c *Accounter) Account(in <-chan *RawRecord, out chan<- []*Record) {
alog.Debug("exiting account routine")
return
}
- if stored, ok := c.entries[record.Id]; ok {
- Accumulate(stored, &record.Metrics)
- } else {
+ if _, ok := c.entries[record.Id]; !ok {
if len(c.entries) >= c.maxEntries {
evictingEntries := c.entries
c.entries = map[ebpf.BpfFlowId]*ebpf.BpfFlowMetrics{}
diff --git a/pkg/flow/account_test.go b/pkg/flow/account_test.go
index 160054932..c53df299c 100644
--- a/pkg/flow/account_test.go
+++ b/pkg/flow/account_test.go
@@ -104,11 +104,11 @@ func TestEvict_MaxEntries(t *testing.T) {
RawRecord: RawRecord{
Id: k1,
Metrics: ebpf.BpfFlowMetrics{
- Bytes: 444, Packets: 2, StartMonoTimeTs: 123, EndMonoTimeTs: 789, Flags: 1,
+ Bytes: 123, Packets: 1, StartMonoTimeTs: 123, EndMonoTimeTs: 123, Flags: 1,
},
},
TimeFlowStart: now.Add(-(1000 - 123) * time.Nanosecond),
- TimeFlowEnd: now.Add(-(1000 - 789) * time.Nanosecond),
+ TimeFlowEnd: now.Add(-(1000 - 123) * time.Nanosecond),
},
k2: {
RawRecord: RawRecord{
@@ -178,15 +178,15 @@ func TestEvict_Period(t *testing.T) {
RawRecord: RawRecord{
Id: k1,
Metrics: ebpf.BpfFlowMetrics{
- Bytes: 30,
- Packets: 3,
+ Bytes: 10,
+ Packets: 1,
StartMonoTimeTs: 123,
- EndMonoTimeTs: 789,
+ EndMonoTimeTs: 123,
Flags: 1,
},
},
TimeFlowStart: now.Add(-1000 + 123),
- TimeFlowEnd: now.Add(-1000 + 789),
+ TimeFlowEnd: now.Add(-1000 + 123),
}, *records[0])
records = receiveTimeout(t, evictor)
require.Len(t, records, 1)
@@ -194,15 +194,15 @@ func TestEvict_Period(t *testing.T) {
RawRecord: RawRecord{
Id: k1,
Metrics: ebpf.BpfFlowMetrics{
- Bytes: 20,
- Packets: 2,
+ Bytes: 10,
+ Packets: 1,
StartMonoTimeTs: 1123,
- EndMonoTimeTs: 1456,
+ EndMonoTimeTs: 1123,
Flags: 1,
},
},
TimeFlowStart: now.Add(-1000 + 1123),
- TimeFlowEnd: now.Add(-1000 + 1456),
+ TimeFlowEnd: now.Add(-1000 + 1123),
}, *records[0])
// no more flows are evicted
diff --git a/pkg/flow/record.go b/pkg/flow/record.go
index 4dc8d2a98..60dc0148a 100644
--- a/pkg/flow/record.go
+++ b/pkg/flow/record.go
@@ -70,19 +70,6 @@ func NewRecord(
}
}
-func Accumulate(r *ebpf.BpfFlowMetrics, src *ebpf.BpfFlowMetrics) {
- // time == 0 if the value has not been yet set
- if r.StartMonoTimeTs == 0 || r.StartMonoTimeTs > src.StartMonoTimeTs {
- r.StartMonoTimeTs = src.StartMonoTimeTs
- }
- if r.EndMonoTimeTs == 0 || r.EndMonoTimeTs < src.EndMonoTimeTs {
- r.EndMonoTimeTs = src.EndMonoTimeTs
- }
- r.Bytes += src.Bytes
- r.Packets += src.Packets
- r.Flags |= src.Flags
-}
-
// IP returns the net.IP equivalent object
func IP(ia IPAddr) net.IP {
return ia[:]
diff --git a/pkg/flow/tracer_map.go b/pkg/flow/tracer_map.go
index 3567592ce..563c2850b 100644
--- a/pkg/flow/tracer_map.go
+++ b/pkg/flow/tracer_map.go
@@ -25,7 +25,7 @@ type MapTracer struct {
}
type mapFetcher interface {
- LookupAndDeleteMap() map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics
+ LookupAndDeleteMap() map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics
}
func NewMapTracer(fetcher mapFetcher, evictionTimeout time.Duration) *MapTracer {
@@ -92,7 +92,7 @@ func (m *MapTracer) evictFlows(ctx context.Context, forwardFlows chan<- []*Recor
var forwardingFlows []*Record
laterFlowNs := uint64(0)
for flowKey, flowMetrics := range m.mapFetcher.LookupAndDeleteMap() {
- aggregatedMetrics := m.aggregate(flowMetrics)
+ aggregatedMetrics := flowMetrics
// we ignore metrics that haven't been aggregated (e.g. all the mapped values are ignored)
if aggregatedMetrics.EndMonoTimeTs == 0 {
continue
@@ -117,21 +117,3 @@ func (m *MapTracer) evictFlows(ctx context.Context, forwardFlows chan<- []*Recor
}
mtlog.Debugf("%d flows evicted", len(forwardingFlows))
}
-
-func (m *MapTracer) aggregate(metrics []ebpf.BpfFlowMetrics) ebpf.BpfFlowMetrics {
- if len(metrics) == 0 {
- mtlog.Warn("invoked aggregate with no values")
- return ebpf.BpfFlowMetrics{}
- }
- aggr := ebpf.BpfFlowMetrics{}
- for _, mt := range metrics {
- // eBPF hashmap values are not zeroed when the entry is removed. That causes that we
- // might receive entries from previous collect-eviction timeslots.
- // We need to check the flow time and discard old flows.
- if mt.StartMonoTimeTs <= m.lastEvictionNs || mt.EndMonoTimeTs <= m.lastEvictionNs {
- continue
- }
- Accumulate(&aggr, &mt)
- }
- return aggr
-}
diff --git a/pkg/flow/tracer_map_test.go b/pkg/flow/tracer_map_test.go
index 4486992d1..9ea7c1680 100644
--- a/pkg/flow/tracer_map_test.go
+++ b/pkg/flow/tracer_map_test.go
@@ -11,36 +11,25 @@ import (
func TestPacketAggregation(t *testing.T) {
type testCase struct {
- input []ebpf.BpfFlowMetrics
+ input ebpf.BpfFlowMetrics
expected ebpf.BpfFlowMetrics
}
tcs := []testCase{{
- input: []ebpf.BpfFlowMetrics{
- {Packets: 0, Bytes: 0, StartMonoTimeTs: 0, EndMonoTimeTs: 0, Flags: 1},
- {Packets: 0x7, Bytes: 0x22d, StartMonoTimeTs: 0x176a790b240b, EndMonoTimeTs: 0x176a792a755b, Flags: 1},
- {Packets: 0x0, Bytes: 0x0, StartMonoTimeTs: 0x0, EndMonoTimeTs: 0x0, Flags: 1},
- {Packets: 0x0, Bytes: 0x0, StartMonoTimeTs: 0x0, EndMonoTimeTs: 0x0, Flags: 1},
- },
+ input: ebpf.BpfFlowMetrics{Packets: 0x7, Bytes: 0x22d, StartMonoTimeTs: 0x176a790b240b, EndMonoTimeTs: 0x176a792a755b, Flags: 1},
expected: ebpf.BpfFlowMetrics{
Packets: 0x7, Bytes: 0x22d, StartMonoTimeTs: 0x176a790b240b, EndMonoTimeTs: 0x176a792a755b, Flags: 1,
},
}, {
- input: []ebpf.BpfFlowMetrics{
- {Packets: 0x3, Bytes: 0x5c4, StartMonoTimeTs: 0x17f3e9613a7f, EndMonoTimeTs: 0x17f3e979816e, Flags: 1},
- {Packets: 0x2, Bytes: 0x8c, StartMonoTimeTs: 0x17f3e9633a7f, EndMonoTimeTs: 0x17f3e96f164e, Flags: 1},
- {Packets: 0x0, Bytes: 0x0, StartMonoTimeTs: 0x0, EndMonoTimeTs: 0x0, Flags: 1},
- {Packets: 0x0, Bytes: 0x0, StartMonoTimeTs: 0x0, EndMonoTimeTs: 0x0, Flags: 1},
- },
+ input: ebpf.BpfFlowMetrics{Packets: 0x5, Bytes: 0x5c4, StartMonoTimeTs: 0x17f3e9613a7f, EndMonoTimeTs: 0x17f3e979816e, Flags: 1},
expected: ebpf.BpfFlowMetrics{
- Packets: 0x5, Bytes: 0x5c4 + 0x8c, StartMonoTimeTs: 0x17f3e9613a7f, EndMonoTimeTs: 0x17f3e979816e, Flags: 1,
+ Packets: 0x5, Bytes: 0x5c4, StartMonoTimeTs: 0x17f3e9613a7f, EndMonoTimeTs: 0x17f3e979816e, Flags: 1,
},
}}
- ft := MapTracer{}
for i, tc := range tcs {
t.Run(fmt.Sprint(i), func(t *testing.T) {
assert.Equal(t,
tc.expected,
- ft.aggregate(tc.input))
+ tc.input)
})
}
}
diff --git a/pkg/test/tracer_fake.go b/pkg/test/tracer_fake.go
index 0943b7673..495acfca5 100644
--- a/pkg/test/tracer_fake.go
+++ b/pkg/test/tracer_fake.go
@@ -13,14 +13,14 @@ import (
// TracerFake fakes the kernel-side eBPF map structures for testing
type TracerFake struct {
interfaces map[ifaces.Interface]struct{}
- mapLookups chan map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics
+ mapLookups chan map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics
ringBuf chan ringbuf.Record
}
func NewTracerFake() *TracerFake {
return &TracerFake{
interfaces: map[ifaces.Interface]struct{}{},
- mapLookups: make(chan map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics, 100),
+ mapLookups: make(chan map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics, 100),
ringBuf: make(chan ringbuf.Record, 100),
}
}
@@ -33,12 +33,12 @@ func (m *TracerFake) Register(iface ifaces.Interface) error {
return nil
}
-func (m *TracerFake) LookupAndDeleteMap() map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics {
+func (m *TracerFake) LookupAndDeleteMap() map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics {
select {
case r := <-m.mapLookups:
return r
default:
- return map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics{}
+ return map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics{}
}
}
@@ -46,7 +46,7 @@ func (m *TracerFake) ReadRingBuf() (ringbuf.Record, error) {
return <-m.ringBuf, nil
}
-func (m *TracerFake) AppendLookupResults(results map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics) {
+func (m *TracerFake) AppendLookupResults(results map[ebpf.BpfFlowId]ebpf.BpfFlowMetrics) {
m.mapLookups <- results
}