Skip to content

Commit

Permalink
Replace Windows system probe socket endpoint localhost:3333 with name…
Browse files Browse the repository at this point in the history
…d pipe (#29028)

Co-authored-by:  <[email protected]>
  • Loading branch information
alexn-dd authored Sep 14, 2024
1 parent 846ea6e commit 10d974e
Show file tree
Hide file tree
Showing 20 changed files with 411 additions and 35 deletions.
36 changes: 36 additions & 0 deletions cmd/agent/subcommands/flare/command_other_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build !windows

// Package flare implements 'agent flare'.
package flare

import (
"net/http"
"net/http/httptest"

"github.com/stretchr/testify/require"

"github.com/DataDog/datadog-agent/pkg/config/model"
)

// NewSystemProbeTestServer starts a new mock server to handle System Probe requests.
func NewSystemProbeTestServer(_ http.Handler) (*httptest.Server, error) {
// Linux still uses a port-based system-probe, it does not need a dedicated system probe server
// for the tests.
return nil, nil
}

// InjectConnectionFailures injects a failure in TestReadProfileDataErrors.
func InjectConnectionFailures(_ model.Config, _ model.Config) {
}

// CheckExpectedConnectionFailures checks the expected errors after simulated
// connection failures.
func CheckExpectedConnectionFailures(c *commandTestSuite, err error) {
// System probe by default is disabled and no connection is attempted for it in the test.
require.Regexp(c.T(), "^4 errors occurred:\n", err.Error())
}
59 changes: 47 additions & 12 deletions cmd/agent/subcommands/flare/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,38 @@ type commandTestSuite struct {
sysprobeSocketPath string
tcpServer *httptest.Server
unixServer *httptest.Server
systemProbeServer *httptest.Server
}

func (c *commandTestSuite) SetupSuite() {
t := c.T()
c.sysprobeSocketPath = path.Join(t.TempDir(), "sysprobe.sock")
c.tcpServer, c.unixServer = c.getPprofTestServer()
}

func (c *commandTestSuite) TearDownSuite() {
c.tcpServer.Close()
if c.unixServer != nil {
c.unixServer.Close()
}
// startTestServers starts test servers from a clean state to ensure no cache responses are used.
// This should be called by each test that requires them.
func (c *commandTestSuite) startTestServers() {
t := c.T()
c.tcpServer, c.unixServer, c.systemProbeServer = c.getPprofTestServer()

t.Cleanup(func() {
if c.tcpServer != nil {
c.tcpServer.Close()
c.tcpServer = nil
}
if c.unixServer != nil {
c.unixServer.Close()
c.unixServer = nil
}
if c.systemProbeServer != nil {
c.systemProbeServer.Close()
c.systemProbeServer = nil
}
})
}

func (c *commandTestSuite) getPprofTestServer() (tcpServer *httptest.Server, unixServer *httptest.Server) {
t := c.T()
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
func newMockHandler() http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/debug/pprof/heap":
w.Write([]byte("heap_profile"))
Expand All @@ -67,17 +81,28 @@ func (c *commandTestSuite) getPprofTestServer() (tcpServer *httptest.Server, uni
w.WriteHeader(500)
}
})
}

func (c *commandTestSuite) getPprofTestServer() (tcpServer *httptest.Server, unixServer *httptest.Server, sysProbeServer *httptest.Server) {
var err error
t := c.T()

handler := newMockHandler()
tcpServer = httptest.NewServer(handler)
if runtime.GOOS == "linux" {
unixServer = httptest.NewUnstartedServer(handler)
var err error
unixServer.Listener, err = net.Listen("unix", c.sysprobeSocketPath)
require.NoError(t, err, "could not create listener for unix socket on %s", c.sysprobeSocketPath)
unixServer.Start()
}

return tcpServer, unixServer
sysProbeServer, err = NewSystemProbeTestServer(handler)
require.NoError(c.T(), err, "could not restart system probe server")
if sysProbeServer != nil {
sysProbeServer.Start()
}

return tcpServer, unixServer, sysProbeServer
}

func TestCommandTestSuite(t *testing.T) {
Expand All @@ -86,6 +111,8 @@ func TestCommandTestSuite(t *testing.T) {

func (c *commandTestSuite) TestReadProfileData() {
t := c.T()
c.startTestServers()

u, err := url.Parse(c.tcpServer.URL)
require.NoError(t, err)
port := u.Port()
Expand Down Expand Up @@ -154,6 +181,8 @@ func (c *commandTestSuite) TestReadProfileData() {

func (c *commandTestSuite) TestReadProfileDataNoTraceAgent() {
t := c.T()
c.startTestServers()

u, err := url.Parse(c.tcpServer.URL)
require.NoError(t, err)
port := u.Port()
Expand Down Expand Up @@ -217,6 +246,8 @@ func (c *commandTestSuite) TestReadProfileDataNoTraceAgent() {

func (c *commandTestSuite) TestReadProfileDataErrors() {
t := c.T()
c.startTestServers()

mockConfig := configmock.New(t)
// setting Core Agent Expvar port to 0 to ensure failing on fetch (using the default value can lead to
// successful request when running next to an Agent)
Expand All @@ -226,9 +257,13 @@ func (c *commandTestSuite) TestReadProfileDataErrors() {
mockConfig.SetWithoutSource("process_config.enabled", true)
mockConfig.SetWithoutSource("process_config.expvar_port", 0)

mockSysProbeConfig := configmock.NewSystemProbe(t)
InjectConnectionFailures(mockSysProbeConfig, mockConfig)

data, err := readProfileData(10)

require.Error(t, err)
require.Regexp(t, "^4 errors occurred:\n", err.Error())
CheckExpectedConnectionFailures(c, err)
require.Len(t, data, 0)
}

Expand Down
63 changes: 63 additions & 0 deletions cmd/agent/subcommands/flare/command_windows_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

//go:build windows

// Package flare implements 'agent flare'.
package flare

import (
"net/http"
"net/http/httptest"

"github.com/stretchr/testify/require"

"github.com/DataDog/datadog-agent/pkg/config/model"
process_net "github.com/DataDog/datadog-agent/pkg/process/net"
)

const (
// SystemProbeTestPipeName is the test named pipe for system-probe
SystemProbeTestPipeName = `\\.\pipe\dd_system_probe_test`
)

// NewSystemProbeTestServer starts a new mock server to handle System Probe requests.
func NewSystemProbeTestServer(handler http.Handler) (*httptest.Server, error) {
server := httptest.NewUnstartedServer(handler)

// Override the named pipe path for tests to avoid conflicts with the locally installed Datadog agent.
process_net.OverrideSystemProbeNamedPipePath(SystemProbeTestPipeName)
conn, err := process_net.NewSystemProbeListener("")
if err != nil {
return nil, err
}

server.Listener = conn.GetListener()
return server, nil
}

// InjectConnectionFailures injects a failure in TestReadProfileDataErrors.
func InjectConnectionFailures(mockSysProbeConfig model.Config, mockConfig model.Config) {
// Explicitly enabled system probe to exercise connections to it.
mockSysProbeConfig.SetWithoutSource("system_probe_config.enabled", true)

// Exercise connection failures for the Windows system probe named pipe clients by
// making them use a bad path.
// The system probe http server must be setup before this override.
process_net.OverrideSystemProbeNamedPipePath(`\\.\pipe\dd_system_probe_test_bad`)

// The security-agent connection is expected to fail too in this test, but
// by enabling system probe, a port will be provided to it (security agent).
// Here we make sure the security agent port is a bad one.
mockConfig.SetWithoutSource("security_agent.expvar_port", 0)
}

// CheckExpectedConnectionFailures checks the expected errors after simulated
// connection failures.
func CheckExpectedConnectionFailures(c *commandTestSuite, err error) {
// In Windows, this test explicitly simulates a system probe connection failure.
// We expect the standard socket errors (4) and a named pipe failure for system probe.
require.Regexp(c.T(), "^5 errors occurred:\n", err.Error())
}
4 changes: 2 additions & 2 deletions cmd/system-probe/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ import (

// StartServer starts the HTTP and gRPC servers for the system-probe, which registers endpoints from all enabled modules.
func StartServer(cfg *sysconfigtypes.Config, telemetry telemetry.Component, wmeta workloadmeta.Component, settings settings.Component) error {
conn, err := net.NewListener(cfg.SocketAddress)
conn, err := net.NewSystemProbeListener(cfg.SocketAddress)
if err != nil {
return fmt.Errorf("error creating IPC socket: %s", err)
return err
}

mux := gorilla.NewRouter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

//go:build windows

// Package probe parses Windows crash dumps.
package probe

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
package wincrashdetect

import (
"fmt"
"net"
"net/http"

Expand All @@ -24,18 +23,19 @@ import (
"github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/metrics/event"

//process_net "github.com/DataDog/datadog-agent/pkg/process/net"
process_net "github.com/DataDog/datadog-agent/pkg/process/net"

"golang.org/x/sys/windows/registry"
)

func createSystemProbeListener() (l net.Listener, close func()) {
l, err := net.Listen("tcp", "127.0.0.1:0")
// No socket address. Windows uses a fixed name pipe
conn, err := process_net.NewSystemProbeListener("")
if err != nil {
panic(err)
}
return l, func() {
_ = l.Close()
return conn.GetListener(), func() {
_ = conn.GetListener().Close()
}
}

Expand Down Expand Up @@ -69,8 +69,8 @@ func TestWinCrashReporting(t *testing.T) {
}
defer server.Close()

sock := fmt.Sprintf("localhost:%d", listener.Addr().(*net.TCPAddr).Port)
config.SystemProbe().SetWithoutSource("system_probe_config.sysprobe_socket", sock)
// no socket address is set in config for Windows since system probe
// utilizes a fixed named pipe.

/*
* the underlying system probe connector is a singleton. Therefore, we can't set up different
Expand Down
2 changes: 1 addition & 1 deletion pkg/languagedetection/detector_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func startTestUnixServer(t *testing.T, handler http.Handler) string {
t.Helper()

socketPath := path.Join(t.TempDir(), "test.sock")
listener, err := net.NewListener(socketPath)
listener, err := net.NewSystemProbeListener(socketPath)
require.NoError(t, err)
t.Cleanup(listener.Stop)

Expand Down
6 changes: 3 additions & 3 deletions pkg/process/net/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func newSystemProbe(path string) *RemoteSysProbeUtil {
MaxIdleConns: 2,
IdleConnTimeout: 30 * time.Second,
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial(netType, path)
return DialSystemProbe(netType, path)
},
TLSHandshakeTimeout: 1 * time.Second,
ResponseHeaderTimeout: 5 * time.Second,
Expand All @@ -331,7 +331,7 @@ func newSystemProbe(path string) *RemoteSysProbeUtil {
pprofClient: http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial(netType, path)
return DialSystemProbe(netType, path)
},
},
},
Expand All @@ -340,7 +340,7 @@ func newSystemProbe(path string) *RemoteSysProbeUtil {
// is that the caller will set a timeout on each request
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial(netType, path)
return DialSystemProbe(netType, path)
},
},
},
Expand Down
6 changes: 6 additions & 0 deletions pkg/process/net/common_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package net

import (
"fmt"
"net"
"os"

sysconfig "github.com/DataDog/datadog-agent/cmd/system-probe/config"
Expand Down Expand Up @@ -40,3 +41,8 @@ func CheckPath(path string) error {
}
return nil
}

// DialSystemProbe connects to the system-probe service endpoint
func DialSystemProbe(netType string, path string) (net.Conn, error) {
return net.Dial(netType, path)
}
7 changes: 7 additions & 0 deletions pkg/process/net/common_unsupported.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
package net

import (
"net"

model "github.com/DataDog/agent-payload/v5/process"

"github.com/DataDog/datadog-agent/pkg/languagedetection/languagemodels"
Expand Down Expand Up @@ -73,3 +75,8 @@ func (r *RemoteSysProbeUtil) DetectLanguage([]int32) ([]languagemodels.Language,
func (r *RemoteSysProbeUtil) GetPprof(_ string) ([]byte, error) {
return nil, ErrNotImplemented
}

// DialSystemProbe connects to the system-probe service endpoint
func DialSystemProbe(netType string, path string) (net.Conn, error) {
return net.Dial(netType, path)
}
3 changes: 3 additions & 0 deletions pkg/process/net/common_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ const (
procStatsURL = "http://localhost:3333/" + string(sysconfig.ProcessModule) + "stats"
// pingURL is not used in windows, the value is added to avoid compilation error in windows
pingURL = "http://localhost:3333/" + string(sysconfig.PingModule) + "/ping/"

// SystemProbePipeName is the production named pipe for system probe
SystemProbePipeName = `\\.\pipe\dd_system_probe`
)

// CheckPath is used to make sure the globalSocketPath has been set before attempting to connect
Expand Down
14 changes: 12 additions & 2 deletions pkg/process/net/uds.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ type UDSListener struct {
socketPath string
}

// NewListener returns an idle UDSListener
func NewListener(socketAddr string) (*UDSListener, error) {
// newSocketListener creates a Unix Domain Socket Listener
func newSocketListener(socketAddr string) (*UDSListener, error) {
if len(socketAddr) == 0 {
return nil, fmt.Errorf("uds: empty socket path provided")
}
Expand Down Expand Up @@ -73,6 +73,16 @@ func NewListener(socketAddr string) (*UDSListener, error) {
return listener, nil
}

// NewSystemProbeListener returns an idle UDSListener
func NewSystemProbeListener(socketAddr string) (*UDSListener, error) {
var listener, err = newSocketListener(socketAddr)
if err != nil {
return nil, fmt.Errorf("error creating IPC socket: %s", err)
}

return listener, err
}

// GetListener will return the underlying Conn's net.Listener
func (l *UDSListener) GetListener() net.Listener {
return l.conn
Expand Down
Loading

0 comments on commit 10d974e

Please sign in to comment.