Skip to content

Commit

Permalink
probe: conntrack: fix output parsing
Browse files Browse the repository at this point in the history
With net.netfilter.nf_conntrack_acct = 1, conntrack adds the following
fields in the output: packets=3 bytes=164

And with SELinux (e.g. Fedora), conntrack adds: secctx=...

The parsing with fmt.Sscanf introduced in #2095 was unfortunately
rejecting lines with those fields. This patch fixes that by adding more
complicated parsing in decodeFlowKeyValues() with FieldsFunc and SplitN.

Fixes #2117
Regression from #2095
  • Loading branch information
alban committed Jan 17, 2017
1 parent 75ed573 commit f1e2b5d
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 40 deletions.
114 changes: 75 additions & 39 deletions probe/endpoint/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"io/ioutil"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"unicode"

log "github.com/Sirupsen/logrus"

Expand Down Expand Up @@ -250,6 +252,58 @@ func removeInplace(s, sep []byte) []byte {
return s[:len(s)-len(sep)]
}

// decodeFlowKeyValues parses the key-values from a conntrack line and updates the flow
// It only considers the following key-values:
// src=127.0.0.1 dst=127.0.0.1 sport=58958 dport=6784 src=127.0.0.1 dst=127.0.0.1 sport=6784 dport=58958 id=1595499776
// Keys can be present twice, so the order is important.
// Conntrack could add other key-values such as secctx=, packets=, bytes=. Those are ignored.
func decodeFlowKeyValues(line []byte, f *flow) error {
var err error
for _, field := range strings.FieldsFunc(string(line), func(c rune) bool { return unicode.IsSpace(c) }) {
kv := strings.SplitN(field, "=", 2)
if len(kv) != 2 {
continue
}
key := kv[0]
value := kv[1]
firstTupleSet := f.Original.Layer4.DstPort != 0
switch {
case key == "src":
if !firstTupleSet {
f.Original.Layer3.SrcIP = value
} else {
f.Reply.Layer3.SrcIP = value
}

case key == "dst":
if !firstTupleSet {
f.Original.Layer3.DstIP = value
} else {
f.Reply.Layer3.DstIP = value
}

case key == "sport":
if !firstTupleSet {
f.Original.Layer4.SrcPort, err = strconv.Atoi(value)
} else {
f.Reply.Layer4.SrcPort, err = strconv.Atoi(value)
}

case key == "dport":
if !firstTupleSet {
f.Original.Layer4.DstPort, err = strconv.Atoi(value)
} else {
f.Reply.Layer4.DstPort, err = strconv.Atoi(value)
}

case key == "id":
f.Independent.ID, err = strconv.ParseInt(value, 10, 64)
}
}

return err
}

func decodeStreamedFlow(scanner *bufio.Scanner) (flow, error) {
var (
// Use ints for parsing unused fields since their allocations
Expand All @@ -273,42 +327,29 @@ func decodeStreamedFlow(scanner *bufio.Scanner) (flow, error) {
line = bytes.TrimLeft(line, " ")
if bytes.HasPrefix(line, destroyTypeB) {
// Destroy events don't have a timeout or state field
_, err = fmt.Sscanf(string(line), "%s %s %d src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%d",
_, err = fmt.Sscanf(string(line), "%s %s %d",
&f.Type,
&f.Original.Layer4.Proto,
&unused[0],
&f.Original.Layer3.SrcIP,
&f.Original.Layer3.DstIP,
&f.Original.Layer4.SrcPort,
&f.Original.Layer4.DstPort,
&f.Reply.Layer3.SrcIP,
&f.Reply.Layer3.DstIP,
&f.Reply.Layer4.SrcPort,
&f.Reply.Layer4.DstPort,
&f.Independent.ID,
)
} else {
_, err = fmt.Sscanf(string(line), "%s %s %d %d %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d id=%d",
_, err = fmt.Sscanf(string(line), "%s %s %d %d %s",
&f.Type,
&f.Original.Layer4.Proto,
&unused[0],
&unused[1],
&f.Independent.State,
&f.Original.Layer3.SrcIP,
&f.Original.Layer3.DstIP,
&f.Original.Layer4.SrcPort,
&f.Original.Layer4.DstPort,
&f.Reply.Layer3.SrcIP,
&f.Reply.Layer3.DstIP,
&f.Reply.Layer4.SrcPort,
&f.Reply.Layer4.DstPort,
&f.Independent.ID,
)
}
if err != nil {
return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err)
}

err = decodeFlowKeyValues(line, &f)
if err != nil {
return flow{}, fmt.Errorf("Error parsing streamed flow %q: %v ", line, err)
}

f.Reply.Layer4.Proto = f.Original.Layer4.Proto
return f, nil
}
Expand Down Expand Up @@ -353,32 +394,27 @@ func decodeDumpedFlow(scanner *bufio.Scanner) (flow, error) {
f flow
)

// Example:
// " tcp 6 431997 ESTABLISHED src=10.32.0.1 dst=10.32.0.1 sport=50274 dport=4040 src=10.32.0.1 dst=10.32.0.1 sport=4040 dport=50274 [ASSURED] mark=0 use=1 id=407401088c"
// Examples with different formats:
// With SELinux, there is a "secctx="
// After "sudo sysctl net.netfilter.nf_conntrack_acct=1", there is "packets=" and "bytes="
//
// "tcp 6 431997 ESTABLISHED src=10.32.0.1 dst=10.32.0.1 sport=50274 dport=4040 src=10.32.0.1 dst=10.32.0.1 sport=4040 dport=50274 [ASSURED] mark=0 use=1 id=407401088"
// "tcp 6 431998 ESTABLISHED src=10.0.2.2 dst=10.0.2.15 sport=49911 dport=22 src=10.0.2.15 dst=10.0.2.2 sport=22 dport=49911 [ASSURED] mark=0 use=1 id=2993966208"
// "tcp 6 108 ESTABLISHED src=172.17.0.5 dst=172.17.0.2 sport=47010 dport=80 src=172.17.0.2 dst=172.17.0.5 sport=80 dport=47010 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=4001098880"
// "tcp 6 431970 ESTABLISHED src=192.168.35.116 dst=216.58.213.227 sport=49862 dport=443 packets=11 bytes=1337 src=216.58.213.227 dst=192.168.35.116 sport=443 dport=49862 packets=8 bytes=716 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=943643840"

// remove tags since they are optional and make parsing harder
line, err := getUntaggedLine(scanner)
if err != nil {
return flow{}, err
}

_, err = fmt.Sscanf(string(line), "%s %d %d %s src=%s dst=%s sport=%d dport=%d src=%s dst=%s sport=%d dport=%d mark=%d use=%d id=%d",
&f.Original.Layer4.Proto,
&unused[0],
&unused[1],
&f.Independent.State,
&f.Original.Layer3.SrcIP,
&f.Original.Layer3.DstIP,
&f.Original.Layer4.SrcPort,
&f.Original.Layer4.DstPort,
&f.Reply.Layer3.SrcIP,
&f.Reply.Layer3.DstIP,
&f.Reply.Layer4.SrcPort,
&f.Reply.Layer4.DstPort,
&unused[2],
&unused[3],
&f.Independent.ID,
)
_, err = fmt.Sscanf(string(line), "%s %d %d %s", &f.Original.Layer4.Proto, &unused[0], &unused[1], &f.Independent.State)
if err != nil {
return flow{}, fmt.Errorf("Error parsing dumped flow %q: %v ", line, err)
}

err = decodeFlowKeyValues(line, &f)
if err != nil {
return flow{}, fmt.Errorf("Error parsing dumped flow %q: %v ", line, err)
}
Expand Down
62 changes: 61 additions & 1 deletion probe/endpoint/conntrack_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,11 @@ func TestStreamedFlowDecoding(t *testing.T) {
}

// Obtained through conntrack -L -p tcp -o id
const dumpedFlowsSource = `tcp 6 431998 ESTABLISHED src=10.0.2.2 dst=10.0.2.15 sport=49911 dport=22 src=10.0.2.15 dst=10.0.2.2 sport=22 dport=49911 [ASSURED] mark=0 use=1 id=2993966208`
// With SELinux, there is a "secctx="
// After "sudo sysctl net.netfilter.nf_conntrack_acct=1", there is "packets=" and "bytes="
const dumpedFlowsSource = `tcp 6 431998 ESTABLISHED src=10.0.2.2 dst=10.0.2.15 sport=49911 dport=22 src=10.0.2.15 dst=10.0.2.2 sport=22 dport=49911 [ASSURED] mark=0 use=1 id=2993966208
tcp 6 108 ESTABLISHED src=172.17.0.5 dst=172.17.0.2 sport=47010 dport=80 src=172.17.0.2 dst=172.17.0.5 sport=80 dport=47010 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=4001098880
tcp 6 431970 ESTABLISHED src=192.168.35.116 dst=216.58.213.227 sport=49862 dport=443 packets=11 bytes=1337 src=216.58.213.227 dst=192.168.35.116 sport=443 dport=49862 packets=8 bytes=716 [ASSURED] mark=0 secctx=system_u:object_r:unlabeled_t:s0 use=1 id=943643840`

var wantDumpedFlows = []flow{
{
Expand Down Expand Up @@ -186,6 +190,62 @@ var wantDumpedFlows = []flow{
State: "ESTABLISHED",
},
},
{
Original: meta{
Layer3: layer3{
SrcIP: "172.17.0.5",
DstIP: "172.17.0.2",
},
Layer4: layer4{
SrcPort: 47010,
DstPort: 80,
Proto: "tcp",
},
},
Reply: meta{
Layer3: layer3{
SrcIP: "172.17.0.2",
DstIP: "172.17.0.5",
},
Layer4: layer4{
SrcPort: 80,
DstPort: 47010,
Proto: "tcp",
},
},
Independent: meta{
ID: 4001098880,
State: "ESTABLISHED",
},
},
{
Original: meta{
Layer3: layer3{
SrcIP: "192.168.35.116",
DstIP: "216.58.213.227",
},
Layer4: layer4{
SrcPort: 49862,
DstPort: 443,
Proto: "tcp",
},
},
Reply: meta{
Layer3: layer3{
SrcIP: "216.58.213.227",
DstIP: "192.168.35.116",
},
Layer4: layer4{
SrcPort: 443,
DstPort: 49862,
Proto: "tcp",
},
},
Independent: meta{
ID: 943643840,
State: "ESTABLISHED",
},
},
}

func TestDumpedFlowDecoding(t *testing.T) {
Expand Down

0 comments on commit f1e2b5d

Please sign in to comment.