Skip to content

Commit

Permalink
[SIEM][Auditbeat] Fix socket dataset startup when IPv6 is disabled (e…
Browse files Browse the repository at this point in the history
…lastic#13966) (elastic#14041)

This patch fixes a few problems with the new system/socket dataset when
IPv6 has been disabled by booting the kernel with `ipv6.disable=1`.

- Detection of IPv6 can fail in an unexpected way causing a startup failure
  instead of disabling IPv6 support.
- One offset guess depended on the ability to create AF_INET6 sockets.
- A couple of offset guessing tasks depended on a connect() to a magic
  address in the range 127/8 or fd00::/8, which can cause a timeout error
  due to connect() blocking on some systems.

Fixes elastic#13953

(cherry picked from commit 284faf4)
  • Loading branch information
adriansr authored Oct 15, 2019
1 parent 883bf27 commit 14140ab
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 42 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Auditbeat*

- Socket dataset: Fix start errors when IPv6 is disabled on the kernel. {issue}13953[13953] {pull}13966[13966]

*Filebeat*

Expand Down
6 changes: 3 additions & 3 deletions x-pack/auditbeat/module/system/socket/guess/guess.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,9 @@ func guessOnce(guesser Guesser, installer helper.ProbeInstaller, ctx Context) (r
// Need to make sure that the trigger has finished before extracting
// results because there could be data-races between Trigger and Extract.
select {
case result := <-thread.C():
if result.Err != nil {
return nil, errors.Wrap(err, "trigger execution failed")
case r := <-thread.C():
if r.Err != nil {
return nil, errors.Wrap(r.Err, "trigger execution failed")
}
case <-timer.C:
return nil, errors.New("timeout while waiting for trigger to complete")
Expand Down
15 changes: 11 additions & 4 deletions x-pack/auditbeat/module/system/socket/guess/inetsockaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ func init() {
}

type guessInetSockFamily struct {
ctx Context
family int
limit int
ctx Context
family int
limit int
canIPv6 bool
}

// Name of this guess.
Expand Down Expand Up @@ -103,6 +104,12 @@ func (g *guessInetSockFamily) Prepare(ctx Context) error {
if g.limit, ok = g.ctx.Vars["INET_SOCK_V6_LIMIT"].(int); !ok {
return errors.New("required variable INET_SOCK_V6_LIMIT not found")
}
// check that this system can create AF_INET6 sockets. Otherwise revert to
// using AF_INET only.
fd, err := unix.Socket(unix.AF_INET6, unix.SOCK_DGRAM, 0)
if g.canIPv6 = err == nil; g.canIPv6 {
unix.Close(fd)
}
return nil
}

Expand All @@ -114,7 +121,7 @@ func (g *guessInetSockFamily) Terminate() error {
// Trigger creates and then closes a socket alternating between AF_INET/AF_INET6
// on each run.
func (g *guessInetSockFamily) Trigger() error {
if g.family == unix.AF_INET {
if g.canIPv6 && g.family == unix.AF_INET {
g.family = unix.AF_INET6
} else {
g.family = unix.AF_INET
Expand Down
54 changes: 37 additions & 17 deletions x-pack/auditbeat/module/system/socket/guess/sockaddrin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
package guess

import (
"bytes"
"encoding/binary"
"net"

"github.com/pkg/errors"
"golang.org/x/sys/unix"

"github.com/elastic/beats/libbeat/common"
Expand All @@ -29,19 +30,15 @@ import (

func init() {
if err := Registry.AddGuess(
&guessSockaddrIn{
address: net.TCPAddr{
IP: net.IPv4(127, 0x12, 0x34, 0x56).To4(),
Port: 0xABCD,
},
}); err != nil {
&guessSockaddrIn{}); err != nil {
panic(err)
}
}

type guessSockaddrIn struct {
ctx Context
address net.TCPAddr
ctx Context
local, remote unix.SockaddrInet4
server, client int
}

// Name of this guess.
Expand Down Expand Up @@ -81,25 +78,48 @@ func (g *guessSockaddrIn) Probes() ([]helper.ProbeDef, error) {
}

// Prepare is a no-op.
func (g *guessSockaddrIn) Prepare(ctx Context) error {
func (g *guessSockaddrIn) Prepare(ctx Context) (err error) {
g.ctx = ctx
g.local = unix.SockaddrInet4{
Port: 0,
Addr: randomLocalIP(),
}
g.remote = unix.SockaddrInet4{
Port: 0,
Addr: randomLocalIP(),
}
for bytes.Equal(g.local.Addr[:], g.remote.Addr[:]) {
g.remote.Addr = randomLocalIP()
}
if g.server, g.local, err = createSocket(g.local); err != nil {
return errors.Wrap(err, "error creating server")
}
if g.client, g.remote, err = createSocket(g.remote); err != nil {
return errors.Wrap(err, "error creating client")
}
if err = unix.Listen(g.server, 1); err != nil {
return errors.Wrap(err, "error in listen")
}
return nil
}

// Terminate is a no-op.
func (g *guessSockaddrIn) Terminate() error {
unix.Close(g.client)
unix.Close(g.server)
return nil
}

// Trigger connects a socket to a random local address (127.x.x.x).
func (g *guessSockaddrIn) Trigger() error {
dialer := net.Dialer{
Timeout: g.ctx.Timeout,
if err := unix.Connect(g.client, &g.local); err != nil {
return err
}
conn, err := dialer.Dial("tcp", g.address.String())
if err == nil {
conn.Close()
fd, _, err := unix.Accept(g.server)
if err != nil {
return err
}
unix.Close(fd)
return nil
}

Expand All @@ -116,13 +136,13 @@ func (g *guessSockaddrIn) Extract(ev interface{}) (common.MapStr, bool) {
return nil, false
}

binary.BigEndian.PutUint16(needle[:], uint16(g.address.Port))
binary.BigEndian.PutUint16(needle[:], uint16(g.local.Port))
offsetOfPort := indexAligned(arr, needle[:], offsetOfFamily+2, 2)
if offsetOfPort == -1 {
return nil, false
}

offsetOfAddr := indexAligned(arr, []byte(g.address.IP), offsetOfPort+2, 4)
offsetOfAddr := indexAligned(arr, []byte(g.local.Addr[:]), offsetOfPort+2, 4)
if offsetOfAddr == -1 {
return nil, false
}
Expand Down
66 changes: 49 additions & 17 deletions x-pack/auditbeat/module/system/socket/guess/sockaddrin6.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ package guess

import (
"encoding/binary"
"net"

"github.com/pkg/errors"
"golang.org/x/sys/unix"

"github.com/elastic/beats/libbeat/common"
Expand All @@ -30,19 +30,16 @@ import (

func init() {
if err := Registry.AddGuess(
&guessSockaddrIn6{
address: net.TCPAddr{
IP: []byte{0xFD, 0xE5, 0x7C, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xAA, 0xBB, 0xCC, 0xDD},
Port: 0xFEEF,
},
}); err != nil {
&guessSockaddrIn6{}); err != nil {
panic(err)
}
}

type guessSockaddrIn6 struct {
ctx Context
address net.TCPAddr
ctx Context
loopback helper.IPv6Loopback
clientAddr, serverAddr unix.SockaddrInet6
client, server int
}

// Name of this guess.
Expand Down Expand Up @@ -88,25 +85,60 @@ func (g *guessSockaddrIn6) Probes() ([]helper.ProbeDef, error) {
}

// Prepare is a no-op.
func (g *guessSockaddrIn6) Prepare(ctx Context) error {
func (g *guessSockaddrIn6) Prepare(ctx Context) (err error) {
g.ctx = ctx
g.loopback, err = helper.NewIPv6Loopback()
if err != nil {
return errors.Wrap(err, "detect IPv6 loopback failed")
}
defer func() {
if err != nil {
g.loopback.Cleanup()
}
}()
clientIP, err := g.loopback.AddRandomAddress()
if err != nil {
return errors.Wrap(err, "failed adding first device address")
}
serverIP, err := g.loopback.AddRandomAddress()
if err != nil {
return errors.Wrap(err, "failed adding second device address")
}
copy(g.clientAddr.Addr[:], clientIP)
copy(g.serverAddr.Addr[:], serverIP)

if g.client, g.clientAddr, err = createSocket6WithProto(unix.SOCK_STREAM, g.clientAddr); err != nil {
return errors.Wrap(err, "error creating server")
}
if g.server, g.serverAddr, err = createSocket6WithProto(unix.SOCK_STREAM, g.serverAddr); err != nil {
return errors.Wrap(err, "error creating client")
}
if err = unix.Listen(g.server, 1); err != nil {
return errors.Wrap(err, "error in listen")
}
return nil
}

// Terminate is a no-op.
func (g *guessSockaddrIn6) Terminate() error {
unix.Close(g.client)
unix.Close(g.server)
if err := g.loopback.Cleanup(); err != nil {
return err
}
return nil
}

// Trigger performs a connection attempt on the random address.
func (g *guessSockaddrIn6) Trigger() error {
dialer := net.Dialer{
Timeout: g.ctx.Timeout,
if err := unix.Connect(g.client, &g.serverAddr); err != nil {
return errors.Wrap(err, "connect failed")
}
conn, err := dialer.Dial("tcp", g.address.String())
if err == nil {
conn.Close()
fd, _, err := unix.Accept(g.server)
if err != nil {
return errors.Wrap(err, "accept failed")
}
unix.Close(fd)
return nil
}

Expand All @@ -124,13 +156,13 @@ func (g *guessSockaddrIn6) Extract(ev interface{}) (common.MapStr, bool) {
return nil, false
}

binary.BigEndian.PutUint16(needle[:], uint16(g.address.Port))
binary.BigEndian.PutUint16(needle[:], uint16(g.serverAddr.Port))
offsetOfPort := indexAligned(arr, needle[:], offsetOfFamily+2, 2)
if offsetOfPort == -1 {
return nil, false
}

offsetOfAddr := indexAligned(arr, g.address.IP, offsetOfPort+2, 1)
offsetOfAddr := indexAligned(arr, g.serverAddr.Addr[:], offsetOfPort+2, 1)
if offsetOfAddr == -1 {
return nil, false
}
Expand Down
10 changes: 9 additions & 1 deletion x-pack/auditbeat/module/system/socket/socket_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,8 @@ func (m *MetricSet) Setup() (err error) {

hasIPv6, err := detectIPv6()
if err != nil {
return errors.Wrap(err, "error detecting IPv6 support")
m.log.Debugf("Error detecting IPv6 support: %v", err)
hasIPv6 = false
}
m.log.Debugf("IPv6 supported: %v", hasIPv6)
if m.config.EnableIPv6 != nil {
Expand Down Expand Up @@ -459,6 +460,13 @@ func (m *mountPoint) String() string {
}

func detectIPv6() (bool, error) {
// Check that AF_INET6 is available.
// This fails when the kernel is booted with ipv6.disable=1
fd, err := unix.Socket(unix.AF_INET6, unix.SOCK_DGRAM, 0)
if err != nil {
return false, nil
}
unix.Close(fd)
loopback, err := helper.NewIPv6Loopback()
if err != nil {
return false, err
Expand Down

0 comments on commit 14140ab

Please sign in to comment.