Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(inputs.http_listener_v2): Add unix socket mode #15764

Merged
merged 13 commits into from
Sep 4, 2024
15 changes: 13 additions & 2 deletions plugins/inputs/http_listener_v2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,19 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
```toml @sample.conf
# Generic HTTP write listener
[[inputs.http_listener_v2]]
## Address and port to host HTTP listener on
service_address = ":8080"
## Address to host HTTP listener on
## can be prefixed by protocol tcp, or unix if not provided defaults to tcp
## if unix network type provided it should be followed by absolute path for unix socket
service_address = "tcp://:8080"
## service_address = "tcp://:8443"
## service_address = "unix:///tmp/telegraf.sock"

## Permission for unix sockets (only available for unix sockets)
## This setting may not be respected by some platforms. To safely restrict
## permissions it is recommended to place the socket into a previously
## created directory with the desired permissions.
## ex: socket_mode = "777"
# socket_mode = ""

## Paths to listen to.
# paths = ["/telegraf"]
Expand Down
69 changes: 60 additions & 9 deletions plugins/inputs/http_listener_v2/http_listener_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,16 @@ import (
"crypto/tls"
_ "embed"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -47,6 +53,7 @@ type TimeFunc func() time.Time
// HTTPListenerV2 is an input plugin that collects external metrics sent via HTTP
type HTTPListenerV2 struct {
ServiceAddress string `toml:"service_address"`
SocketMode string `toml:"socket_mode"`
Path string `toml:"path" deprecated:"1.20.0;1.35.0;use 'paths' instead"`
Paths []string `toml:"paths"`
PathTag bool `toml:"path_tag"`
Expand All @@ -56,7 +63,7 @@ type HTTPListenerV2 struct {
ReadTimeout config.Duration `toml:"read_timeout"`
WriteTimeout config.Duration `toml:"write_timeout"`
MaxBodySize config.Size `toml:"max_body_size"`
Port int `toml:"port"`
Port int `toml:"port" deprecated:"1.32.0;1.35.0;use 'service_address' instead"`
SuccessCode int `toml:"http_success_code"`
BasicUsername string `toml:"basic_username"`
BasicPassword string `toml:"basic_password"`
Expand All @@ -72,6 +79,7 @@ type HTTPListenerV2 struct {
close chan struct{}

listener net.Listener
url *url.URL

telegraf.Parser
acc telegraf.Accumulator
Expand All @@ -91,6 +99,49 @@ func (h *HTTPListenerV2) SetParser(parser telegraf.Parser) {

// Start starts the http listener service.
func (h *HTTPListenerV2) Start(acc telegraf.Accumulator) error {
u := h.url
address := u.Host
switch u.Scheme {
case "tcp":
case "unix":
path := filepath.FromSlash(u.Path)
if runtime.GOOS == "windows" && strings.Contains(path, ":") {
path = strings.TrimPrefix(path, `\`)
}
if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("removing socket failed: %w", err)
}
address = path
default:
return fmt.Errorf("unknown protocol %q", u.Scheme)
}

var listener net.Listener
var err error
if h.tlsConf != nil {
listener, err = tls.Listen(u.Scheme, address, h.tlsConf)
} else {
listener, err = net.Listen(u.Scheme, address)
}
if err != nil {
return err
}
h.listener = listener

if u.Scheme == "unix" && h.SocketMode != "" {
// Set permissions on socket
// Convert from octal in string to int
i, err := strconv.ParseUint(h.SocketMode, 8, 32)
if err != nil {
return fmt.Errorf("converting socket mode failed: %w", err)
}

perm := os.FileMode(uint32(i))
if err := os.Chmod(address, perm); err != nil {
return fmt.Errorf("changing socket permissions failed: %w", err)
}
}

if h.MaxBodySize == 0 {
h.MaxBodySize = config.Size(defaultMaxBodySize)
}
Expand Down Expand Up @@ -151,18 +202,18 @@ func (h *HTTPListenerV2) Init() error {
return err
}

var listener net.Listener
if tlsConf != nil {
listener, err = tls.Listen("tcp", h.ServiceAddress, tlsConf)
} else {
listener, err = net.Listen("tcp", h.ServiceAddress)
protoRegex := regexp.MustCompile(`\w://`)
if !protoRegex.MatchString(h.ServiceAddress) {
h.ServiceAddress = "tcp://" + h.ServiceAddress
}

u, err := url.Parse(h.ServiceAddress)
if err != nil {
return err
return fmt.Errorf("parsing address failed: %w", err)
}

h.url = u
h.tlsConf = tlsConf
h.listener = listener
h.Port = listener.Addr().(*net.TCPAddr).Port

if h.SuccessCode == 0 {
h.SuccessCode = http.StatusNoContent
Expand Down
44 changes: 42 additions & 2 deletions plugins/inputs/http_listener_v2/http_listener_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package http_listener_v2

import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"net"
"net/http"
"net/url"
"os"
"runtime"
"strconv"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -109,9 +112,13 @@ func getHTTPSClient() *http.Client {
}

func createURL(listener *HTTPListenerV2, scheme string, path string, rawquery string) string {
var port int
if strings.HasPrefix(listener.ServiceAddress, "tcp://") {
port = listener.listener.Addr().(*net.TCPAddr).Port
}
u := url.URL{
Scheme: scheme,
Host: "localhost:" + strconv.Itoa(listener.Port),
Host: "localhost:" + strconv.Itoa(port),
Path: path,
RawQuery: rawquery,
}
Expand All @@ -134,7 +141,9 @@ func TestInvalidListenerConfig(t *testing.T) {
close: make(chan struct{}),
}

require.Error(t, listener.Init())
require.NoError(t, listener.Init())
acc := &testutil.Accumulator{}
require.Error(t, listener.Start(acc))

// Stop is called when any ServiceInput fails to start; it must succeed regardless of state
listener.Stop()
Expand Down Expand Up @@ -724,6 +733,37 @@ func TestServerHeaders(t *testing.T) {
require.Equal(t, "value", resp.Header.Get("key"))
}

func TestUnixSocket(t *testing.T) {
listener, err := newTestHTTPListenerV2()
require.NoError(t, err)
file, err := os.CreateTemp("", "*.socket")
require.NoError(t, err)
require.NoError(t, file.Close())
defer os.Remove(file.Name())
socketName := file.Name()
if runtime.GOOS == "windows" {
listener.ServiceAddress = "unix:///" + socketName
} else {
listener.ServiceAddress = "unix://" + socketName
}
listener.SocketMode = "777"
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
defer listener.Stop()
httpc := http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", socketName)
},
},
}
resp, err := httpc.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBufferString(testMsg))
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 204, resp.StatusCode)
}

func mustReadHugeMetric() []byte {
filePath := "testdata/huge_metric"
data, err := os.ReadFile(filePath)
Expand Down
15 changes: 13 additions & 2 deletions plugins/inputs/http_listener_v2/sample.conf
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
# Generic HTTP write listener
[[inputs.http_listener_v2]]
## Address and port to host HTTP listener on
service_address = ":8080"
## Address to host HTTP listener on
## can be prefixed by protocol tcp, or unix if not provided defaults to tcp
## if unix network type provided it should be followed by absolute path for unix socket
service_address = "tcp://:8080"
## service_address = "tcp://:8443"
## service_address = "unix:///tmp/telegraf.sock"

## Permission for unix sockets (only available for unix sockets)
## This setting may not be respected by some platforms. To safely restrict
## permissions it is recommended to place the socket into a previously
## created directory with the desired permissions.
## ex: socket_mode = "777"
# socket_mode = ""

## Paths to listen to.
# paths = ["/telegraf"]
Expand Down
Loading