Skip to content

Commit 6dab14c

Browse files
authored
[Elastic Agent] Fix service application stop timeout issue (#20256)
* Add tests. * Fix test. * Add CHANGELOG.
1 parent e278101 commit 6dab14c

File tree

10 files changed

+280
-24
lines changed

10 files changed

+280
-24
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ x-pack/dockerlogbeat/temproot.tar
3030
x-pack/elastic-agent/elastic_agent
3131
x-pack/elastic-agent/fleet.yml
3232
x-pack/elastic-agent/pkg/agent/operation/tests/scripts/configurable-1.0-darwin-x86_64/configurable
33+
x-pack/elastic-agent/pkg/agent/operation/tests/scripts/serviceable-1.0-darwin-x86_64/serviceable
3334

3435
# Editor swap files
3536
*.swp

x-pack/elastic-agent/CHANGELOG.asciidoc

+1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
- Improve GRPC stop to be more relaxed {pull}20118[20118]
5757
- Prevent closing closed reader {pull}20214[20214]
5858
- Fix Windows service installation script {pull}20203[20203]
59+
- Fix timeout issue stopping service applications {pull}20256[20256]
5960

6061
==== New features
6162

x-pack/elastic-agent/magefile.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -167,12 +167,15 @@ func (Build) Clean() {
167167
func (Build) TestBinaries() error {
168168
p := filepath.Join("pkg", "agent", "operation", "tests", "scripts")
169169

170-
binaryName := "configurable"
170+
configurableName := "configurable"
171+
serviceableName := "serviceable"
171172
if runtime.GOOS == "windows" {
172-
binaryName += ".exe"
173+
configurableName += ".exe"
174+
serviceableName += ".exe"
173175
}
174176
return combineErr(
175-
RunGo("build", "-o", filepath.Join(p, "configurable-1.0-darwin-x86_64", binaryName), filepath.Join(p, "configurable-1.0-darwin-x86_64", "main.go")),
177+
RunGo("build", "-o", filepath.Join(p, "configurable-1.0-darwin-x86_64", configurableName), filepath.Join(p, "configurable-1.0-darwin-x86_64", "main.go")),
178+
RunGo("build", "-o", filepath.Join(p, "serviceable-1.0-darwin-x86_64", serviceableName), filepath.Join(p, "serviceable-1.0-darwin-x86_64", "main.go")),
176179
)
177180
}
178181

x-pack/elastic-agent/pkg/agent/operation/common_test.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -94,15 +94,13 @@ func getLogger() *logger.Logger {
9494
}
9595

9696
func getProgram(binary, version string) *app.Descriptor {
97+
spec := program.SupportedMap[binary]
9798
downloadCfg := &artifact.Config{
9899
InstallPath: installPath,
99100
OperatingSystem: "darwin",
100101
Architecture: "32",
101102
}
102-
return app.NewDescriptor(program.Spec{
103-
Name: binary,
104-
Cmd: binary,
105-
}, version, downloadCfg, nil)
103+
return app.NewDescriptor(spec, version, downloadCfg, nil)
106104
}
107105

108106
func getAbsPath(path string) string {

x-pack/elastic-agent/pkg/agent/operation/operator_test.go

+109-8
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ package operation
77
import (
88
"fmt"
99
"math/rand"
10+
"net"
1011
"os"
12+
"os/exec"
1113
"path/filepath"
1214
"runtime"
1315
"testing"
@@ -27,17 +29,26 @@ func TestMain(m *testing.M) {
2729
Cmd: "configurable",
2830
Args: []string{},
2931
}
32+
port, err := getFreePort()
33+
if err != nil {
34+
panic(err)
35+
}
36+
serviceSpec := program.Spec{
37+
ServicePort: port,
38+
Name: "serviceable",
39+
Cmd: "serviceable",
40+
Args: []string{fmt.Sprintf("%d", port)},
41+
}
3042

31-
program.Supported = append(program.Supported, configurableSpec)
43+
program.Supported = append(program.Supported, configurableSpec, serviceSpec)
44+
program.SupportedMap["configurable"] = configurableSpec
45+
program.SupportedMap["serviceable"] = serviceSpec
3246

33-
p := getProgram("configurable", "1.0")
34-
spec := p.Spec()
35-
path := spec.BinaryPath
36-
if runtime.GOOS == "windows" {
37-
path += ".exe"
47+
if err := isAvailable("configurable", "1.0"); err != nil {
48+
panic(err)
3849
}
39-
if s, err := os.Stat(path); err != nil || s == nil {
40-
panic(fmt.Errorf("binary not available %s", spec.BinaryPath))
50+
if err := isAvailable("serviceable", "1.0"); err != nil {
51+
panic(err)
4152
}
4253

4354
os.Exit(m.Run())
@@ -366,3 +377,93 @@ func TestConfigurableStartStop(t *testing.T) {
366377
})
367378
}
368379
}
380+
381+
func TestConfigurableService(t *testing.T) {
382+
p := getProgram("serviceable", "1.0")
383+
384+
operator := getTestOperator(t, downloadPath, installPath, p)
385+
if err := operator.start(p, nil); err != nil {
386+
t.Fatal(err)
387+
}
388+
defer operator.stop(p) // failure catch, to ensure no sub-process stays running
389+
390+
// emulating a service, so we need to start the binary here in the test
391+
spec := p.Spec()
392+
cmd := exec.Command(spec.BinaryPath, fmt.Sprintf("%d", p.ServicePort()))
393+
cmd.Env = append(cmd.Env, os.Environ()...)
394+
cmd.Dir = filepath.Dir(spec.BinaryPath)
395+
cmd.Stdout = os.Stdout
396+
cmd.Stderr = os.Stderr
397+
if err := cmd.Start(); err != nil {
398+
t.Fatal(err)
399+
}
400+
401+
waitFor(t, func() error {
402+
items := operator.State()
403+
item, ok := items[p.ID()]
404+
if !ok {
405+
return fmt.Errorf("no state for process")
406+
}
407+
if item.Status != state.Running {
408+
return fmt.Errorf("process never went to running")
409+
}
410+
return nil
411+
})
412+
413+
// try to configure
414+
cfg := make(map[string]interface{})
415+
tstFilePath := filepath.Join(os.TempDir(), fmt.Sprintf("tmp%d", rand.Uint32()))
416+
cfg["TestFile"] = tstFilePath
417+
if err := operator.pushConfig(p, cfg); err != nil {
418+
t.Fatalf("failed to config: %v", err)
419+
}
420+
421+
waitFor(t, func() error {
422+
if s, err := os.Stat(tstFilePath); err != nil || s == nil {
423+
return fmt.Errorf("failed to create a file using Config call %s", tstFilePath)
424+
}
425+
return nil
426+
})
427+
428+
items := operator.State()
429+
item0, ok := items[p.ID()]
430+
if !ok || item0.Status != state.Running {
431+
t.Fatalf("Process no longer running after config %#v", items)
432+
}
433+
434+
// stop the process
435+
if err := operator.stop(p); err != nil {
436+
t.Fatalf("Failed to stop service: %v", err)
437+
}
438+
439+
if err := cmd.Wait(); err != nil {
440+
t.Fatalf("Process failed: %v", err)
441+
}
442+
}
443+
444+
func isAvailable(name, version string) error {
445+
p := getProgram(name, version)
446+
spec := p.Spec()
447+
path := spec.BinaryPath
448+
if runtime.GOOS == "windows" {
449+
path += ".exe"
450+
}
451+
if s, err := os.Stat(path); err != nil || s == nil {
452+
return fmt.Errorf("binary not available %s", spec.BinaryPath)
453+
}
454+
return nil
455+
}
456+
457+
// getFreePort finds a free port.
458+
func getFreePort() (int, error) {
459+
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
460+
if err != nil {
461+
return 0, err
462+
}
463+
l, err := net.ListenTCP("tcp", addr)
464+
if err != nil {
465+
return 0, err
466+
}
467+
defer l.Close()
468+
return l.Addr().(*net.TCPAddr).Port, nil
469+
}
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
Testing program emulating tool which is configurable using GRPC communication channel.
1+
Testing program emulating tool which is configurable using GRPC communication channel when running as a sub-process.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Testing program emulating tool which is configurable using GRPC communication channel when running as an external service.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package main
6+
7+
import (
8+
"context"
9+
"crypto/tls"
10+
"crypto/x509"
11+
"fmt"
12+
"net"
13+
"os"
14+
"path/filepath"
15+
"strconv"
16+
17+
protobuf "github.com/golang/protobuf/proto"
18+
"google.golang.org/grpc"
19+
"google.golang.org/grpc/credentials"
20+
"gopkg.in/yaml.v2"
21+
22+
"github.com/elastic/elastic-agent-client/v7/pkg/client"
23+
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
24+
)
25+
26+
func main() {
27+
srvPort, err := strconv.Atoi(os.Args[1])
28+
if err != nil {
29+
panic(err)
30+
}
31+
f, _ := os.OpenFile(filepath.Join(os.TempDir(), "testing.out"), os.O_APPEND|os.O_CREATE|os.O_RDWR, 0666)
32+
f.WriteString("starting \n")
33+
ctx, cancel := context.WithCancel(context.Background())
34+
s := &configServer{
35+
f: f,
36+
ctx: ctx,
37+
cancel: cancel,
38+
}
39+
f.WriteString(fmt.Sprintf("reading creds from port: %d\n", srvPort))
40+
client, err := clientFromNet(srvPort, s)
41+
if err != nil {
42+
f.WriteString(err.Error())
43+
panic(err)
44+
}
45+
s.client = client
46+
err = client.Start(ctx)
47+
if err != nil {
48+
f.WriteString(err.Error())
49+
panic(err)
50+
}
51+
<-ctx.Done()
52+
f.WriteString("finished \n")
53+
}
54+
55+
type configServer struct {
56+
f *os.File
57+
ctx context.Context
58+
cancel context.CancelFunc
59+
client client.Client
60+
}
61+
62+
func (s *configServer) OnConfig(cfgString string) {
63+
s.client.Status(proto.StateObserved_CONFIGURING, "Writing config file", nil)
64+
65+
testCfg := &TestConfig{}
66+
if err := yaml.Unmarshal([]byte(cfgString), &testCfg); err != nil {
67+
s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to unmarshall config: %s", err), nil)
68+
return
69+
}
70+
71+
if testCfg.TestFile != "" {
72+
tf, err := os.Create(testCfg.TestFile)
73+
if err != nil {
74+
s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to create file %s: %s", testCfg.TestFile, err), nil)
75+
return
76+
}
77+
78+
err = tf.Close()
79+
if err != nil {
80+
s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to close file %s: %s", testCfg.TestFile, err), nil)
81+
return
82+
}
83+
}
84+
85+
s.client.Status(proto.StateObserved_HEALTHY, "Running", map[string]interface{}{
86+
"status": proto.StateObserved_HEALTHY,
87+
"message": "Running",
88+
})
89+
}
90+
91+
func (s *configServer) OnStop() {
92+
s.client.Status(proto.StateObserved_STOPPING, "Stopping", nil)
93+
s.cancel()
94+
}
95+
96+
func (s *configServer) OnError(err error) {
97+
s.f.WriteString(err.Error())
98+
}
99+
100+
// TestConfig is a configuration for testing Config calls
101+
type TestConfig struct {
102+
TestFile string `config:"TestFile" yaml:"TestFile"`
103+
}
104+
105+
func getCreds(port int) (*proto.ConnInfo, error) {
106+
c, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", port))
107+
if err != nil {
108+
return nil, err
109+
}
110+
defer c.Close()
111+
buf := make([]byte, 1024*1024)
112+
n, err := c.Read(buf)
113+
if err != nil {
114+
return nil, err
115+
}
116+
var connInfo proto.ConnInfo
117+
err = protobuf.Unmarshal(buf[:n], &connInfo)
118+
if err != nil {
119+
return nil, err
120+
}
121+
return &connInfo, nil
122+
}
123+
124+
func clientFromNet(port int, impl client.StateInterface, actions ...client.Action) (client.Client, error) {
125+
connInfo, err := getCreds(port)
126+
if err != nil {
127+
return nil, err
128+
}
129+
cert, err := tls.X509KeyPair(connInfo.PeerCert, connInfo.PeerKey)
130+
if err != nil {
131+
return nil, err
132+
}
133+
caCertPool := x509.NewCertPool()
134+
caCertPool.AppendCertsFromPEM(connInfo.CaCert)
135+
trans := credentials.NewTLS(&tls.Config{
136+
ServerName: connInfo.ServerName,
137+
Certificates: []tls.Certificate{cert},
138+
RootCAs: caCertPool,
139+
})
140+
return client.New(connInfo.Addr, connInfo.Token, impl, actions, grpc.WithTransportCredentials(trans)), nil
141+
}

x-pack/elastic-agent/pkg/core/plugin/process/app.go

+11-5
Original file line numberDiff line numberDiff line change
@@ -123,20 +123,26 @@ func (a *Application) Started() bool {
123123
// Stop stops the current application.
124124
func (a *Application) Stop() {
125125
a.appLock.Lock()
126-
defer a.appLock.Unlock()
126+
status := a.state.Status
127+
srvState := a.srvState
128+
a.appLock.Unlock()
127129

128-
if a.state.Status == state.Stopped {
130+
if status == state.Stopped {
129131
return
130132
}
131133

132134
stopSig := os.Interrupt
133-
if a.srvState != nil {
134-
if err := a.srvState.Stop(a.processConfig.StopTimeout); err != nil {
135+
if srvState != nil {
136+
if err := srvState.Stop(a.processConfig.StopTimeout); err != nil {
135137
// kill the process if stop through GRPC doesn't work
136138
stopSig = os.Kill
137139
}
138-
a.srvState = nil
139140
}
141+
142+
a.appLock.Lock()
143+
defer a.appLock.Unlock()
144+
145+
a.srvState = nil
140146
if a.state.ProcessInfo != nil {
141147
if err := a.state.ProcessInfo.Process.Signal(stopSig); err == nil {
142148
// no error on signal, so wait for it to stop

x-pack/elastic-agent/pkg/core/plugin/service/app.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -220,21 +220,25 @@ func (a *Application) Configure(_ context.Context, config map[string]interface{}
220220
// Stop stops the current application.
221221
func (a *Application) Stop() {
222222
a.appLock.Lock()
223-
defer a.appLock.Unlock()
223+
srvState := a.srvState
224+
a.appLock.Unlock()
224225

225-
if a.srvState == nil {
226+
if srvState == nil {
226227
return
227228
}
228229

229-
if err := a.srvState.Stop(a.processConfig.StopTimeout); err != nil {
230+
if err := srvState.Stop(a.processConfig.StopTimeout); err != nil {
231+
a.appLock.Lock()
230232
a.setState(state.Failed, errors.New(err, "Failed to stopped").Error(), nil)
231233
} else {
234+
a.appLock.Lock()
232235
a.setState(state.Stopped, "Stopped", nil)
233236
}
234237
a.srvState = nil
235238

236239
a.cleanUp()
237240
a.stopCredsListener()
241+
a.appLock.Unlock()
238242
}
239243

240244
// Shutdown disconnects the service, but doesn't signal it to stop.

0 commit comments

Comments
 (0)