Skip to content

Commit

Permalink
feat(inputs.http_listener_v2): add unix socket mode
Browse files Browse the repository at this point in the history
  • Loading branch information
bazko1 committed Aug 21, 2024
1 parent 0bfb587 commit 06bfdea
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 10 deletions.
11 changes: 11 additions & 0 deletions plugins/inputs/http_listener_v2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## Address and port to host HTTP listener on
service_address = ":8080"

## Network type to listen to default tcp
# if set to unix service_address will be interpreted as unix socket path
# network = "tcp"

## Permission for unix sockets (only available for network unix)
## This setting may not be respected by some platforms. To safely restrict
## permissions it is recommended to place the socket into a previously
## created directory with the desired permissions.
## ex: socket_mode = "777"
# socket_mode = ""

## Paths to listen to.
# paths = ["/telegraf"]

Expand Down
45 changes: 42 additions & 3 deletions plugins/inputs/http_listener_v2/http_listener_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@ import (
"crypto/tls"
_ "embed"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -47,6 +52,8 @@ type TimeFunc func() time.Time
// HTTPListenerV2 is an input plugin that collects external metrics sent via HTTP
type HTTPListenerV2 struct {
ServiceAddress string `toml:"service_address"`
Network string `toml:"network"`
SocketMode string `toml:"socket_mode"`
Path string `toml:"path" deprecated:"1.20.0;1.35.0;use 'paths' instead"`
Paths []string `toml:"paths"`
PathTag bool `toml:"path_tag"`
Expand Down Expand Up @@ -151,18 +158,49 @@ func (h *HTTPListenerV2) Init() error {
return err
}

switch h.Network {
case "tcp":
case "unix":
path := filepath.FromSlash(h.ServiceAddress)
if runtime.GOOS == "windows" && strings.Contains(path, ":") {
path = strings.TrimPrefix(path, `\`)
}
if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("removing socket failed: %w", err)
}
default:
return fmt.Errorf("unknown protocol %q", h.Network)
}

var listener net.Listener
if tlsConf != nil {
listener, err = tls.Listen("tcp", h.ServiceAddress, tlsConf)
listener, err = tls.Listen(h.Network, h.ServiceAddress, tlsConf)
} else {
listener, err = net.Listen("tcp", h.ServiceAddress)
listener, err = net.Listen(h.Network, h.ServiceAddress)
}
if err != nil {
return err
}
h.tlsConf = tlsConf
h.listener = listener
h.Port = listener.Addr().(*net.TCPAddr).Port

if h.Network == "tcp" {
h.Port = listener.Addr().(*net.TCPAddr).Port
}

if h.Network == "unix" && h.SocketMode != "" {
// Set permissions on socket
// Convert from octal in string to int
i, err := strconv.ParseUint(h.SocketMode, 8, 32)
if err != nil {
return fmt.Errorf("converting socket mode failed: %w", err)
}

perm := os.FileMode(uint32(i))
if err := os.Chmod(h.ServiceAddress, perm); err != nil {
return fmt.Errorf("changing socket permissions failed: %w", err)
}
}

if h.SuccessCode == 0 {
h.SuccessCode = http.StatusNoContent
Expand Down Expand Up @@ -375,6 +413,7 @@ func init() {
inputs.Add("http_listener_v2", func() telegraf.Input {
return &HTTPListenerV2{
ServiceAddress: ":8080",
Network: "tcp",
TimeFunc: time.Now,
Paths: []string{"/telegraf"},
Methods: []string{"POST", "PUT"},
Expand Down
49 changes: 42 additions & 7 deletions plugins/inputs/http_listener_v2/http_listener_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package http_listener_v2

import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"net"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -42,9 +44,7 @@ cpu_load_short,host=server06 value=12.0 1422568543702900257
basicPassword = "super-secure-password!"
)

var (
pki = testutil.NewPKI("../../../testutil/pki")
)
var pki = testutil.NewPKI("../../../testutil/pki")

func newTestHTTPListenerV2() (*HTTPListenerV2, error) {
parser := &influx.Parser{}
Expand All @@ -54,6 +54,7 @@ func newTestHTTPListenerV2() (*HTTPListenerV2, error) {

listener := &HTTPListenerV2{
Log: testutil.Logger{},
Network: "tcp",
ServiceAddress: "localhost:0",
Path: "/write",
Methods: []string{"POST"},
Expand Down Expand Up @@ -84,6 +85,7 @@ func newTestHTTPSListenerV2() (*HTTPListenerV2, error) {

listener := &HTTPListenerV2{
Log: testutil.Logger{},
Network: "tcp",
ServiceAddress: "localhost:0",
Path: "/write",
Methods: []string{"POST"},
Expand Down Expand Up @@ -124,6 +126,7 @@ func TestInvalidListenerConfig(t *testing.T) {

listener := &HTTPListenerV2{
Log: testutil.Logger{},
Network: "tcp",
ServiceAddress: "address_without_port",
Path: "/write",
Methods: []string{"POST"},
Expand Down Expand Up @@ -231,8 +234,10 @@ func TestWriteHTTP(t *testing.T) {
require.EqualValues(t, 204, resp.StatusCode)

acc.Wait(2)
hostTags := []string{"server02", "server03",
"server04", "server05", "server06"}
hostTags := []string{
"server02", "server03",
"server04", "server05", "server06",
}
for _, hostTag := range hostTags {
acc.AssertContainsTaggedFields(t, "cpu_load_short",
map[string]interface{}{"value": float64(12)},
Expand Down Expand Up @@ -359,6 +364,7 @@ func TestWriteHTTPExactMaxBodySize(t *testing.T) {

listener := &HTTPListenerV2{
Log: testutil.Logger{},
Network: "tcp",
ServiceAddress: "localhost:0",
Path: "/write",
Methods: []string{"POST"},
Expand All @@ -385,6 +391,7 @@ func TestWriteHTTPVerySmallMaxBody(t *testing.T) {

listener := &HTTPListenerV2{
Log: testutil.Logger{},
Network: "tcp",
ServiceAddress: "localhost:0",
Path: "/write",
Methods: []string{"POST"},
Expand Down Expand Up @@ -429,8 +436,10 @@ func TestWriteHTTPGzippedData(t *testing.T) {
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 204, resp.StatusCode)

hostTags := []string{"server02", "server03",
"server04", "server05", "server06"}
hostTags := []string{
"server02", "server03",
"server04", "server05", "server06",
}
acc.Wait(len(hostTags))
for _, hostTag := range hostTags {
acc.AssertContainsTaggedFields(t, "cpu_load_short",
Expand Down Expand Up @@ -724,6 +733,32 @@ func TestServerHeaders(t *testing.T) {
require.Equal(t, "value", resp.Header.Get("key"))
}

func TestUnixSocket(t *testing.T) {
listener, err := newTestHTTPListenerV2()
require.NoError(t, err)
listener.Network = "unix"
file, err := os.CreateTemp("", "*.socket")
require.NoError(t, err)
defer os.Remove(file.Name())
socketName := file.Name()
listener.ServiceAddress = socketName
listener.SocketMode = "777"
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
httpc := http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", socketName)
},
},
}
resp, err := httpc.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBufferString(testMsg))
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 204, resp.StatusCode)
}

func mustReadHugeMetric() []byte {
filePath := "testdata/huge_metric"
data, err := os.ReadFile(filePath)
Expand Down
11 changes: 11 additions & 0 deletions plugins/inputs/http_listener_v2/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,17 @@
## Address and port to host HTTP listener on
service_address = ":8080"

## Network type to listen to default tcp
# if set to unix service_address will be interpreted as unix socket path
# network = "tcp"

## Permission for unix sockets (only available for network unix)
## This setting may not be respected by some platforms. To safely restrict
## permissions it is recommended to place the socket into a previously
## created directory with the desired permissions.
## ex: socket_mode = "777"
# socket_mode = ""

## Paths to listen to.
# paths = ["/telegraf"]

Expand Down

0 comments on commit 06bfdea

Please sign in to comment.