Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #13966 to 7.4: [SIEM][Auditbeat] Fix socket dataset startup when IPv6 is disabled #14041

Merged
merged 1 commit into from
Oct 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Auditbeat*

- Socket dataset: Fix start errors when IPv6 is disabled on the kernel. {issue}13953[13953] {pull}13966[13966]

*Filebeat*

Expand Down
6 changes: 3 additions & 3 deletions x-pack/auditbeat/module/system/socket/guess/guess.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,9 @@ func guessOnce(guesser Guesser, installer helper.ProbeInstaller, ctx Context) (r
// Need to make sure that the trigger has finished before extracting
// results because there could be data-races between Trigger and Extract.
select {
case result := <-thread.C():
if result.Err != nil {
return nil, errors.Wrap(err, "trigger execution failed")
case r := <-thread.C():
if r.Err != nil {
return nil, errors.Wrap(r.Err, "trigger execution failed")
}
case <-timer.C:
return nil, errors.New("timeout while waiting for trigger to complete")
Expand Down
15 changes: 11 additions & 4 deletions x-pack/auditbeat/module/system/socket/guess/inetsockaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ func init() {
}

type guessInetSockFamily struct {
ctx Context
family int
limit int
ctx Context
family int
limit int
canIPv6 bool
}

// Name of this guess.
Expand Down Expand Up @@ -103,6 +104,12 @@ func (g *guessInetSockFamily) Prepare(ctx Context) error {
if g.limit, ok = g.ctx.Vars["INET_SOCK_V6_LIMIT"].(int); !ok {
return errors.New("required variable INET_SOCK_V6_LIMIT not found")
}
// check that this system can create AF_INET6 sockets. Otherwise revert to
// using AF_INET only.
fd, err := unix.Socket(unix.AF_INET6, unix.SOCK_DGRAM, 0)
if g.canIPv6 = err == nil; g.canIPv6 {
unix.Close(fd)
}
return nil
}

Expand All @@ -114,7 +121,7 @@ func (g *guessInetSockFamily) Terminate() error {
// Trigger creates and then closes a socket alternating between AF_INET/AF_INET6
// on each run.
func (g *guessInetSockFamily) Trigger() error {
if g.family == unix.AF_INET {
if g.canIPv6 && g.family == unix.AF_INET {
g.family = unix.AF_INET6
} else {
g.family = unix.AF_INET
Expand Down
54 changes: 37 additions & 17 deletions x-pack/auditbeat/module/system/socket/guess/sockaddrin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
package guess

import (
"bytes"
"encoding/binary"
"net"

"github.com/pkg/errors"
"golang.org/x/sys/unix"

"github.com/elastic/beats/libbeat/common"
Expand All @@ -29,19 +30,15 @@ import (

func init() {
if err := Registry.AddGuess(
&guessSockaddrIn{
address: net.TCPAddr{
IP: net.IPv4(127, 0x12, 0x34, 0x56).To4(),
Port: 0xABCD,
},
}); err != nil {
&guessSockaddrIn{}); err != nil {
panic(err)
}
}

type guessSockaddrIn struct {
ctx Context
address net.TCPAddr
ctx Context
local, remote unix.SockaddrInet4
server, client int
}

// Name of this guess.
Expand Down Expand Up @@ -81,25 +78,48 @@ func (g *guessSockaddrIn) Probes() ([]helper.ProbeDef, error) {
}

// Prepare is a no-op.
func (g *guessSockaddrIn) Prepare(ctx Context) error {
func (g *guessSockaddrIn) Prepare(ctx Context) (err error) {
g.ctx = ctx
g.local = unix.SockaddrInet4{
Port: 0,
Addr: randomLocalIP(),
}
g.remote = unix.SockaddrInet4{
Port: 0,
Addr: randomLocalIP(),
}
for bytes.Equal(g.local.Addr[:], g.remote.Addr[:]) {
g.remote.Addr = randomLocalIP()
}
if g.server, g.local, err = createSocket(g.local); err != nil {
return errors.Wrap(err, "error creating server")
}
if g.client, g.remote, err = createSocket(g.remote); err != nil {
return errors.Wrap(err, "error creating client")
}
if err = unix.Listen(g.server, 1); err != nil {
return errors.Wrap(err, "error in listen")
}
return nil
}

// Terminate is a no-op.
func (g *guessSockaddrIn) Terminate() error {
unix.Close(g.client)
unix.Close(g.server)
return nil
}

// Trigger connects a socket to a random local address (127.x.x.x).
func (g *guessSockaddrIn) Trigger() error {
dialer := net.Dialer{
Timeout: g.ctx.Timeout,
if err := unix.Connect(g.client, &g.local); err != nil {
return err
}
conn, err := dialer.Dial("tcp", g.address.String())
if err == nil {
conn.Close()
fd, _, err := unix.Accept(g.server)
if err != nil {
return err
}
unix.Close(fd)
return nil
}

Expand All @@ -116,13 +136,13 @@ func (g *guessSockaddrIn) Extract(ev interface{}) (common.MapStr, bool) {
return nil, false
}

binary.BigEndian.PutUint16(needle[:], uint16(g.address.Port))
binary.BigEndian.PutUint16(needle[:], uint16(g.local.Port))
offsetOfPort := indexAligned(arr, needle[:], offsetOfFamily+2, 2)
if offsetOfPort == -1 {
return nil, false
}

offsetOfAddr := indexAligned(arr, []byte(g.address.IP), offsetOfPort+2, 4)
offsetOfAddr := indexAligned(arr, []byte(g.local.Addr[:]), offsetOfPort+2, 4)
if offsetOfAddr == -1 {
return nil, false
}
Expand Down
66 changes: 49 additions & 17 deletions x-pack/auditbeat/module/system/socket/guess/sockaddrin6.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ package guess

import (
"encoding/binary"
"net"

"github.com/pkg/errors"
"golang.org/x/sys/unix"

"github.com/elastic/beats/libbeat/common"
Expand All @@ -30,19 +30,16 @@ import (

func init() {
if err := Registry.AddGuess(
&guessSockaddrIn6{
address: net.TCPAddr{
IP: []byte{0xFD, 0xE5, 0x7C, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xAA, 0xBB, 0xCC, 0xDD},
Port: 0xFEEF,
},
}); err != nil {
&guessSockaddrIn6{}); err != nil {
panic(err)
}
}

type guessSockaddrIn6 struct {
ctx Context
address net.TCPAddr
ctx Context
loopback helper.IPv6Loopback
clientAddr, serverAddr unix.SockaddrInet6
client, server int
}

// Name of this guess.
Expand Down Expand Up @@ -88,25 +85,60 @@ func (g *guessSockaddrIn6) Probes() ([]helper.ProbeDef, error) {
}

// Prepare is a no-op.
func (g *guessSockaddrIn6) Prepare(ctx Context) error {
func (g *guessSockaddrIn6) Prepare(ctx Context) (err error) {
g.ctx = ctx
g.loopback, err = helper.NewIPv6Loopback()
if err != nil {
return errors.Wrap(err, "detect IPv6 loopback failed")
}
defer func() {
if err != nil {
g.loopback.Cleanup()
}
}()
clientIP, err := g.loopback.AddRandomAddress()
if err != nil {
return errors.Wrap(err, "failed adding first device address")
}
serverIP, err := g.loopback.AddRandomAddress()
if err != nil {
return errors.Wrap(err, "failed adding second device address")
}
copy(g.clientAddr.Addr[:], clientIP)
copy(g.serverAddr.Addr[:], serverIP)

if g.client, g.clientAddr, err = createSocket6WithProto(unix.SOCK_STREAM, g.clientAddr); err != nil {
return errors.Wrap(err, "error creating server")
}
if g.server, g.serverAddr, err = createSocket6WithProto(unix.SOCK_STREAM, g.serverAddr); err != nil {
return errors.Wrap(err, "error creating client")
}
if err = unix.Listen(g.server, 1); err != nil {
return errors.Wrap(err, "error in listen")
}
return nil
}

// Terminate is a no-op.
func (g *guessSockaddrIn6) Terminate() error {
unix.Close(g.client)
unix.Close(g.server)
if err := g.loopback.Cleanup(); err != nil {
return err
}
return nil
}

// Trigger performs a connection attempt on the random address.
func (g *guessSockaddrIn6) Trigger() error {
dialer := net.Dialer{
Timeout: g.ctx.Timeout,
if err := unix.Connect(g.client, &g.serverAddr); err != nil {
return errors.Wrap(err, "connect failed")
}
conn, err := dialer.Dial("tcp", g.address.String())
if err == nil {
conn.Close()
fd, _, err := unix.Accept(g.server)
if err != nil {
return errors.Wrap(err, "accept failed")
}
unix.Close(fd)
return nil
}

Expand All @@ -124,13 +156,13 @@ func (g *guessSockaddrIn6) Extract(ev interface{}) (common.MapStr, bool) {
return nil, false
}

binary.BigEndian.PutUint16(needle[:], uint16(g.address.Port))
binary.BigEndian.PutUint16(needle[:], uint16(g.serverAddr.Port))
offsetOfPort := indexAligned(arr, needle[:], offsetOfFamily+2, 2)
if offsetOfPort == -1 {
return nil, false
}

offsetOfAddr := indexAligned(arr, g.address.IP, offsetOfPort+2, 1)
offsetOfAddr := indexAligned(arr, g.serverAddr.Addr[:], offsetOfPort+2, 1)
if offsetOfAddr == -1 {
return nil, false
}
Expand Down
10 changes: 9 additions & 1 deletion x-pack/auditbeat/module/system/socket/socket_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,8 @@ func (m *MetricSet) Setup() (err error) {

hasIPv6, err := detectIPv6()
if err != nil {
return errors.Wrap(err, "error detecting IPv6 support")
m.log.Debugf("Error detecting IPv6 support: %v", err)
hasIPv6 = false
}
m.log.Debugf("IPv6 supported: %v", hasIPv6)
if m.config.EnableIPv6 != nil {
Expand Down Expand Up @@ -459,6 +460,13 @@ func (m *mountPoint) String() string {
}

func detectIPv6() (bool, error) {
// Check that AF_INET6 is available.
// This fails when the kernel is booted with ipv6.disable=1
fd, err := unix.Socket(unix.AF_INET6, unix.SOCK_DGRAM, 0)
if err != nil {
return false, nil
}
unix.Close(fd)
loopback, err := helper.NewIPv6Loopback()
if err != nil {
return false, err
Expand Down