Skip to content

Commit

Permalink
Restore L2 drops
Browse files Browse the repository at this point in the history
  • Loading branch information
jotak committed Nov 22, 2023
1 parent 2a848d8 commit 35398d6
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 59 deletions.
58 changes: 27 additions & 31 deletions pkg/decode/decode_protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,40 +67,36 @@ func PBFlowToMap(flow *pbflow.Record) config.GenericMap {
}

ethType := ethernet.EtherType(flow.EthProtocol)
if ethType == ethernet.EtherTypeIPv4 || ethType == ethernet.EtherTypeIPv6 {
out["SrcAddr"] = ipToStr(flow.Network.GetSrcAddr())
out["DstAddr"] = ipToStr(flow.Network.GetDstAddr())
out["Proto"] = flow.Transport.GetProtocol()
out["Dscp"] = flow.Network.GetDscp()
proto := flow.Transport.GetProtocol()
if proto == syscall.IPPROTO_ICMP || proto == syscall.IPPROTO_ICMPV6 {
out["IcmpType"] = flow.GetIcmpType()
out["IcmpCode"] = flow.GetIcmpCode()
}

if ethType != ethernet.EtherTypeIPv4 && ethType != ethernet.EtherTypeIPv6 {
// e.g. ARP
return out
}

out["SrcAddr"] = ipToStr(flow.Network.GetSrcAddr())
out["DstAddr"] = ipToStr(flow.Network.GetDstAddr())
out["Proto"] = flow.Transport.GetProtocol()
out["Dscp"] = flow.Network.GetDscp()
proto := flow.Transport.GetProtocol()
if proto == syscall.IPPROTO_ICMP || proto == syscall.IPPROTO_ICMPV6 {
out["IcmpType"] = flow.GetIcmpType()
out["IcmpCode"] = flow.GetIcmpCode()
}

if proto == syscall.IPPROTO_TCP || proto == syscall.IPPROTO_UDP || proto == syscall.IPPROTO_SCTP {
if proto == syscall.IPPROTO_TCP {
out["SrcPort"] = flow.Transport.GetSrcPort()
out["DstPort"] = flow.Transport.GetDstPort()
out["Flags"] = flow.Flags
} else {
out["SrcPort"] = flow.Transport.GetSrcPort()
out["DstPort"] = flow.Transport.GetDstPort()
if proto == syscall.IPPROTO_TCP || proto == syscall.IPPROTO_UDP || proto == syscall.IPPROTO_SCTP {
if proto == syscall.IPPROTO_TCP {
out["SrcPort"] = flow.Transport.GetSrcPort()
out["DstPort"] = flow.Transport.GetDstPort()
out["Flags"] = flow.Flags
} else {
out["SrcPort"] = flow.Transport.GetSrcPort()
out["DstPort"] = flow.Transport.GetDstPort()
}
}
}

out["DnsErrno"] = flow.GetDnsErrno()
if flow.GetDnsId() != 0 {
out["DnsLatencyMs"] = flow.DnsLatency.AsDuration().Milliseconds()
out["DnsId"] = flow.GetDnsId()
out["DnsFlags"] = flow.GetDnsFlags()
out["DnsFlagsResponseCode"] = DNSRcodeToStr(flow.GetDnsFlags() & 0xF)
out["DnsErrno"] = uint32(0)
out["DnsErrno"] = flow.GetDnsErrno()
if flow.GetDnsId() != 0 {
out["DnsLatencyMs"] = flow.DnsLatency.AsDuration().Milliseconds()
out["DnsId"] = flow.GetDnsId()
out["DnsFlags"] = flow.GetDnsFlags()
out["DnsFlagsResponseCode"] = DNSRcodeToStr(flow.GetDnsFlags() & 0xF)
out["DnsErrno"] = uint32(0)
}
}

if flow.GetPktDropLatestDropCause() != 0 {
Expand Down
53 changes: 25 additions & 28 deletions pkg/exporter/convert_flp.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,38 +38,35 @@ func ConvertToFLP(fr *flow.Record) config.GenericMap {
out["Packets"] = fr.Metrics.Packets
}

if fr.Id.EthProtocol != uint16(ethernet.EtherTypeIPv4) && fr.Id.EthProtocol != uint16(ethernet.EtherTypeIPv6) {
// e.g. ARP
return out
}

out["SrcAddr"] = flow.IP(fr.Id.SrcIp).String()
out["DstAddr"] = flow.IP(fr.Id.DstIp).String()
out["Proto"] = fr.Id.TransportProtocol
out["Dscp"] = fr.Metrics.Dscp
if fr.Id.EthProtocol == uint16(ethernet.EtherTypeIPv4) || fr.Id.EthProtocol == uint16(ethernet.EtherTypeIPv6) {
out["SrcAddr"] = flow.IP(fr.Id.SrcIp).String()
out["DstAddr"] = flow.IP(fr.Id.DstIp).String()
out["Proto"] = fr.Id.TransportProtocol
out["Dscp"] = fr.Metrics.Dscp

if fr.Id.TransportProtocol == syscall.IPPROTO_ICMP || fr.Id.TransportProtocol == syscall.IPPROTO_ICMPV6 {
out["IcmpType"] = fr.Id.IcmpType
out["IcmpCode"] = fr.Id.IcmpCode
} else if fr.Id.TransportProtocol == syscall.IPPROTO_TCP || fr.Id.TransportProtocol == syscall.IPPROTO_UDP || fr.Id.TransportProtocol == syscall.IPPROTO_SCTP {
out["SrcPort"] = fr.Id.SrcPort
out["DstPort"] = fr.Id.DstPort
if fr.Id.TransportProtocol == syscall.IPPROTO_TCP {
out["Flags"] = fr.Metrics.Flags
if fr.Id.TransportProtocol == syscall.IPPROTO_ICMP || fr.Id.TransportProtocol == syscall.IPPROTO_ICMPV6 {
out["IcmpType"] = fr.Id.IcmpType
out["IcmpCode"] = fr.Id.IcmpCode
} else if fr.Id.TransportProtocol == syscall.IPPROTO_TCP || fr.Id.TransportProtocol == syscall.IPPROTO_UDP || fr.Id.TransportProtocol == syscall.IPPROTO_SCTP {
out["SrcPort"] = fr.Id.SrcPort
out["DstPort"] = fr.Id.DstPort
if fr.Id.TransportProtocol == syscall.IPPROTO_TCP {
out["Flags"] = fr.Metrics.Flags
}
}
}

out["DnsErrno"] = fr.Metrics.DnsRecord.Errno
dnsID := fr.Metrics.DnsRecord.Id
if dnsID != 0 {
out["DnsId"] = dnsID
out["DnsFlags"] = fr.Metrics.DnsRecord.Flags
out["DnsFlagsResponseCode"] = decode.DNSRcodeToStr(uint32(fr.Metrics.DnsRecord.Flags) & 0xF)
if fr.Metrics.DnsRecord.Latency != 0 {
out["DnsLatencyMs"] = fr.DNSLatency.Milliseconds()
out["DnsErrno"] = fr.Metrics.DnsRecord.Errno
dnsID := fr.Metrics.DnsRecord.Id
if dnsID != 0 {
out["DnsId"] = dnsID
out["DnsFlags"] = fr.Metrics.DnsRecord.Flags
out["DnsFlagsResponseCode"] = decode.DNSRcodeToStr(uint32(fr.Metrics.DnsRecord.Flags) & 0xF)
if fr.Metrics.DnsRecord.Latency != 0 {
out["DnsLatencyMs"] = fr.DNSLatency.Milliseconds()
}
// Not sure about the logic here, why erasing errno?
out["DnsErrno"] = uint32(0)
}
// Not sure about the logic here, why erasing errno?
out["DnsErrno"] = uint32(0)
}

if fr.Metrics.PktDrops.LatestDropCause != 0 {
Expand Down
51 changes: 51 additions & 0 deletions pkg/exporter/converters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,57 @@ func TestConversions(t *testing.T) {
"AgentIP": "10.11.12.13",
},
},
{
name: "L2 drops",
flow: &flow.Record{
RawRecord: flow.RawRecord{
Id: ebpf.BpfFlowId{
EthProtocol: 2054, // ARP protocol
Direction: flow.DirectionEgress,
SrcMac: flow.MacAddr{0x04, 0x05, 0x06, 0x07, 0x08, 0x09},
DstMac: flow.MacAddr{0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f},
SrcIp: flow.IPAddr{},
DstIp: flow.IPAddr{},
SrcPort: 0,
DstPort: 0,
TransportProtocol: 0,
},
Metrics: ebpf.BpfFlowMetrics{
Bytes: 500,
Packets: 128,
PktDrops: ebpf.BpfPktDropsT{
Packets: 10,
Bytes: 100,
LatestFlags: 0x200,
LatestState: 0,
LatestDropCause: 2,
},
},
},
Interface: "eth0",
TimeFlowStart: someTime,
TimeFlowEnd: someTime,
AgentIP: net.IPv4(0x0a, 0x0b, 0x0c, 0x0d),
},
expected: &config.GenericMap{
"FlowDirection": 1,
"Bytes": 500,
"DstMac": "0A:0B:0C:0D:0E:0F",
"SrcMac": "04:05:06:07:08:09",
"Duplicate": false,
"Etype": 2054,
"Packets": 128,
"TimeFlowStartMs": someTime.UnixMilli(),
"TimeFlowEndMs": someTime.UnixMilli(),
"Interface": "eth0",
"AgentIP": "10.11.12.13",
"PktDropBytes": 100,
"PktDropPackets": 10,
"PktDropLatestFlags": 0x200,
"PktDropLatestState": "TCP_INVALID_STATE",
"PktDropLatestDropCause": "SKB_DROP_REASON_NOT_SPECIFIED",
},
},
{
name: "TCP + drop + DNS + RTT record",
flow: &flow.Record{
Expand Down

0 comments on commit 35398d6

Please sign in to comment.