Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(inputs.http_listener_v2): Add unix socket mode #15764

Merged
merged 13 commits into from
Sep 4, 2024
13 changes: 12 additions & 1 deletion plugins/inputs/http_listener_v2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,19 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
```toml @sample.conf
# Generic HTTP write listener
[[inputs.http_listener_v2]]
## Address and port to host HTTP listener on
## Address to host HTTP listener on
# can be prefixed by protocol tcp, or unix if not provided defaults to tcp
# if unix network type provided it should be followed by absolute path for unix socket
service_address = ":8080"
# service_address = "tcp://:8443"
# service_address = "unix:///tmp/telegraf.sock"

## Permission for unix sockets (only available for unix sockets)
## This setting may not be respected by some platforms. To safely restrict
## permissions it is recommended to place the socket into a previously
## created directory with the desired permissions.
## ex: socket_mode = "777"
# socket_mode = ""

## Paths to listen to.
# paths = ["/telegraf"]
Expand Down
63 changes: 60 additions & 3 deletions plugins/inputs/http_listener_v2/http_listener_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,16 @@ import (
"crypto/tls"
_ "embed"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -47,6 +53,7 @@ type TimeFunc func() time.Time
// HTTPListenerV2 is an input plugin that collects external metrics sent via HTTP
type HTTPListenerV2 struct {
ServiceAddress string `toml:"service_address"`
SocketMode string `toml:"socket_mode"`
Path string `toml:"path" deprecated:"1.20.0;1.35.0;use 'paths' instead"`
Paths []string `toml:"paths"`
PathTag bool `toml:"path_tag"`
Expand Down Expand Up @@ -151,18 +158,68 @@ func (h *HTTPListenerV2) Init() error {
return err
}

protoRegex := regexp.MustCompile(`\w://`)
if !protoRegex.MatchString(h.ServiceAddress) {
h.ServiceAddress = "tcp://" + h.ServiceAddress
}

var u *url.URL
// Parse and check the address
if runtime.GOOS == "windows" && strings.HasPrefix(h.ServiceAddress, "unix://") {
u = &url.URL{Scheme: "unix", Path: strings.TrimPrefix(h.ServiceAddress, "unix://")}
} else {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really necessary? Parsing a unix URL should also work on Windows, shouldn't it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it was problematic as sample address will look like unix://c:/Users/bazko1/temp/telegraf.socket the url.Parse starts to interpret the part after c: as port and it results in an parse error.

You can check that in socket_listener tests for windows are skipped with comment that the unixgram is not supported even though the unix socket is.
See: https://github.com/influxdata/telegraf/blob/master/plugins/inputs/socket_listener/socket_listener_test.go#L125-L128

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the correct way is to use unix:///C:/Users/bazko1/... (i.e. three slashes) according to golang/go@844b625...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes great I tested it and this works I will remove the windows case then.

u, err = url.Parse(h.ServiceAddress)
if err != nil {
return fmt.Errorf("parsing address failed: %w", err)
}
}

address := u.Host

switch u.Scheme {
case "tcp":
case "unix":
path := filepath.FromSlash(u.Path)
if runtime.GOOS == "windows" && strings.Contains(path, ":") {
path = strings.TrimPrefix(path, `\`)
}
if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("removing socket failed: %w", err)
}
address = path
default:
return fmt.Errorf("unknown protocol %q", u.Scheme)
}

var listener net.Listener
if tlsConf != nil {
listener, err = tls.Listen("tcp", h.ServiceAddress, tlsConf)
listener, err = tls.Listen(u.Scheme, address, tlsConf)
} else {
listener, err = net.Listen("tcp", h.ServiceAddress)
listener, err = net.Listen(u.Scheme, address)
}
if err != nil {
return err
}
h.tlsConf = tlsConf
h.listener = listener
h.Port = listener.Addr().(*net.TCPAddr).Port

if u.Scheme == "tcp" {
h.Port = listener.Addr().(*net.TCPAddr).Port
}

if u.Scheme == "unix" && h.SocketMode != "" {
// Set permissions on socket
// Convert from octal in string to int
i, err := strconv.ParseUint(h.SocketMode, 8, 32)
if err != nil {
return fmt.Errorf("converting socket mode failed: %w", err)
}

perm := os.FileMode(uint32(i))
if err := os.Chmod(address, perm); err != nil {
return fmt.Errorf("changing socket permissions failed: %w", err)
}
}

if h.SuccessCode == 0 {
h.SuccessCode = http.StatusNoContent
Expand Down
66 changes: 66 additions & 0 deletions plugins/inputs/http_listener_v2/http_listener_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package http_listener_v2

import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"runtime"
"strconv"
"sync"
Expand Down Expand Up @@ -724,6 +727,69 @@ func TestServerHeaders(t *testing.T) {
require.Equal(t, "value", resp.Header.Get("key"))
}

func TestUnixSocket(t *testing.T) {
listener, err := newTestHTTPListenerV2()
require.NoError(t, err)
file, err := os.CreateTemp("", "*.socket")
require.NoError(t, err)
require.NoError(t, file.Close())
defer os.Remove(file.Name())
socketName := file.Name()
listener.ServiceAddress = "unix://" + socketName
listener.SocketMode = "777"
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
defer listener.Stop()
httpc := http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", socketName)
},
},
}
resp, err := httpc.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBufferString(testMsg))
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 204, resp.StatusCode)
}

func TestServiceAddressURL(t *testing.T) {
unixSocket := filepath.FromSlash(os.TempDir() + "/test.sock")
cases := []struct {
serviceAddress, expectedAddress string
expectedPort int
shouldError bool
}{
{":8080", "[::]:8080", 8080, false},
{"localhost:4123", "127.0.0.1:4123", 4123, false},
{"tcp://localhost:4321", "127.0.0.1:4321", 4321, false},
{"127.0.0.1:9443", "127.0.0.1:9443", 9443, false},
{"tcp://127.0.0.1:8443", "127.0.0.1:8443", 8443, false},
{"tcp://:8443", "[::]:8443", 8443, false},
// port not provided
{"8.8.8.8", "", 0, true},
{"unix://" + unixSocket, unixSocket, 0, false},
// wrong protocol
{"notexistent:///tmp/test.sock", "", 0, true},
}
for _, c := range cases {
listener, err := newTestHTTPListenerV2()
require.NoError(t, err)
listener.ServiceAddress = c.serviceAddress
err = listener.Init()
require.Equal(t, c.shouldError, err != nil, "Init returned wrong error result error is: %q", err)
if c.shouldError {
continue
}
acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
require.Equal(t, c.expectedAddress, listener.listener.Addr().String())
require.Equal(t, c.expectedPort, listener.Port)
listener.Stop()
}
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is problematic in CI environments. Imagine another test using the same port, if they by chance run at the same time one of them will fail and we get flaky tests. I would remove the test here or solely stick to the URL parsing part without actually "starting" the listener...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I agree such test might be problematic when running in parallel with others, but afaik they are run synchronously and there are also other tests that actually, start listening on port.
Anyway my logic should be tested enough without Start as it is located in Init function so I will remove the Start method call.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well actually the Init already calls the net.Listen so that the port gets busy and we need to stop the internal dial.Listener.

I think my changes are big enough especially with special windows case that having unit test for url parsing would be nice so please reconsider the idea of fully removing test, but if you insist I will remove.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well starting the listener in Init() is also wrong, this should be done in Start()... Anyhow, as you didn't introduce this, I don't blame you. ;-)

Regarding the tests: Yes, the tests in this file/package are executes sequentially, but all packages are executed in parallel (in the best case)... So if e.g. outputs.http opens a server on this port we run into trouble... So it is acceptable to specify no port or port zero as this will allow the OS to choose a free port but the above will definitively bite us. Been there suffered that.

In any case, what you do is to test if url.Parse works I think and I guess we should assume this works at least as long as this test will open a fixed port... So either change the test/code to not starting listen or remove the fixed ports.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay lets make it right then I agree that the test can be actually removed especially when windows unix path works fine with url.Parse.
I will move the .Listen part to Start then.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 45792b5

func mustReadHugeMetric() []byte {
filePath := "testdata/huge_metric"
data, err := os.ReadFile(filePath)
Expand Down
13 changes: 12 additions & 1 deletion plugins/inputs/http_listener_v2/sample.conf
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
# Generic HTTP write listener
[[inputs.http_listener_v2]]
## Address and port to host HTTP listener on
## Address to host HTTP listener on
# can be prefixed by protocol tcp, or unix if not provided defaults to tcp
# if unix network type provided it should be followed by absolute path for unix socket
service_address = ":8080"
# service_address = "tcp://:8443"
# service_address = "unix:///tmp/telegraf.sock"

## Permission for unix sockets (only available for unix sockets)
## This setting may not be respected by some platforms. To safely restrict
## permissions it is recommended to place the socket into a previously
## created directory with the desired permissions.
## ex: socket_mode = "777"
# socket_mode = ""

## Paths to listen to.
# paths = ["/telegraf"]
Expand Down
Loading