Skip to content

Commit

Permalink
NETOBSERV-1732: add logic to lkup all previous tc filters and remove …
Browse files Browse the repository at this point in the history
…them (#360)

Signed-off-by: Mohamed Mahmoud <[email protected]>
  • Loading branch information
msherif1234 authored and jotak committed Jul 16, 2024
1 parent 55c5a46 commit 54a8cd9
Showing 1 changed file with 88 additions and 2 deletions.
90 changes: 88 additions & 2 deletions pkg/ebpf/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/vishvananda/netlink"
"github.com/vishvananda/netns"
"golang.org/x/sys/unix"
kerrors "k8s.io/apimachinery/pkg/util/errors"
)

// $BPF_CLANG and $BPF_CFLAGS are set by the Makefile.
Expand All @@ -45,6 +46,8 @@ const (
constPcaPort = "pca_port"
constPcaProto = "pca_proto"
pcaRecordsMap = "packet_record"
tcEgressFilterName = "tc/tc_egress_flow_parse"
tcIngressFilterName = "tc/tc_ingress_flow_parse"
)

var log = logrus.WithField("component", "ebpf.FlowFetcher")
Expand Down Expand Up @@ -252,6 +255,84 @@ func (m *FlowFetcher) AttachTCX(iface ifaces.Interface) error {
return nil
}

func removeTCFilters(ifName string, tcDir uint32) error {
link, err := netlink.LinkByName(ifName)
if err != nil {
return err
}

filters, err := netlink.FilterList(link, tcDir)
if err != nil {
return err
}
var errs []error
for _, f := range filters {
if err := netlink.FilterDel(f); err != nil {
errs = append(errs, err)
}
}

return kerrors.NewAggregate(errs)
}

func (m *FlowFetcher) removePreviousFilters(iface ifaces.Interface) error {
ilog := log.WithField("iface", iface)
ilog.Debugf("looking for previously installed TC filters on %s", iface.Name)
links, err := netlink.LinkList()
if err != nil {
return fmt.Errorf("retrieving all netlink devices: %w", err)
}

egressDevs := []netlink.Link{}
ingressDevs := []netlink.Link{}
for _, l := range links {
if l.Attrs().Name != iface.Name {
continue
}
ingressFilters, err := netlink.FilterList(l, netlink.HANDLE_MIN_INGRESS)
if err != nil {
return fmt.Errorf("listing ingress filters: %w", err)
}
for _, filter := range ingressFilters {
if bpfFilter, ok := filter.(*netlink.BpfFilter); ok {
if strings.HasPrefix(bpfFilter.Name, tcIngressFilterName) {
ingressDevs = append(ingressDevs, l)
}
}
}

egressFilters, err := netlink.FilterList(l, netlink.HANDLE_MIN_EGRESS)
if err != nil {
return fmt.Errorf("listing egress filters: %w", err)
}
for _, filter := range egressFilters {
if bpfFilter, ok := filter.(*netlink.BpfFilter); ok {
if strings.HasPrefix(bpfFilter.Name, tcEgressFilterName) {
egressDevs = append(egressDevs, l)
}
}
}
}

for _, dev := range ingressDevs {
ilog.Debugf("removing ingress stale tc filters from %s", dev.Attrs().Name)
err = removeTCFilters(dev.Attrs().Name, netlink.HANDLE_MIN_INGRESS)
if err != nil {
ilog.WithError(err).Errorf("couldn't remove ingress tc filters from %s", dev.Attrs().Name)
}
}

for _, dev := range egressDevs {
ilog.Debugf("removing egress stale tc filters from %s", dev.Attrs().Name)
err = removeTCFilters(dev.Attrs().Name, netlink.HANDLE_MIN_EGRESS)
if err != nil {
ilog.WithError(err).Errorf("couldn't remove egress tc filters from %s", dev.Attrs().Name)
}
}

return nil
}

// Register and links the eBPF fetcher into the system. The program should invoke Unregister
// before exiting.
func (m *FlowFetcher) Register(iface ifaces.Interface) error {
Expand Down Expand Up @@ -288,6 +369,11 @@ func (m *FlowFetcher) Register(iface ifaces.Interface) error {
}
m.qdiscs[iface] = qdisc

// Remove previously installed filters
if err := m.removePreviousFilters(iface); err != nil {
return fmt.Errorf("failed to remove previous filters: %w", err)
}

if err := m.registerEgress(iface, ipvlan, handle); err != nil {
return err
}
Expand All @@ -312,7 +398,7 @@ func (m *FlowFetcher) registerEgress(iface ifaces.Interface, ipvlan netlink.Link
egressFilter := &netlink.BpfFilter{
FilterAttrs: egressAttrs,
Fd: m.objects.TcEgressFlowParse.FD(),
Name: "tc/tc_egress_flow_parse",
Name: tcEgressFilterName,
DirectAction: true,
}
if err := handle.FilterDel(egressFilter); err == nil {
Expand Down Expand Up @@ -346,7 +432,7 @@ func (m *FlowFetcher) registerIngress(iface ifaces.Interface, ipvlan netlink.Lin
ingressFilter := &netlink.BpfFilter{
FilterAttrs: ingressAttrs,
Fd: m.objects.TcIngressFlowParse.FD(),
Name: "tc/tc_ingress_flow_parse",
Name: tcIngressFilterName,
DirectAction: true,
}
if err := handle.FilterDel(ingressFilter); err == nil {
Expand Down

0 comments on commit 54a8cd9

Please sign in to comment.