diff --git a/bpf/configs.h b/bpf/configs.h
index fcf55db55..cbec7a1f7 100644
--- a/bpf/configs.h
+++ b/bpf/configs.h
@@ -5,7 +5,6 @@
 // Constant definitions, to be overridden by the invoker
 volatile const u32 sampling = 0;
 volatile const u8 trace_messages = 0;
-volatile const u8 enable_rtt = 0;
 volatile const u16 pca_port = 0;
 volatile const u8 pca_proto = 0;
 volatile const u8 enable_dns_tracking = 0;
diff --git a/bpf/flows.c b/bpf/flows.c
index 8e633a12a..30c7b2c97 100644
--- a/bpf/flows.c
+++ b/bpf/flows.c
@@ -66,12 +66,6 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
     id.if_index = skb->ifindex;
     id.direction = direction;
-    // We calculate the RTT before looking up aggregated_flows map because we want
-    // to keep the critical section between map lookup and update consume minimum time.
-    if (enable_rtt) {
-        // This is currently not to be enabled by default.
-        calculate_flow_rtt(&pkt, direction, data_end);
-    }
     int dns_errno = 0;
     if (enable_dns_tracking) {
         dns_errno = track_dns_packet(skb, &pkt);
     }
@@ -90,10 +84,6 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
         }
         aggregate_flow->flags |= pkt.flags;
         aggregate_flow->dscp = pkt.dscp;
-        // Does not matter the gate. Will be zero if not enabled.
-        if (pkt.rtt > aggregate_flow->flow_rtt) {
-            aggregate_flow->flow_rtt = pkt.rtt;
-        }
         aggregate_flow->dns_record.id = pkt.dns_id;
         aggregate_flow->dns_record.flags = pkt.dns_flags;
         aggregate_flow->dns_record.latency = pkt.dns_latency;
@@ -115,7 +105,6 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
             .start_mono_time_ts = pkt.current_ts,
             .end_mono_time_ts = pkt.current_ts,
             .flags = pkt.flags,
-            .flow_rtt = pkt.rtt,
             .dscp = pkt.dscp,
             .dns_record.id = pkt.dns_id,
             .dns_record.flags = pkt.dns_flags,
diff --git a/bpf/maps_definition.h b/bpf/maps_definition.h
index d49743bb6..69ead7faa 100644
--- a/bpf/maps_definition.h
+++ b/bpf/maps_definition.h
@@ -18,18 +18,6 @@ struct {
     __uint(map_flags, BPF_F_NO_PREALLOC);
 } aggregated_flows SEC(".maps");
 
-// Common hashmap to keep track of all flow sequences.
-// Key is flow_seq_id which is standard 4 tuple and a sequence id
-// sequence id is specific to the type of transport protocol
-// Value is u64 which represents the occurrence timestamp of the packet.
-struct {
-    __uint(type, BPF_MAP_TYPE_HASH);
-    __uint(max_entries, 1 << 20); // Will take around 64MB of space.
-    __type(key, flow_seq_id);
-    __type(value, u64);
-    __uint(map_flags, BPF_F_NO_PREALLOC);
-} flow_sequences SEC(".maps");
-
 //PerfEvent Array for Packet Payloads
 struct {
     __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
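Reviewer note: the removals above drop the whole SYN/ACK-correlation approach, including the 1<<20-entry flow_sequences map, because the rewritten rtt_tracker.h below no longer derives RTT from paired packet timestamps. It instead reads the kernel's own smoothed RTT estimate off the TCP socket. A minimal sketch of that read (helper macro from bpf_core_read.h; the kernel stores srtt_us left-shifted by 3, i.e. in units of 1/8 microsecond, hence the shift and the scaling to nanoseconds):

```c
// Sketch only: sk is the struct sock * handed to the kprobe below.
struct tcp_sock *ts = (struct tcp_sock *)sk;
u64 srtt_ns = ((u64)BPF_CORE_READ(ts, srtt_us) >> 3) * 1000u;
```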
diff --git a/bpf/rtt_tracker.h b/bpf/rtt_tracker.h
index a5cd98f8e..36dd1e9d6 100644
--- a/bpf/rtt_tracker.h
+++ b/bpf/rtt_tracker.h
@@ -1,143 +1,148 @@
 /*
-    A simple RTT tracker implemented to be used at the ebpf layer inside the flow_monitor hookpoint.
-    This tracker currently tracks RTT for TCP flows by looking at the TCP start sequence and estimates
-    RTT by perform (timestamp of receiveing ack packet - timestamp of sending syn packet)
+    A simple RTT tracker implemented using an eBPF kprobe hook.
 */
 #ifndef __RTT_TRACKER_H__
 #define __RTT_TRACKER_H__
 
+#include <bpf_core_read.h>
+#include <bpf_tracing.h>
 #include "utils.h"
 #include "maps_definition.h"
 
-const u64 MIN_RTT = 50000; //50 micro seconds
+static __always_inline void rtt_fill_in_l2(struct sk_buff *skb, flow_id *id) {
+    struct ethhdr eth;
 
-static __always_inline void fill_flow_seq_id(flow_seq_id *seq_id, pkt_info *pkt, u32 seq, bool reverse) {
-    flow_id *id = pkt->id;
-    if (reverse) {
-        __builtin_memcpy(seq_id->src_ip, id->dst_ip, IP_MAX_LEN);
-        __builtin_memcpy(seq_id->dst_ip, id->src_ip, IP_MAX_LEN);
-        seq_id->src_port = id->dst_port;
-        seq_id->dst_port = id->src_port;
-    } else {
-        __builtin_memcpy(seq_id->src_ip, id->src_ip, IP_MAX_LEN);
-        __builtin_memcpy(seq_id->dst_ip, id->dst_ip, IP_MAX_LEN);
-        seq_id->src_port = id->src_port;
-        seq_id->dst_port = id->dst_port;
-    }
-    seq_id->transport_protocol = id->transport_protocol;
-    seq_id->seq_id = seq;
-    seq_id->if_index = id->if_index;
-}
+    __builtin_memset(&eth, 0, sizeof(eth));
 
-static __always_inline void reverse_flow_id_struct(flow_id *src, flow_id *dst) {
-    // Fields which remain same
-    dst->eth_protocol = src->eth_protocol;
-    dst->transport_protocol = src->transport_protocol;
-    dst->if_index = src->if_index;
-
-    // Fields which should be reversed
-    dst->direction = (src->direction == INGRESS) ? EGRESS : INGRESS;
-    __builtin_memcpy(dst->src_mac, src->dst_mac, ETH_ALEN);
-    __builtin_memcpy(dst->dst_mac, src->src_mac, ETH_ALEN);
-    __builtin_memcpy(dst->src_ip, src->dst_ip, IP_MAX_LEN);
-    __builtin_memcpy(dst->dst_ip, src->src_ip, IP_MAX_LEN);
-    dst->src_port = src->dst_port;
-    dst->dst_port = src->src_port;
-    /* ICMP type can be ignore for now. We only deal with TCP packets for now.*/
-}
+    u8 *skb_head = BPF_CORE_READ(skb, head);
+    u16 skb_mac_header = BPF_CORE_READ(skb, mac_header);
 
-static __always_inline void update_reverse_flow_rtt(pkt_info *pkt, u32 seq) {
-    flow_id rev_flow_id;
-    __builtin_memset(&rev_flow_id, 0, sizeof(rev_flow_id));
-    reverse_flow_id_struct(pkt->id, &rev_flow_id);
-
-    flow_metrics *reverse_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &rev_flow_id);
-    if (reverse_flow != NULL) {
-        if (pkt->rtt > reverse_flow->flow_rtt) {
-            reverse_flow->flow_rtt = pkt->rtt;
-            long ret = bpf_map_update_elem(&aggregated_flows, &rev_flow_id, reverse_flow, BPF_EXIST);
-            if (trace_messages && ret != 0) {
-                bpf_printk("error updating rtt value in flow %d\n", ret);
-            }
-        }
-    }
+    bpf_probe_read(&eth, sizeof(eth), (struct ethhdr *)(skb_head + skb_mac_header));
+    __builtin_memcpy(id->dst_mac, eth.h_dest, ETH_ALEN);
+    __builtin_memcpy(id->src_mac, eth.h_source, ETH_ALEN);
+    id->eth_protocol = bpf_ntohs(eth.h_proto);
 }
 
-static __always_inline void __calculate_tcp_rtt(pkt_info *pkt, struct tcphdr *tcp, flow_seq_id *seq_id) {
-    // Stored sequence should be ack_seq - 1
-    u32 seq = bpf_ntohl(tcp->ack_seq) - 1;
-    // check reversed flow
-    fill_flow_seq_id(seq_id, pkt, seq, true);
-
-    u64 *prev_ts = (u64 *)bpf_map_lookup_elem(&flow_sequences, seq_id);
-    if (prev_ts != NULL) {
-        u64 rtt = pkt->current_ts - *prev_ts;
-        /**
-         * FIXME: Because of SAMPLING the way it is done if we miss one of SYN/SYN+ACK/ACK
-         * then we can get RTT values which are the process response time rather than actual RTT.
-         * This check below clears them out but needs to be modified with a better solution or change
-         * the algorithm for calculating RTT so it doesn't interact with SAMPLING like this.
-         */
-        if (rtt < MIN_RTT) {
-            return;
+static __always_inline void rtt_fill_in_l3(struct sk_buff *skb, flow_id *id, u16 family, u8 *dscp) {
+    u16 skb_network_header = BPF_CORE_READ(skb, network_header);
+    u8 *skb_head = BPF_CORE_READ(skb, head);
+
+    switch (family) {
+    case AF_INET: {
+        struct iphdr ip;
+        __builtin_memset(&ip, 0, sizeof(ip));
+        bpf_probe_read(&ip, sizeof(ip), (struct iphdr *)(skb_head + skb_network_header));
+        __builtin_memcpy(id->src_ip, ip4in6, sizeof(ip4in6));
+        __builtin_memcpy(id->dst_ip, ip4in6, sizeof(ip4in6));
+        __builtin_memcpy(id->src_ip + sizeof(ip4in6), &ip.saddr, sizeof(ip.saddr));
+        __builtin_memcpy(id->dst_ip + sizeof(ip4in6), &ip.daddr, sizeof(ip.daddr));
+        *dscp = ipv4_get_dscp(&ip);
+        break;
     }
-        pkt->rtt = rtt;
-        // Delete the flow from flow sequence map so if it
-        // restarts we have a new RTT calculation.
-        long ret = bpf_map_delete_elem(&flow_sequences, seq_id);
-        if (trace_messages && ret != 0) {
-            bpf_printk("error evicting flow sequence: %d", ret);
+    case AF_INET6: {
+        struct ipv6hdr ip;
+        __builtin_memset(&ip, 0, sizeof(ip));
+        bpf_probe_read(&ip, sizeof(ip), (struct ipv6hdr *)(skb_head + skb_network_header));
+        __builtin_memcpy(id->src_ip, ip.saddr.in6_u.u6_addr8, IP_MAX_LEN);
+        __builtin_memcpy(id->dst_ip, ip.daddr.in6_u.u6_addr8, IP_MAX_LEN);
+        *dscp = ipv6_get_dscp(&ip);
+        break;
     }
-        // This is an ACK packet with valid sequence id so a SYN must
-        // have been sent. We can safely update the reverse flow RTT here.
-        update_reverse_flow_rtt(pkt, seq);
+    default:
+        return;
     }
-    return;
 }
 
-static __always_inline void __store_tcp_ts(pkt_info *pkt, struct tcphdr *tcp, flow_seq_id *seq_id) {
-    // store timestamp of syn packets.
-    u32 seq = bpf_ntohl(tcp->seq);
-    fill_flow_seq_id(seq_id, pkt, seq, false);
-    long ret = bpf_map_update_elem(&flow_sequences, seq_id, &pkt->current_ts, BPF_NOEXIST);
-    if (trace_messages && ret != 0) {
-        bpf_printk("err saving flow sequence record %d", ret);
-    }
-    return;
+static __always_inline void rtt_fill_in_tcp(struct sk_buff *skb, flow_id *id, u16 *flags) {
+    u16 skb_transport_header = BPF_CORE_READ(skb, transport_header);
+    u8 *skb_head = BPF_CORE_READ(skb, head);
+    struct tcphdr tcp;
+    u16 sport, dport;
+
+    __builtin_memset(&tcp, 0, sizeof(tcp));
+
+    bpf_probe_read(&tcp, sizeof(tcp), (struct tcphdr *)(skb_head + skb_transport_header));
+    sport = bpf_ntohs(tcp.source);
+    dport = bpf_ntohs(tcp.dest);
+    id->src_port = sport;
+    id->dst_port = dport;
+    set_flags(&tcp, flags);
+    id->transport_protocol = IPPROTO_TCP;
 }
 
-static __always_inline void calculate_flow_rtt_tcp(pkt_info *pkt, u8 direction, void *data_end, flow_seq_id *seq_id) {
-    struct tcphdr *tcp = (struct tcphdr *) pkt->l4_hdr;
-    if ( !tcp || ((void *)tcp + sizeof(*tcp) > data_end) ) {
-        return;
-    }
+static __always_inline int calculate_flow_rtt_tcp(struct sock *sk, struct sk_buff *skb) {
+    struct tcp_sock *ts;
+    u16 family, flags;
+    u64 rtt, len;
+    int ret = 0;
+    flow_id id;
+    u8 dscp;
 
-    /* We calculate RTT for both SYN/SYN+ACK and SYN+ACK/ACK and take the maximum of both.*/
-    if (tcp->syn && tcp->ack) { // SYN ACK Packet
-        __calculate_tcp_rtt(pkt, tcp, seq_id);
-        __store_tcp_ts(pkt, tcp, seq_id);
+    if (skb == NULL) {
+        return 0;
     }
-    else if (tcp->ack) {
-        __calculate_tcp_rtt(pkt, tcp, seq_id);
-    }
-    else if (tcp->syn) {
-        __store_tcp_ts(pkt, tcp, seq_id);
+
+    __builtin_memset(&id, 0, sizeof(id));
+
+    id.if_index = BPF_CORE_READ(skb, skb_iif);
+    len = BPF_CORE_READ(skb, len);
+
+    // read L2 info
+    rtt_fill_in_l2(skb, &id);
+
+    family = BPF_CORE_READ(sk, __sk_common.skc_family);
+
+    // read L3 info
+    rtt_fill_in_l3(skb, &id, family, &dscp);
+
+    // read TCP info
+    rtt_fill_in_tcp(skb, &id, &flags);
+
+    // read TCP socket rtt and store it in nanoseconds
+    ts = (struct tcp_sock *)(sk);
+    rtt = BPF_CORE_READ(ts, srtt_us) >> 3;
+    rtt *= 1000u;
+    u64 current_ts = bpf_ktime_get_ns();
+    flow_metrics *aggregate_flow = bpf_map_lookup_elem(&aggregated_flows, &id);
+    if (aggregate_flow != NULL) {
+        aggregate_flow->end_mono_time_ts = current_ts;
+        aggregate_flow->flow_rtt = rtt;
+        aggregate_flow->dscp = dscp;
+        aggregate_flow->flags |= flags;
+        aggregate_flow->packets++;
+        aggregate_flow->bytes += len;
+        ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY);
+        if (trace_messages && ret != 0) {
+            bpf_printk("error rtt track updating flow %d\n", ret);
+        }
+    } else {
+        flow_metrics new_flow = {
+            .packets = 1,
+            .bytes = len,
+            .start_mono_time_ts = current_ts,
+            .end_mono_time_ts = current_ts,
+            .flags = flags,
+            .flow_rtt = rtt,
+            .dscp = dscp,
+        };
+        ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY);
+        if (trace_messages && ret != 0) {
+            bpf_printk("error creating rtt flow %d\n", ret);
+        }
     }
+    return ret;
 }
 
-static __always_inline void calculate_flow_rtt(pkt_info *pkt, u8 direction, void *data_end) {
-    flow_seq_id seq_id;
-    __builtin_memset(&seq_id, 0, sizeof(flow_seq_id));
+SEC("kprobe/tcp_rcv_established")
+int tcp_rcv_kprobe(struct pt_regs *regs) {
+    struct sock *sk;
+    struct sk_buff *skb;
 
-    switch (pkt->id->transport_protocol)
-    {
-    case IPPROTO_TCP:
-        calculate_flow_rtt_tcp(pkt, direction, data_end, &seq_id);
-        break;
-    default:
-        break;
-    }
+    sk = (struct sock *)PT_REGS_PARM1_CORE(regs);
+    skb = (struct sk_buff *)PT_REGS_PARM2_CORE(regs);
+
+    return calculate_flow_rtt_tcp(sk, skb);
 }
 
 #endif /* __RTT_TRACKER_H__ */
\ No newline at end of file
diff --git a/bpf/types.h b/bpf/types.h
index 7d50bcc59..4b544c439 100644
--- a/bpf/types.h
+++ b/bpf/types.h
@@ -158,7 +158,6 @@ typedef struct pkt_info_t {
     u64 current_ts; // ts recorded when pkt came.
     u16 flags;      // TCP specific
     void *l4_hdr;   // Stores the actual l4 header
-    u64 rtt;        // rtt calculated from the flow if possible. else zero
     u8 dscp;        // IPv4/6 DSCP value
     u16 dns_id;
     u16 dns_flags;
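Reviewer note: the new tcp_rcv_kprobe entry point pulls its arguments out of pt_regs with the CO-RE variants of the PT_REGS_PARM macros. For reference, in recent kernels the probed function (net/ipv4/tcp_input.c) takes exactly the two pointers the probe reads, so PARM1 is the socket and PARM2 is the skb:

```c
/* Kernel-side signature being probed (recent kernels; older ones passed an
   additional tcphdr argument, which this probe does not rely on): */
void tcp_rcv_established(struct sock *sk, struct sk_buff *skb);
```

Since tcp_rcv_established only runs on the established-state receive path, the tracker naturally reports RTT only for flows that completed the handshake.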
diff --git a/pkg/ebpf/bpf_bpfeb.go b/pkg/ebpf/bpf_bpfeb.go
index 7f6ceebd8..4be831d3e 100644
--- a/pkg/ebpf/bpf_bpfeb.go
+++ b/pkg/ebpf/bpf_bpfeb.go
@@ -65,16 +65,6 @@ type BpfFlowRecordT struct {
 	Metrics BpfFlowMetrics
 }
 
-type BpfFlowSeqId struct {
-	SrcPort           uint16
-	DstPort           uint16
-	SrcIp             [16]uint8
-	DstIp             [16]uint8
-	SeqId             uint32
-	TransportProtocol uint8
-	IfIndex           uint32
-}
-
 type BpfPktDropsT struct {
 	Packets uint32
 	Bytes   uint64
@@ -129,6 +119,7 @@ type BpfProgramSpecs struct {
 	IngressFlowParse *ebpf.ProgramSpec `ebpf:"ingress_flow_parse"`
 	IngressPcaParse  *ebpf.ProgramSpec `ebpf:"ingress_pca_parse"`
 	KfreeSkb         *ebpf.ProgramSpec `ebpf:"kfree_skb"`
+	TcpRcvKprobe     *ebpf.ProgramSpec `ebpf:"tcp_rcv_kprobe"`
 }
 
 // BpfMapSpecs contains maps before they are loaded into the kernel.
@@ -138,7 +129,6 @@ type BpfMapSpecs struct {
 	AggregatedFlows *ebpf.MapSpec `ebpf:"aggregated_flows"`
 	DirectFlows     *ebpf.MapSpec `ebpf:"direct_flows"`
 	DnsFlows        *ebpf.MapSpec `ebpf:"dns_flows"`
-	FlowSequences   *ebpf.MapSpec `ebpf:"flow_sequences"`
 	PacketRecord    *ebpf.MapSpec `ebpf:"packet_record"`
 }
 
@@ -164,7 +154,6 @@ type BpfMaps struct {
 	AggregatedFlows *ebpf.Map `ebpf:"aggregated_flows"`
 	DirectFlows     *ebpf.Map `ebpf:"direct_flows"`
 	DnsFlows        *ebpf.Map `ebpf:"dns_flows"`
-	FlowSequences   *ebpf.Map `ebpf:"flow_sequences"`
 	PacketRecord    *ebpf.Map `ebpf:"packet_record"`
 }
 
@@ -173,7 +162,6 @@ func (m *BpfMaps) Close() error {
 		m.AggregatedFlows,
 		m.DirectFlows,
 		m.DnsFlows,
-		m.FlowSequences,
 		m.PacketRecord,
 	)
 }
@@ -187,6 +175,7 @@ type BpfPrograms struct {
 	IngressFlowParse *ebpf.Program `ebpf:"ingress_flow_parse"`
 	IngressPcaParse  *ebpf.Program `ebpf:"ingress_pca_parse"`
 	KfreeSkb         *ebpf.Program `ebpf:"kfree_skb"`
+	TcpRcvKprobe     *ebpf.Program `ebpf:"tcp_rcv_kprobe"`
 }
 
 func (p *BpfPrograms) Close() error {
@@ -196,6 +185,7 @@ func (p *BpfPrograms) Close() error {
 		p.IngressFlowParse,
 		p.IngressPcaParse,
 		p.KfreeSkb,
+		p.TcpRcvKprobe,
 	)
 }
diff --git a/pkg/ebpf/bpf_bpfeb.o b/pkg/ebpf/bpf_bpfeb.o
index 0af59e932..ae955b9d6 100644
Binary files a/pkg/ebpf/bpf_bpfeb.o and b/pkg/ebpf/bpf_bpfeb.o differ
diff --git a/pkg/ebpf/bpf_bpfel.o b/pkg/ebpf/bpf_bpfel.o
deleted file mode 100644
index f938b3e45..000000000
Binary files a/pkg/ebpf/bpf_bpfel.o and /dev/null differ
diff --git a/pkg/ebpf/bpf_bpfel_x86.go b/pkg/ebpf/bpf_bpfel_x86.go
new file mode 100644
index 000000000..207e720f5
--- /dev/null
+++ b/pkg/ebpf/bpf_bpfel_x86.go
@@ -0,0 +1,204 @@
+// Code generated by bpf2go; DO NOT EDIT.
+//go:build 386 || amd64
+
+package ebpf
+
+import (
+	"bytes"
+	_ "embed"
+	"fmt"
+	"io"
+
+	"github.com/cilium/ebpf"
+)
+
+type BpfDnsFlowId struct {
+	SrcPort  uint16
+	DstPort  uint16
+	SrcIp    [16]uint8
+	DstIp    [16]uint8
+	Id       uint16
+	Protocol uint8
+}
+
+type BpfDnsRecordT struct {
+	Id      uint16
+	Flags   uint16
+	Latency uint64
+	Errno   uint8
+}
+
+type BpfFlowId BpfFlowIdT
+
+type BpfFlowIdT struct {
+	EthProtocol       uint16
+	Direction         uint8
+	SrcMac            [6]uint8
+	DstMac            [6]uint8
+	SrcIp             [16]uint8
+	DstIp             [16]uint8
+	SrcPort           uint16
+	DstPort           uint16
+	TransportProtocol uint8
+	IcmpType          uint8
+	IcmpCode          uint8
+	IfIndex           uint32
+}
+
+type BpfFlowMetrics BpfFlowMetricsT
+
+type BpfFlowMetricsT struct {
+	Packets         uint32
+	Bytes           uint64
+	StartMonoTimeTs uint64
+	EndMonoTimeTs   uint64
+	Flags           uint16
+	Errno           uint8
+	Dscp            uint8
+	PktDrops        BpfPktDropsT
+	DnsRecord       BpfDnsRecordT
+	FlowRtt         uint64
+}
+
+type BpfFlowRecordT struct {
+	Id      BpfFlowId
+	Metrics BpfFlowMetrics
+}
+
+type BpfPktDropsT struct {
+	Packets         uint32
+	Bytes           uint64
+	LatestFlags     uint16
+	LatestState     uint8
+	LatestDropCause uint32
+}
+
+// LoadBpf returns the embedded CollectionSpec for Bpf.
+func LoadBpf() (*ebpf.CollectionSpec, error) {
+	reader := bytes.NewReader(_BpfBytes)
+	spec, err := ebpf.LoadCollectionSpecFromReader(reader)
+	if err != nil {
+		return nil, fmt.Errorf("can't load Bpf: %w", err)
+	}
+
+	return spec, err
+}
+
+// LoadBpfObjects loads Bpf and converts it into a struct.
+//
+// The following types are suitable as obj argument:
+//
+//	*BpfObjects
+//	*BpfPrograms
+//	*BpfMaps
+//
+// See ebpf.CollectionSpec.LoadAndAssign documentation for details.
+func LoadBpfObjects(obj interface{}, opts *ebpf.CollectionOptions) error {
+	spec, err := LoadBpf()
+	if err != nil {
+		return err
+	}
+
+	return spec.LoadAndAssign(obj, opts)
+}
+
+// BpfSpecs contains maps and programs before they are loaded into the kernel.
+//
+// It can be passed ebpf.CollectionSpec.Assign.
+type BpfSpecs struct {
+	BpfProgramSpecs
+	BpfMapSpecs
+}
+
+// BpfSpecs contains programs before they are loaded into the kernel.
+//
+// It can be passed ebpf.CollectionSpec.Assign.
+type BpfProgramSpecs struct {
+	EgressFlowParse  *ebpf.ProgramSpec `ebpf:"egress_flow_parse"`
+	EgressPcaParse   *ebpf.ProgramSpec `ebpf:"egress_pca_parse"`
+	IngressFlowParse *ebpf.ProgramSpec `ebpf:"ingress_flow_parse"`
+	IngressPcaParse  *ebpf.ProgramSpec `ebpf:"ingress_pca_parse"`
+	KfreeSkb         *ebpf.ProgramSpec `ebpf:"kfree_skb"`
+	TcpRcvKprobe     *ebpf.ProgramSpec `ebpf:"tcp_rcv_kprobe"`
+}
+
+// BpfMapSpecs contains maps before they are loaded into the kernel.
+//
+// It can be passed ebpf.CollectionSpec.Assign.
+type BpfMapSpecs struct {
+	AggregatedFlows *ebpf.MapSpec `ebpf:"aggregated_flows"`
+	DirectFlows     *ebpf.MapSpec `ebpf:"direct_flows"`
+	DnsFlows        *ebpf.MapSpec `ebpf:"dns_flows"`
+	PacketRecord    *ebpf.MapSpec `ebpf:"packet_record"`
+}
+
+// BpfObjects contains all objects after they have been loaded into the kernel.
+//
+// It can be passed to LoadBpfObjects or ebpf.CollectionSpec.LoadAndAssign.
+type BpfObjects struct {
+	BpfPrograms
+	BpfMaps
+}
+
+func (o *BpfObjects) Close() error {
+	return _BpfClose(
+		&o.BpfPrograms,
+		&o.BpfMaps,
+	)
+}
+
+// BpfMaps contains all maps after they have been loaded into the kernel.
+//
+// It can be passed to LoadBpfObjects or ebpf.CollectionSpec.LoadAndAssign.
+type BpfMaps struct {
+	AggregatedFlows *ebpf.Map `ebpf:"aggregated_flows"`
+	DirectFlows     *ebpf.Map `ebpf:"direct_flows"`
+	DnsFlows        *ebpf.Map `ebpf:"dns_flows"`
+	PacketRecord    *ebpf.Map `ebpf:"packet_record"`
+}
+
+func (m *BpfMaps) Close() error {
+	return _BpfClose(
+		m.AggregatedFlows,
+		m.DirectFlows,
+		m.DnsFlows,
+		m.PacketRecord,
+	)
+}
+
+// BpfPrograms contains all programs after they have been loaded into the kernel.
+//
+// It can be passed to LoadBpfObjects or ebpf.CollectionSpec.LoadAndAssign.
+type BpfPrograms struct {
+	EgressFlowParse  *ebpf.Program `ebpf:"egress_flow_parse"`
+	EgressPcaParse   *ebpf.Program `ebpf:"egress_pca_parse"`
+	IngressFlowParse *ebpf.Program `ebpf:"ingress_flow_parse"`
+	IngressPcaParse  *ebpf.Program `ebpf:"ingress_pca_parse"`
+	KfreeSkb         *ebpf.Program `ebpf:"kfree_skb"`
+	TcpRcvKprobe     *ebpf.Program `ebpf:"tcp_rcv_kprobe"`
+}
+
+func (p *BpfPrograms) Close() error {
+	return _BpfClose(
+		p.EgressFlowParse,
+		p.EgressPcaParse,
+		p.IngressFlowParse,
+		p.IngressPcaParse,
+		p.KfreeSkb,
+		p.TcpRcvKprobe,
+	)
+}
+
+func _BpfClose(closers ...io.Closer) error {
+	for _, closer := range closers {
+		if err := closer.Close(); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// Do not access this directly.
+//
+//go:embed bpf_bpfel_x86.o
+var _BpfBytes []byte
diff --git a/pkg/ebpf/bpf_bpfel_x86.o b/pkg/ebpf/bpf_bpfel_x86.o
new file mode 100644
index 000000000..6a2fac565
Binary files /dev/null and b/pkg/ebpf/bpf_bpfel_x86.o differ
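Reviewer note: bpf2go is now invoked with -target $GOARCH (see the go:generate change in tracer.go below), which is why the generic bpf_bpfel.o is replaced by per-architecture artifacts such as bpf_bpfel_x86.{go,o}: the kprobe dereferences struct pt_regs, whose layout is architecture-specific, unlike the TC programs that only touch __sk_buff. A self-contained sketch of how such a program is attached with cilium/ebpf (standalone usage with an assumed object path; the real code goes through the generated loader and FlowFetcher):

```go
package main

import (
	"log"

	"github.com/cilium/ebpf"
	"github.com/cilium/ebpf/link"
)

func main() {
	// Load the compiled collection; "bpf_bpfel_x86.o" is an assumed path here.
	coll, err := ebpf.LoadCollection("bpf_bpfel_x86.o")
	if err != nil {
		log.Fatal(err)
	}
	defer coll.Close()

	// Attach the SEC("kprobe/tcp_rcv_established") program by kernel symbol.
	kp, err := link.Kprobe("tcp_rcv_established", coll.Programs["tcp_rcv_kprobe"], nil)
	if err != nil {
		log.Fatal(err)
	}
	defer kp.Close() // detaching mirrors FlowFetcher.Close below
}
```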
diff --git a/pkg/ebpf/tracer.go b/pkg/ebpf/tracer.go
index cf213ce6e..2d04c0671 100644
--- a/pkg/ebpf/tracer.go
+++ b/pkg/ebpf/tracer.go
@@ -26,18 +26,16 @@ import (
 )
 
 // $BPF_CLANG and $BPF_CFLAGS are set by the Makefile.
-//go:generate bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS -type flow_metrics_t -type flow_id_t -type flow_record_t -type pkt_drops_t -type dns_record_t Bpf ../../bpf/flows.c -- -I../../bpf/headers
+//go:generate bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS -target $GOARCH -type flow_metrics_t -type flow_id_t -type flow_record_t -type pkt_drops_t -type dns_record_t Bpf ../../bpf/flows.c -- -I../../bpf/headers
 
 const (
 	qdiscType = "clsact"
 	// ebpf map names as defined in bpf/maps_definition.h
 	aggregatedFlowsMap = "aggregated_flows"
-	flowSequencesMap   = "flow_sequences"
 	dnsLatencyMap      = "dns_flows"
 	// constants defined in flows.c as "volatile const"
 	constSampling          = "sampling"
 	constTraceMessages     = "trace_messages"
-	constEnableRtt         = "enable_rtt"
 	constEnableDNSTracking = "enable_dns_tracking"
 	pktDropHook            = "kfree_skb"
 	constPcaPort           = "pca_port"
@@ -62,6 +60,7 @@ type FlowFetcher struct {
 	enableIngress      bool
 	enableEgress       bool
 	pktDropsTracePoint link.Link
+	rttKprobeLink      link.Link
 }
 
 type FlowFetcherConfig struct {
@@ -88,30 +87,12 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
 
 	// Resize maps according to user-provided configuration
 	spec.Maps[aggregatedFlowsMap].MaxEntries = uint32(cfg.CacheMaxSize)
-	spec.Maps[flowSequencesMap].MaxEntries = uint32(cfg.CacheMaxSize)
 
 	traceMsgs := 0
 	if cfg.Debug {
 		traceMsgs = 1
 	}
 
-	enableRtt := 0
-	if cfg.EnableRTT {
-		if !(cfg.EnableEgress && cfg.EnableIngress) {
-			log.Warnf("ENABLE_RTT is set to true. But both Ingress AND Egress are not enabled. Disabling ENABLE_RTT")
-			enableRtt = 0
-		} else {
-			enableRtt = 1
-		}
-	}
-
-	if enableRtt == 0 {
-		// Cannot set the size of map to be 0 so set it to 1.
-		spec.Maps[flowSequencesMap].MaxEntries = uint32(1)
-	} else {
-		log.Debugf("RTT calculations are enabled")
-	}
-
 	enableDNSTracking := 0
 	if cfg.DNSTracker {
 		enableDNSTracking = 1
@@ -124,7 +105,6 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
 	if err := spec.RewriteConstants(map[string]interface{}{
 		constSampling:          uint32(cfg.Sampling),
 		constTraceMessages:     uint8(traceMsgs),
-		constEnableRtt:         uint8(enableRtt),
 		constEnableDNSTracking: uint8(enableDNSTracking),
 	}); err != nil {
 		return nil, fmt.Errorf("rewriting BPF constants definition: %w", err)
@@ -154,6 +134,14 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
 		}
 	}
 
+	var rttKprobeLink link.Link
+	if cfg.EnableRTT {
+		rttKprobeLink, err = link.Kprobe("tcp_rcv_established", objects.TcpRcvKprobe, nil)
+		if err != nil {
+			return nil, fmt.Errorf("failed to attach the BPF program to tcpReceiveKprobe: %w", err)
+		}
+	}
+
 	// read events from igress+egress ringbuffer
 	flows, err := ringbuf.NewReader(objects.DirectFlows)
 	if err != nil {
@@ -169,6 +157,7 @@ func NewFlowFetcher(cfg *FlowFetcherConfig) (*FlowFetcher, error) {
 		enableIngress:      cfg.EnableIngress,
 		enableEgress:       cfg.EnableEgress,
 		pktDropsTracePoint: pktDropsLink,
+		rttKprobeLink:      rttKprobeLink,
 	}, nil
 }
 
@@ -297,6 +286,11 @@ func (m *FlowFetcher) Close() error {
 			errs = append(errs, err)
 		}
 	}
+	if m.rttKprobeLink != nil {
+		if err := m.rttKprobeLink.Close(); err != nil {
+			errs = append(errs, err)
+		}
+	}
 	// m.ringbufReader.Read is a blocking operation, so we need to close the ring buffer
 	// from another goroutine to avoid the system not being able to exit if there
 	// isn't traffic in a given interface
@@ -321,9 +315,6 @@ func (m *FlowFetcher) Close() error {
 		if err := m.objects.DnsFlows.Close(); err != nil {
 			errs = append(errs, err)
 		}
-		if err := m.objects.FlowSequences.Close(); err != nil {
-			errs = append(errs, err)
-		}
 		if len(errs) == 0 {
 			m.objects = nil
 		}
@@ -425,7 +416,6 @@ func (m *FlowFetcher) LookupAndDeleteMap() map[BpfFlowId][]BpfFlowMetrics {
 
 // DeleteMapsStaleEntries Look for any stale entries in the features maps and delete them
 func (m *FlowFetcher) DeleteMapsStaleEntries(timeOut time.Duration) {
	m.lookupAndDeleteDNSMap(timeOut)
-	m.lookupAndDeleteRTTMap(timeOut)
 }
 
// lookupAndDeleteDNSMap iterate over DNS queries map and delete any stale DNS requests
@@ -449,28 +439,6 @@ func (m *FlowFetcher) lookupAndDeleteDNSMap(timeOut time.Duration) {
 	}
 }
 
-// lookupAndDeleteRTTMap iterate over flows sequence map and delete any
-// stale flows that we never get responses for.
-func (m *FlowFetcher) lookupAndDeleteRTTMap(timeOut time.Duration) {
-	monotonicTimeNow := monotime.Now()
-	rttMap := m.objects.FlowSequences
-	var rttKey BpfFlowSeqId
-	var rttVal uint64
-
-	if rttMap != nil {
-		iterator := rttMap.Iterate()
-		for iterator.Next(&rttKey, &rttVal) {
-			if time.Duration(uint64(monotonicTimeNow)-rttVal) >= timeOut {
-				if err := rttMap.Delete(rttKey); err != nil {
-					log.WithError(err).WithField("rttKey", rttKey).
-						Warnf("couldn't delete RTT record entry")
-				}
-			}
-		}
-	}
-
-}
-
 // kernelSpecificLoadAndAssign based on kernel version it will load only the supported ebPF hooks
 func kernelSpecificLoadAndAssign(oldKernel bool, spec *ebpf.CollectionSpec) (BpfObjects, error) {
 	objects := BpfObjects{}
@@ -482,6 +450,7 @@ func kernelSpecificLoadAndAssign(oldKernel bool, spec *ebpf.CollectionSpec) (Bpf
 	type NewBpfPrograms struct {
 		EgressFlowParse  *ebpf.Program `ebpf:"egress_flow_parse"`
 		IngressFlowParse *ebpf.Program `ebpf:"ingress_flow_parse"`
+		TCPRcvKprobe     *ebpf.Program `ebpf:"tcp_rcv_kprobe"`
 	}
 	type NewBpfObjects struct {
 		NewBpfPrograms
@@ -504,9 +473,9 @@ func kernelSpecificLoadAndAssign(oldKernel bool, spec *ebpf.CollectionSpec) (Bpf
 		// Note for any future maps or programs make sure to copy them manually here
 		objects.DirectFlows = newObjects.DirectFlows
 		objects.AggregatedFlows = newObjects.AggregatedFlows
-		objects.FlowSequences = newObjects.FlowSequences
 		objects.EgressFlowParse = newObjects.EgressFlowParse
 		objects.IngressFlowParse = newObjects.IngressFlowParse
+		objects.TcpRcvKprobe = newObjects.TCPRcvKprobe
 		objects.KfreeSkb = nil
 	} else {
 		if err := spec.LoadAndAssign(&objects, nil); err != nil {
@@ -562,12 +531,9 @@ func NewPacketFetcher(
 	objects.IngressFlowParse = nil
 	objects.DirectFlows = nil
 	objects.AggregatedFlows = nil
-	objects.FlowSequences = nil
 	delete(spec.Programs, aggregatedFlowsMap)
-	delete(spec.Programs, flowSequencesMap)
 	delete(spec.Programs, constSampling)
 	delete(spec.Programs, constTraceMessages)
-	delete(spec.Programs, constEnableRtt)
 	delete(spec.Programs, constEnableDNSTracking)
 
 	pcaPort := 0
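Reviewer note: with this change FlowRtt is populated directly in nanoseconds from the socket's srtt estimate, so userspace consumers need no conversion beyond a cast. A tiny illustrative snippet (standalone; the real field lives on the generated BpfFlowMetrics above):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	var flowRttNs uint64 = 1250000 // example value, as read from BpfFlowMetrics.FlowRtt
	fmt.Println(time.Duration(flowRttNs)) // prints "1.25ms"
}
```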