Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Auditbeat] Fixes for system/socket dataset UDP and DNS #14315

Merged
merged 4 commits into from
Oct 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 43 additions & 41 deletions x-pack/auditbeat/module/system/socket/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,15 +628,15 @@ func (e *udpv6SendMsgCall) Update(s *state) error {
}

type udpQueueRcvSkb struct {
Meta tracing.Metadata `kprobe:"metadata"`
Sock uintptr `kprobe:"sock"`
Size uint32 `kprobe:"size"`
LAddr uint32 `kprobe:"laddr"`
LPort uint16 `kprobe:"lport"`
IPHdr uint16 `kprobe:"iphdr"`
UDPHdr uint16 `kprobe:"udphdr"`
Base uintptr `kprobe:"base"`
Packet [pktHeaderDumpBytes]byte `kprobe:"packet,greedy"`
Meta tracing.Metadata `kprobe:"metadata"`
Sock uintptr `kprobe:"sock"`
Size uint32 `kprobe:"size"`
LAddr uint32 `kprobe:"laddr"`
LPort uint16 `kprobe:"lport"`
IPHdr uint16 `kprobe:"iphdr"`
UDPHdr uint16 `kprobe:"udphdr"`
Base uintptr `kprobe:"base"`
Packet [skBuffDataDumpBytes]byte `kprobe:"packet,greedy"`
}

func validIPv4Headers(ipHdr uint16, udpHdr uint16, data []byte) bool {
Expand All @@ -656,6 +656,15 @@ func validIPv6Headers(ipHdr uint16, udpHdr uint16, data []byte) bool {
}

func (e *udpQueueRcvSkb) asFlow() flow {
f := flow{
sock: e.Sock,
pid: e.Meta.PID,
inetType: inetTypeIPv4,
proto: protoUDP,
dir: directionInbound,
lastSeen: kernelTime(e.Meta.Timestamp),
local: newEndpointIPv4(e.LAddr, e.LPort, 0, 0),
}
if valid := validIPv4Headers(e.IPHdr, e.UDPHdr, e.Packet[:]); !valid {
// Check if we're dealing with pointers
// TODO: This should check for SK_BUFF_HAS_POINTERS. Instead is just
Expand All @@ -678,24 +687,16 @@ func (e *udpQueueRcvSkb) asFlow() flow {
}
}
if !valid {
return flow{}
return f
}
}
var raddr uint32
var rport uint16
// the remote is this packet's source
raddr = tracing.MachineEndian.Uint32(e.Packet[e.IPHdr+12:])
rport = tracing.MachineEndian.Uint16(e.Packet[e.UDPHdr:])
return flow{
sock: e.Sock,
pid: e.Meta.PID,
inetType: inetTypeIPv4,
proto: protoUDP,
dir: directionInbound,
lastSeen: kernelTime(e.Meta.Timestamp),
local: newEndpointIPv4(e.LAddr, e.LPort, 0, 0),
remote: newEndpointIPv4(raddr, rport, 1, uint64(e.Size)+minIPv4UdpPacketSize),
}
f.remote = newEndpointIPv4(raddr, rport, 1, uint64(e.Size)+minIPv4UdpPacketSize)
return f
}

// String returns a representation of the event.
Expand All @@ -716,19 +717,28 @@ func (e *udpQueueRcvSkb) Update(s *state) error {
}

type udpv6QueueRcvSkb struct {
Meta tracing.Metadata `kprobe:"metadata"`
Sock uintptr `kprobe:"sock"`
Size uint32 `kprobe:"size"`
LAddrA uint64 `kprobe:"laddra"`
LAddrB uint64 `kprobe:"laddrb"`
LPort uint16 `kprobe:"lport"`
IPHdr uint16 `kprobe:"iphdr"`
UDPHdr uint16 `kprobe:"udphdr"`
Base uintptr `kprobe:"base"`
Packet [pktHeaderDumpBytes]byte `kprobe:"packet,greedy"`
Meta tracing.Metadata `kprobe:"metadata"`
Sock uintptr `kprobe:"sock"`
Size uint32 `kprobe:"size"`
LAddrA uint64 `kprobe:"laddra"`
LAddrB uint64 `kprobe:"laddrb"`
LPort uint16 `kprobe:"lport"`
IPHdr uint16 `kprobe:"iphdr"`
UDPHdr uint16 `kprobe:"udphdr"`
Base uintptr `kprobe:"base"`
Packet [skBuffDataDumpBytes]byte `kprobe:"packet,greedy"`
}

func (e *udpv6QueueRcvSkb) asFlow() flow {
f := flow{
sock: e.Sock,
pid: e.Meta.PID,
inetType: inetTypeIPv6,
proto: protoUDP,
dir: directionInbound,
lastSeen: kernelTime(e.Meta.Timestamp),
local: newEndpointIPv6(e.LAddrA, e.LAddrB, e.LPort, 0, 0),
}
if valid := validIPv6Headers(e.IPHdr, e.UDPHdr, e.Packet[:]); !valid {
// Check if we're dealing with pointers
// TODO: This only works in little-endian, same as in udpQueueRcvSkb
Expand All @@ -743,7 +753,7 @@ func (e *udpv6QueueRcvSkb) asFlow() flow {
}
}
if !valid {
return flow{}
return f
}
}
var raddrA, raddrB uint64
Expand All @@ -752,16 +762,8 @@ func (e *udpv6QueueRcvSkb) asFlow() flow {
raddrA = tracing.MachineEndian.Uint64(e.Packet[e.IPHdr+8:])
raddrB = tracing.MachineEndian.Uint64(e.Packet[e.IPHdr+16:])
rport = tracing.MachineEndian.Uint16(e.Packet[e.UDPHdr:])
return flow{
sock: e.Sock,
pid: e.Meta.PID,
inetType: inetTypeIPv6,
proto: protoUDP,
dir: directionInbound,
lastSeen: kernelTime(e.Meta.Timestamp),
local: newEndpointIPv6(e.LAddrA, e.LAddrB, e.LPort, 0, 0),
remote: newEndpointIPv6(raddrA, raddrB, rport, 1, uint64(e.Size)+minIPv6UdpPacketSize),
}
f.remote = newEndpointIPv6(raddrA, raddrB, rport, 1, uint64(e.Size)+minIPv6UdpPacketSize)
return f
}

// String returns a representation of the event.
Expand Down
10 changes: 6 additions & 4 deletions x-pack/auditbeat/module/system/socket/kprobes.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
"github.com/elastic/beats/x-pack/auditbeat/tracing"
)

// Enough for padding + mac_hdr + max ip_hdr + udp_hdr
const pktHeaderDumpBytes = 8 * 12
// This is how many data we dump from sk_buff->data to read full packet headers
// (IP + UDP header). This has been observed to include up to 100 bytes of
// padding.
const skBuffDataDumpBytes = 256

// ProbeTransform transforms a probe before its installed.
type ProbeTransform func(helper.ProbeDef) helper.ProbeDef
Expand Down Expand Up @@ -300,7 +302,7 @@ var sharedKProbes = []helper.ProbeDef{
Probe: tracing.Probe{
Name: "udp_queue_rcv_skb",
Address: "udp_queue_rcv_skb",
Fetchargs: "sock={{.P1}} size=+{{.SK_BUFF_LEN}}({{.P2}}):u32 laddr=+{{.INET_SOCK_LADDR}}({{.P1}}):u32 lport=+{{.INET_SOCK_LPORT}}({{.P1}}):u16 iphdr=+{{.SK_BUFF_NETWORK}}({{.P2}}):u16 udphdr=+{{.SK_BUFF_TRANSPORT}}({{.P2}}):u16 base=+{{.SK_BUFF_HEAD}}({{.P2}}) packet=" + helper.MakeMemoryDump("+{{.SK_BUFF_HEAD}}({{.P2}})", 0, pktHeaderDumpBytes),
Fetchargs: "sock={{.P1}} size=+{{.SK_BUFF_LEN}}({{.P2}}):u32 laddr=+{{.INET_SOCK_LADDR}}({{.P1}}):u32 lport=+{{.INET_SOCK_LPORT}}({{.P1}}):u16 iphdr=+{{.SK_BUFF_NETWORK}}({{.P2}}):u16 udphdr=+{{.SK_BUFF_TRANSPORT}}({{.P2}}):u16 base=+{{.SK_BUFF_HEAD}}({{.P2}}) packet=" + helper.MakeMemoryDump("+{{.SK_BUFF_HEAD}}({{.P2}})", 0, skBuffDataDumpBytes),
},
Decoder: helper.NewStructDecoder(func() interface{} { return new(udpQueueRcvSkb) }),
},
Expand Down Expand Up @@ -463,7 +465,7 @@ var ipv6KProbes = []helper.ProbeDef{
Probe: tracing.Probe{
Name: "udpv6_queue_rcv_skb",
Address: "udpv6_queue_rcv_skb",
Fetchargs: "sock={{.P1}} size=+{{.SK_BUFF_LEN}}({{.P2}}):u32 laddra={{.INET_SOCK_V6_LADDR_A}}({{.P1}}){{.INET_SOCK_V6_TERM}} laddrb={{.INET_SOCK_V6_LADDR_B}}({{.P1}}){{.INET_SOCK_V6_TERM}} lport=+{{.INET_SOCK_LPORT}}({{.P1}}):u16 iphdr=+{{.SK_BUFF_NETWORK}}({{.P2}}):u16 udphdr=+{{.SK_BUFF_TRANSPORT}}({{.P2}}):u16 base=+{{.SK_BUFF_HEAD}}({{.P2}}) packet=" + helper.MakeMemoryDump("+{{.SK_BUFF_HEAD}}({{.P2}})", 0, pktHeaderDumpBytes),
Fetchargs: "sock={{.P1}} size=+{{.SK_BUFF_LEN}}({{.P2}}):u32 laddra={{.INET_SOCK_V6_LADDR_A}}({{.P1}}){{.INET_SOCK_V6_TERM}} laddrb={{.INET_SOCK_V6_LADDR_B}}({{.P1}}){{.INET_SOCK_V6_TERM}} lport=+{{.INET_SOCK_LPORT}}({{.P1}}):u16 iphdr=+{{.SK_BUFF_NETWORK}}({{.P2}}):u16 udphdr=+{{.SK_BUFF_TRANSPORT}}({{.P2}}):u16 base=+{{.SK_BUFF_HEAD}}({{.P2}}) packet=" + helper.MakeMemoryDump("+{{.SK_BUFF_HEAD}}({{.P2}})", 0, skBuffDataDumpBytes),
},
Decoder: helper.NewStructDecoder(func() interface{} { return new(udpv6QueueRcvSkb) }),
},
Expand Down
13 changes: 12 additions & 1 deletion x-pack/auditbeat/module/system/socket/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,11 +611,19 @@ func (s *state) mutualEnrich(sock *socket, f *flow) {
if sockNoPID := sock.pid == 0; sockNoPID != (f.pid == 0) {
if sockNoPID {
sock.pid = f.pid
sock.process = f.process
} else {
f.pid = sock.pid
}
}
if sockNoProcess := sock.process == nil; sockNoProcess != (f.process == nil) {
if sockNoProcess {
sock.process = f.process
} else {
f.process = sock.process
}
} else if sock.process == nil && sock.pid != 0 {
sock.process = s.getProcess(sock.pid)
f.process = sock.process
}
}

Expand All @@ -632,6 +640,7 @@ func (s *state) createFlow(ref flow) error {

ref.createdTime = ref.lastSeenTime
s.mutualEnrich(sock, &ref)

// don't create the flow yet if it doesn't have a populated remote address
if ref.remote.addr.IP == nil {
return nil
Expand All @@ -658,6 +667,8 @@ func (s *state) OnSockDestroyed(ptr uintptr, pid uint32) error {
// Enrich with pid
if sock.pid == 0 && pid != 0 {
sock.pid = pid
}
if sock.process == nil && sock.pid != 0 {
sock.process = s.getProcess(pid)
}
// Keep the sock around in case it's a connected TCP socket, as still some
Expand Down
141 changes: 137 additions & 4 deletions x-pack/auditbeat/module/system/socket/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,145 @@ func TestTCPConnWithProcess(t *testing.T) {
}
}

func assertValue(t *testing.T, ev beat.Event, expected interface{}, field string) bool {
value, err := ev.GetValue(field)
func TestUDPOutgoingSinglePacketWithProcess(t *testing.T) {
const (
localIP = "192.168.33.10"
remoteIP = "172.19.12.13"
localPort = 38842
remotePort = 53
sock uintptr = 0xff1234
)
st := makeState(nil, (*logWrapper)(t), time.Second, 0, time.Second)
lPort, rPort := be16(localPort), be16(remotePort)
lAddr, rAddr := ipv4(localIP), ipv4(remoteIP)
evs := []event{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this test format. It's easy to follow what's happening.

callExecve(meta(1234, 1234, 1), []string{"/usr/bin/exfil-udp"}),
&commitCreds{Meta: meta(1234, 1234, 2), UID: 501, GID: 20, EUID: 501, EGID: 20},
&execveRet{Meta: meta(1234, 1234, 2), Retval: 1234},
&inetCreate{Meta: meta(1234, 1235, 5), Proto: 0},
&sockInitData{Meta: meta(1234, 1235, 5), Sock: sock},
&udpSendMsgCall{
Meta: meta(1234, 1235, 6),
Sock: sock,
Size: 123,
LAddr: lAddr,
RAddr: rAddr,
AltRAddr: 0,
LPort: lPort,
RPort: rPort,
AltRPort: 0,
},
&inetReleaseCall{Meta: meta(1234, 1235, 17), Sock: sock},
&doExit{Meta: meta(1234, 1234, 18)},
}
if err := feedEvents(evs, st, t); err != nil {
t.Fatal(err)
}
st.ExpireOlder()
flows, err := getFlows(st.DoneFlows(), all)
if err != nil {
t.Fatal(err, "field", field)
t.Fatal(err)
}
return assert.Equal(t, expected, value)
assert.Len(t, flows, 1)
flow := flows[0]
t.Log("read flow", flow)
for field, expected := range map[string]interface{}{
"source.ip": localIP,
"source.port": localPort,
"source.packets": uint64(1),
"source.bytes": uint64(151),
"client.ip": localIP,
"client.port": localPort,
"destination.ip": remoteIP,
"destination.port": remotePort,
"destination.packets": uint64(0),
"destination.bytes": uint64(0),
"server.ip": remoteIP,
"server.port": remotePort,
"network.direction": "outbound",
"network.transport": "udp",
"network.type": "ipv4",
"process.pid": 1234,
"process.name": "exfil-udp",
"user.id": "501",
} {
assertValue(t, flow, expected, field)
}
}

func TestUDPIncomingSinglePacketWithProcess(t *testing.T) {
const (
localIP = "192.168.33.10"
remoteIP = "172.19.12.13"
localPort = 38842
remotePort = 53
sock uintptr = 0xff1234
)
st := makeState(nil, (*logWrapper)(t), time.Second, 0, time.Second)
lPort, rPort := be16(localPort), be16(remotePort)
lAddr, rAddr := ipv4(localIP), ipv4(remoteIP)
var packet [256]byte
var ipHdr, udpHdr uint16 = 2, 64
packet[ipHdr] = 0x45
tracing.MachineEndian.PutUint32(packet[ipHdr+12:], rAddr)
tracing.MachineEndian.PutUint16(packet[udpHdr:], rPort)
evs := []event{
callExecve(meta(1234, 1234, 1), []string{"/usr/bin/exfil-udp"}),
&commitCreds{Meta: meta(1234, 1234, 2), UID: 501, GID: 20, EUID: 501, EGID: 20},
&execveRet{Meta: meta(1234, 1234, 2), Retval: 1234},
&inetCreate{Meta: meta(1234, 1235, 5), Proto: 0},
&sockInitData{Meta: meta(1234, 1235, 5), Sock: sock},
&udpQueueRcvSkb{
Meta: meta(1234, 1235, 5),
Sock: sock,
Size: 123,
LAddr: lAddr,
LPort: lPort,
IPHdr: ipHdr,
UDPHdr: udpHdr,
Packet: packet,
},
&inetReleaseCall{Meta: meta(1234, 1235, 17), Sock: sock},
&doExit{Meta: meta(1234, 1234, 18)},
}
if err := feedEvents(evs, st, t); err != nil {
t.Fatal(err)
}
st.ExpireOlder()
flows, err := getFlows(st.DoneFlows(), all)
if err != nil {
t.Fatal(err)
}
assert.Len(t, flows, 1)
flow := flows[0]
t.Log("read flow", flow)
for field, expected := range map[string]interface{}{
"source.ip": remoteIP,
"source.port": remotePort,
"source.packets": uint64(1),
"source.bytes": uint64(151),
"client.ip": remoteIP,
"client.port": remotePort,
"destination.ip": localIP,
"destination.port": localPort,
"destination.packets": uint64(0),
"destination.bytes": uint64(0),
"server.ip": localIP,
"server.port": localPort,
"network.direction": "inbound",
"network.transport": "udp",
"network.type": "ipv4",
"process.pid": 1234,
"process.name": "exfil-udp",
"user.id": "501",
} {
assertValue(t, flow, expected, field)
}
}

func assertValue(t *testing.T, ev beat.Event, expected interface{}, field string) bool {
value, err := ev.GetValue(field)
return assert.Nil(t, err, field) && assert.Equal(t, expected, value, field)
}

func be16(val uint16) uint16 {
Expand Down
Loading