Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow globs in FPM unix socket paths #7089

Merged
merged 7 commits into from
Feb 28, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions plugins/inputs/phpfpm/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Get phpfpm stats using either HTTP status page or fpm socket.
## "/var/run/php5-fpm.sock"
## or using a custom fpm status path:
## "/var/run/php5-fpm.sock:fpm-custom-status-path"
## glob patterns are also supported:
## "/var/run/php*.sock"
##
## - fcgi: the URL must start with fcgi:// or cgi://, and port must be present, ie:
## "fcgi://10.0.0.12:9000/status"
Expand Down
80 changes: 72 additions & 8 deletions plugins/inputs/phpfpm/phpfpm.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/pkg/errors"
)

const (
Expand Down Expand Up @@ -95,7 +97,12 @@ func (g *phpfpm) Gather(acc telegraf.Accumulator) error {

var wg sync.WaitGroup

for _, serv := range g.Urls {
urls, err := expandUrls(g.Urls)
if err != nil {
return err
}

for _, serv := range urls {
wg.Add(1)
go func(serv string) {
defer wg.Done()
Expand Down Expand Up @@ -153,15 +160,10 @@ func (g *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error {
statusPath = "status"
}
} else {
socketAddr := strings.Split(addr, ":")
if len(socketAddr) >= 2 {
socketPath = socketAddr[0]
statusPath = socketAddr[1]
} else {
socketPath = socketAddr[0]
socketPath, statusPath = unixSocketPaths(addr)
if statusPath == "" {
statusPath = "status"
}

if _, err := os.Stat(socketPath); os.IsNotExist(err) {
return fmt.Errorf("Socket doesn't exist '%s': %s", socketPath, err)
}
Expand Down Expand Up @@ -277,6 +279,68 @@ func importMetric(r io.Reader, acc telegraf.Accumulator, addr string) (poolStat,
return stats, nil
}

func expandUrls(urls []string) ([]string, error) {
addrs := make([]string, 0, len(urls))
for _, url := range urls {
if isNetworkURL(url) {
addrs = append(addrs, url)
continue
}
paths, err := globUnixSocket(url)
if err != nil {
return nil, err
}
addrs = append(addrs, paths...)
}
return addrs, nil
}

func globUnixSocket(url string) ([]string, error) {
pattern, status := unixSocketPaths(url)
paths, err := filepath.Glob(pattern)
danielnelson marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, errors.Wrapf(err, "couldn't read the file path pattern %q", pattern)
}

addrs := make([]string, 0, len(paths))

if len(paths) == 0 {
_, err := os.Stat(pattern)
if os.IsNotExist(err) {
return nil, fmt.Errorf("Socket doesn't exist '%s': %s", pattern, err)
}
return nil, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand this correctly, would this check for files that contain '*' characters? I feel like this is probably not needed, this seems very edge case, but either way we should consider if it should be an error to not match a socket. In the other plugins using globs, not having matches is generally not an error but this decision it can make it difficult to debug. If there are situations where it is normal to not match a socket, then it might be better to ignore the error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did it this way to keep the previous behavior, which is to error out on nonexistent socket paths. Since filepath.Glob ignores any errors other than syntax ones, I've used os.Stat to get the appropriate error. The special handling of os.ErrNotExist is there because it's also done elsewhere in the plugin.

If you think we consider that not matching any sockets is not an error, this code would indeed be a bit cleaner.

Copy link
Contributor Author

@andrenth andrenth Feb 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One example of where this handling is useful is when the unix socket path is in fact not a glob, and the file does not exist. In that case, we'd get an empty slice of strings as a result from the glob, with no error, so this extra checking restores the error message one would get before glob support.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, let's continue to show an error when the socket doesn't exist. I think we should then remove the Stat call on L166, which doesn't seem useful anymore and anyway we will report an error afterwards connecting to the socket if it doesn't exist.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the globpath module I noticed Match actually returns the given parameter when it's a static path, so the latest commit handles this case correctly now. I've also removed the redundant os.Stat call.

If the PR is OK now, do you want me to rebase before merging?

}

for _, path := range paths {
if status != "" {
status = fmt.Sprintf(":%s", status)
}
addrs = append(addrs, fmt.Sprintf("%s%s", path, status))
}

return addrs, nil
}

func unixSocketPaths(addr string) (string, string) {
var socketPath, statusPath string

socketAddr := strings.Split(addr, ":")
if len(socketAddr) >= 2 {
socketPath = socketAddr[0]
statusPath = socketAddr[1]
} else {
socketPath = socketAddr[0]
statusPath = ""
}

return socketPath, statusPath
}

func isNetworkURL(addr string) bool {
return strings.HasPrefix(addr, "http://") || strings.HasPrefix(addr, "https://") || strings.HasPrefix(addr, "fcgi://") || strings.HasPrefix(addr, "cgi://")
}

func init() {
inputs.Add("phpfpm", func() telegraf.Input {
return &phpfpm{}
Expand Down
65 changes: 65 additions & 0 deletions plugins/inputs/phpfpm/phpfpm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,71 @@ func TestPhpFpmGeneratesMetrics_From_Socket(t *testing.T) {
acc.AssertContainsTaggedFields(t, "phpfpm", fields, tags)
}

func TestPhpFpmGeneratesMetrics_From_Multiple_Sockets_With_Glob(t *testing.T) {
// Create a socket in /tmp because we always have write permission and if the
// removing of socket fail when system restart /tmp is clear so
// we don't have junk files around
var randomNumber int64
binary.Read(rand.Reader, binary.LittleEndian, &randomNumber)
socket1 := fmt.Sprintf("/tmp/test-fpm%d.sock", randomNumber)
tcp1, err := net.Listen("unix", socket1)
if err != nil {
t.Fatal("Cannot initialize server on port ")
}
defer tcp1.Close()

binary.Read(rand.Reader, binary.LittleEndian, &randomNumber)
socket2 := fmt.Sprintf("/tmp/test-fpm%d.sock", randomNumber)
tcp2, err := net.Listen("unix", socket2)
if err != nil {
t.Fatal("Cannot initialize server on port ")
}
defer tcp2.Close()

s := statServer{}
go fcgi.Serve(tcp1, s)
go fcgi.Serve(tcp2, s)

r := &phpfpm{
Urls: []string{"/tmp/test-fpm[\\-0-9]*.sock"},
}

var acc1, acc2 testutil.Accumulator

err = acc1.GatherError(r.Gather)
require.NoError(t, err)

err = acc2.GatherError(r.Gather)
require.NoError(t, err)

tags1 := map[string]string{
"pool": "www",
"url": socket1,
}

tags2 := map[string]string{
"pool": "www",
"url": socket2,
}

fields := map[string]interface{}{
"start_since": int64(1991),
"accepted_conn": int64(3),
"listen_queue": int64(1),
"max_listen_queue": int64(0),
"listen_queue_len": int64(0),
"idle_processes": int64(1),
"active_processes": int64(1),
"total_processes": int64(2),
"max_active_processes": int64(1),
"max_children_reached": int64(2),
"slow_requests": int64(1),
}

acc1.AssertContainsTaggedFields(t, "phpfpm", fields, tags1)
acc2.AssertContainsTaggedFields(t, "phpfpm", fields, tags2)
}

func TestPhpFpmGeneratesMetrics_From_Socket_Custom_Status_Path(t *testing.T) {
// Create a socket in /tmp because we always have write permission. If the
// removing of socket fail we won't have junk files around. Cuz when system
Expand Down