Skip to content

Commit

Permalink
Add option to exclude archival messages from traffic using source por…
Browse files Browse the repository at this point in the history
…ts (#136)

* Add -exclude-srcport flag to tcp-info
* Add ExcludeConfig to netlink.MakeArchivalRecord
  • Loading branch information
stephen-soltesz authored Jun 9, 2023
1 parent 093ae42 commit d6576cc
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 42 deletions.
5 changes: 4 additions & 1 deletion cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@ func fakeMsg(t *testing.T, cookie uint64, dport uint16) netlink.ArchivalRecord {
if err != nil {
t.Fatal(err)
}
mp, err := netlink.MakeArchivalRecord(&nm, true)
mp, err := netlink.MakeArchivalRecord(&nm, &netlink.ExcludeConfig{Local: true})
if err != nil {
t.Fatal(err)
}
idm, err := mp.RawIDM.Parse()
if err != nil {
t.Fatal(err)
}
for i := 0; i < 8; i++ {
idm.ID.IDiagCookie[i] = byte(cookie & 0x0FF)
cookie >>= 8
Expand Down
4 changes: 2 additions & 2 deletions collector/collector_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestRun(t *testing.T) {
if v4 == nil {
continue
}
m, err := netlink.MakeArchivalRecord(v4, false)
m, err := netlink.MakeArchivalRecord(v4, nil)
testFatal(t, err)
idm, err := m.RawIDM.Parse()
testFatal(t, err)
Expand All @@ -128,7 +128,7 @@ func TestRun(t *testing.T) {
if v6 == nil {
continue
}
m, err := netlink.MakeArchivalRecord(v6, false)
m, err := netlink.MakeArchivalRecord(v6, nil)
testFatal(t, err)
idm, err := m.RawIDM.Parse()
testFatal(t, err)
Expand Down
47 changes: 37 additions & 10 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"runtime"
"runtime/trace"
"strconv"

"github.com/m-lab/tcp-info/eventsocket"

Expand Down Expand Up @@ -51,9 +52,21 @@ flat flat% sum% cum cum%
0.01s 0.2% 95.82% 0.07s 1.39% runtime.makeslice
*/

var (
reps int
enableTrace bool
outputDir string
excludeSrcPorts = flagx.StringArray{}
)

func init() {
// Always prepend the filename and line number.
log.SetFlags(log.LstdFlags | log.Lshortfile)

flag.IntVar(&reps, "reps", 0, "How many cycles should be recorded, 0 means continuous")
flag.BoolVar(&enableTrace, "trace", false, "Enable trace")
flag.StringVar(&outputDir, "output", "", "Directory in which to put the resulting tree of data. Default is the current directory.")
flag.Var(&excludeSrcPorts, "exclude-srcport", "Exclude snapshots with these local ports from saved archives.")
}

// NOTES:
Expand All @@ -62,20 +75,17 @@ func init() {
// 3. zstd seems to result in similar file size using proto or raw output.

var (
reps = flag.Int("reps", 0, "How many cycles should be recorded, 0 means continuous")
enableTrace = flag.Bool("trace", false, "Enable trace")
outputDir = flag.String("output", "", "Directory in which to put the resulting tree of data. Default is the current directory.")

ctx, cancel = context.WithCancel(context.Background())
)

func main() {
flag.Parse()
flagx.ArgsFromEnv(flag.CommandLine)
defer cancel()

if *outputDir != "" {
rtx.PanicOnError(os.MkdirAll(*outputDir, 0755), "Could not create the output dir %s", *outputDir)
rtx.Must(os.Chdir(*outputDir), "Could not change to the directory %s", *outputDir)
if outputDir != "" {
rtx.PanicOnError(os.MkdirAll(outputDir, 0755), "Could not create the output dir %s", outputDir)
rtx.Must(os.Chdir(outputDir), "Could not change to the directory %s", outputDir)
}

// Performance instrumentation.
Expand All @@ -86,7 +96,7 @@ func main() {
promSrv := prometheusx.MustServeMetrics()
defer promSrv.Shutdown(ctx)

if *enableTrace {
if enableTrace {
traceFile, err := os.Create("trace")
rtx.Must(err, "Could not create trace file")
rtx.Must(trace.Start(traceFile), "failed to start trace: %v", err)
Expand All @@ -101,16 +111,33 @@ func main() {
rtx.Must(eventSrv.Listen(), "Could not listen on", *eventsocket.Filename)
go eventSrv.Serve(ctx)

ex := &netlink.ExcludeConfig{
Local: true,
}

if len(excludeSrcPorts) != 0 {
srcPorts := map[uint16]bool{}
for _, port := range excludeSrcPorts {
i, err := strconv.ParseInt(port, 10, 16)
if err != nil {
log.Printf("skipping; cannot convert %q to integer", port)
continue
}
srcPorts[uint16(i)] = true
}
ex.SrcPorts = srcPorts
}

// Make the saver and construct the message channel, buffering up to 2 batches
// of messages without stalling producer. We may want to increase the buffer if
// we observe main() stalling.
svrChan := make(chan netlink.MessageBlock, 2)
anon := anonymize.New(anonymize.IPAnonymizationFlag)
svr := saver.NewSaver("host", "pod", 3, eventSrv, anon)
svr := saver.NewSaver("host", "pod", 3, eventSrv, anon, ex)
go svr.MessageSaverLoop(svrChan)

// Run the collector, possibly forever.
totalSeen, totalErr := collector.Run(ctx, *reps, svrChan, svr, true)
totalSeen, totalErr := collector.Run(ctx, reps, svrChan, svr, true)

// Shut down and clean up after the collector terminates.
close(svrChan)
Expand Down
35 changes: 23 additions & 12 deletions netlink/archival-record.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ type ArchivalRecord struct {
Metadata *Metadata `json:",omitempty"`
}

// ExcludeConfig provides options for excluding some measurements from archival messages.
type ExcludeConfig struct {
// Local excludes connections from loopback, local unicast, multicast, or unspecified connections.
Local bool
// SrcPorts excludes connections from specific source ports.
SrcPorts map[uint16]bool
}

// ParseRouteAttr parses a byte array into slice of NetlinkRouteAttr struct.
// Derived from "github.com/vishvananda/netlink/nl/nl_linux.go"
func ParseRouteAttr(b []byte) ([]NetlinkRouteAttr, error) {
Expand All @@ -63,24 +71,27 @@ func ParseRouteAttr(b []byte) ([]NetlinkRouteAttr, error) {
return attrs, nil
}

// MakeArchivalRecord parses the NetlinkMessage into a ArchivalRecord. If skipLocal is true, it will return nil for
// loopback, local unicast, multicast, and unspecified connections.
// Note that Parse does not populate the Timestamp field, so caller should do so.
func MakeArchivalRecord(msg *NetlinkMessage, skipLocal bool) (*ArchivalRecord, error) {
// MakeArchivalRecord parses the NetlinkMessage into a ArchivalRecord. If
// exclude is not nil, MakeArchivalRecord will return nil for any condition
// matching the exclude config options, e.g. localhost, or source ports. Note
// that Parse does not populate the Timestamp field, so caller should do so.
func MakeArchivalRecord(msg *NetlinkMessage, exclude *ExcludeConfig) (*ArchivalRecord, error) {
if msg.Header.Type != 20 {
return nil, ErrNotType20
}
raw, attrBytes := inetdiag.SplitInetDiagMsg(msg.Data)
if raw == nil {
return nil, ErrParseFailed
}
if skipLocal {
if exclude != nil {
idm, err := raw.Parse()
if err != nil {
return nil, err
}

if isLocal(idm.ID.SrcIP()) || isLocal(idm.ID.DstIP()) {
if exclude.SrcPorts != nil && exclude.SrcPorts[idm.ID.SPort()] {
return nil, nil
}
if exclude.Local && (isLocal(idm.ID.SrcIP()) || isLocal(idm.ID.DstIP())) {
return nil, nil
}
}
Expand Down Expand Up @@ -149,10 +160,10 @@ func isLocal(addr net.IP) bool {

// Compare compares important fields to determine whether significant updates have occurred.
// We ignore a bunch of fields:
// * The TCPInfo fields matching last_* are rapidly changing, but don't have much significance.
// Are they elapsed time fields?
// * The InetDiagMsg.Expires is also rapidly changing in many connections, but also seems
// unimportant.
// - The TCPInfo fields matching last_* are rapidly changing, but don't have much significance.
// Are they elapsed time fields?
// - The InetDiagMsg.Expires is also rapidly changing in many connections, but also seems
// unimportant.
//
// Significant updates are reflected in the packet, segment and byte count updates, so we
// generally want to record a snapshot when any of those change. They are in the latter
Expand Down Expand Up @@ -290,7 +301,7 @@ func (raw *rawReader) Next() (*ArchivalRecord, error) {
if err != nil {
return nil, err
}
return MakeArchivalRecord(msg, false)
return MakeArchivalRecord(msg, nil)
}

type archiveReader struct {
Expand Down
24 changes: 12 additions & 12 deletions netlink/netlink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestParse(t *testing.T) {
err := json.Unmarshal([]byte(json1), &nm)
t.Log("Data len = ", len(nm.Data))
rtx.Must(err, "")
mp, err := netlink.MakeArchivalRecord(&nm, true)
mp, err := netlink.MakeArchivalRecord(&nm, &netlink.ExcludeConfig{Local: true})
rtx.Must(err, "")
idm, err := mp.RawIDM.Parse()
rtx.Must(err, "")
Expand Down Expand Up @@ -71,14 +71,14 @@ func TestParseGarbage(t *testing.T) {
// Truncate the data down to something that makes no sense.
badNm := nm
badNm.Data = badNm.Data[:1]
_, err = netlink.MakeArchivalRecord(&badNm, true)
_, err = netlink.MakeArchivalRecord(&badNm, &netlink.ExcludeConfig{Local: true})
if err == nil {
t.Error("The parse should have failed")
}

// Replace the header type with one that we don't support.
nm.Header.Type = 10
_, err = netlink.MakeArchivalRecord(&nm, false)
_, err = netlink.MakeArchivalRecord(&nm, nil)
if err == nil {
t.Error("Should detect wrong type")
}
Expand All @@ -91,14 +91,14 @@ func TestParseGarbage(t *testing.T) {
nm.Data[i] = byte(i)
}

_, err = netlink.MakeArchivalRecord(&nm, false)
_, err = netlink.MakeArchivalRecord(&nm, nil)
if err == nil || err.Error() != "invalid argument" {
t.Error(err)
}

// Replace length with garbage so that data is incomplete.
nm.Header.Len = 400
_, err = netlink.MakeArchivalRecord(&nm, false)
_, err = netlink.MakeArchivalRecord(&nm, nil)
if err == nil || err.Error() != "invalid argument" {
t.Error(err)
}
Expand Down Expand Up @@ -132,7 +132,7 @@ func TestCompare(t *testing.T) {
if err != nil {
t.Fatal(err)
}
mp1, err := netlink.MakeArchivalRecord(&nm, true)
mp1, err := netlink.MakeArchivalRecord(&nm, &netlink.ExcludeConfig{Local: true})
if err != nil {
t.Fatal(err)
}
Expand All @@ -143,7 +143,7 @@ func TestCompare(t *testing.T) {
if err != nil {
t.Fatal(err)
}
mp2, err := netlink.MakeArchivalRecord(&nm2, true)
mp2, err := netlink.MakeArchivalRecord(&nm2, &netlink.ExcludeConfig{Local: true})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -193,7 +193,7 @@ func TestNLMsgSerialize(t *testing.T) {
}
t.Fatal(err)
}
pm, err := netlink.MakeArchivalRecord(msg, false)
pm, err := netlink.MakeArchivalRecord(msg, nil)
rtx.Must(err, "Could not parse test data")
// Parse doesn't fill the Timestamp, so for now, populate it with something...
pm.Timestamp = time.Date(2009, time.May, 29, 23, 59, 59, 0, time.UTC)
Expand Down Expand Up @@ -248,7 +248,7 @@ func TestCompressionSize(t *testing.T) {
}
t.Fatal(err)
}
pm, err := netlink.MakeArchivalRecord(msg, false)
pm, err := netlink.MakeArchivalRecord(msg, nil)
pm.Timestamp = ts.Truncate(time.Millisecond).UTC()
ts = ts.Add(6 * time.Millisecond)
rtx.Must(err, "Could not parse test data")
Expand Down Expand Up @@ -304,7 +304,7 @@ func BenchmarkNLMsgSerialize(b *testing.B) {
}
b.Fatal(err)
}
pm, err := netlink.MakeArchivalRecord(msg, false)
pm, err := netlink.MakeArchivalRecord(msg, nil)
rtx.Must(err, "Could not parse test data")
msgs = append(msgs, pm)
}
Expand Down Expand Up @@ -341,7 +341,7 @@ func BenchmarkNLMsgParseSerializeCompress(b *testing.B) {
b.Fatal(err)
}
raw = append(raw, msg)
pm, err := netlink.MakeArchivalRecord(msg, false)
pm, err := netlink.MakeArchivalRecord(msg, nil)
rtx.Must(err, "Could not parse test data")
msgs = append(msgs, pm)
}
Expand All @@ -358,7 +358,7 @@ func BenchmarkNLMsgParseSerializeCompress(b *testing.B) {
b.StartTimer()
for i := 0; i < b.N; i++ {
for _, msg := range raw {
m, err := netlink.MakeArchivalRecord(msg, false)
m, err := netlink.MakeArchivalRecord(msg, nil)
rtx.Must(err, "Could not parse test data")
jsonBytes, err := json.Marshal(m)
rtx.Must(err, "Could not serialize %v", m)
Expand Down
6 changes: 4 additions & 2 deletions saver/saver.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,12 @@ type Saver struct {
cache *cache.Cache
stats stats
eventServer eventsocket.Server
exclude *netlink.ExcludeConfig
}

// NewSaver creates a new Saver for the given host and pod. numMarshaller controls
// how many marshalling goroutines are used to distribute the marshalling workload.
func NewSaver(host string, pod string, numMarshaller int, srv eventsocket.Server, anon anonymize.IPAnonymizer) *Saver {
func NewSaver(host string, pod string, numMarshaller int, srv eventsocket.Server, anon anonymize.IPAnonymizer, ex *netlink.ExcludeConfig) *Saver {
m := make([]MarshalChan, 0, numMarshaller)
c := cache.NewCache()
// We start with capacity of 500. This will be reallocated as needed, but this
Expand All @@ -238,6 +239,7 @@ func NewSaver(host string, pod string, numMarshaller int, srv eventsocket.Server
ClosingStats: make(map[uint64]TcpStats, 100),
cache: c,
eventServer: srv,
exclude: ex,
}
}

Expand Down Expand Up @@ -306,7 +308,7 @@ func (svr *Saver) handleType(t time.Time, msgs []*netlink.NetlinkMessage) (uint6
log.Println("Nil message")
continue
}
ar, err := netlink.MakeArchivalRecord(msg, true)
ar, err := netlink.MakeArchivalRecord(msg, svr.exclude)
if ar == nil {
if err != nil {
log.Println(err)
Expand Down
6 changes: 3 additions & 3 deletions saver/saver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (msg *TestMsg) setDPort(dport uint16) *TestMsg {
}

func (msg *TestMsg) mustAR() *netlink.ArchivalRecord {
ar, err := netlink.MakeArchivalRecord(&msg.NetlinkMessage, true)
ar, err := netlink.MakeArchivalRecord(&msg.NetlinkMessage, nil)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestHistograms(t *testing.T) {
}()
eventCounts := &countingEventSocket{}
anon := anonymize.New(anonymize.None)
svr := saver.NewSaver("foo", "bar", 1, eventCounts, anon)
svr := saver.NewSaver("foo", "bar", 1, eventCounts, anon, nil)
svrChan := make(chan netlink.MessageBlock, 0) // no buffering
go svr.MessageSaverLoop(svrChan)

Expand Down Expand Up @@ -340,7 +340,7 @@ func TestFinWait2NotImplemented(t *testing.T) {
}

anon := anonymize.New(anonymize.None)
svr := saver.NewSaver("hostname", "fakePod", 1, eventsocket.NullServer(), anon)
svr := saver.NewSaver("hostname", "fakePod", 1, eventsocket.NullServer(), anon, nil)
blockChan := make(chan netlink.MessageBlock, 0)
go svr.MessageSaverLoop(blockChan)
for i := range msgs {
Expand Down

0 comments on commit d6576cc

Please sign in to comment.