Skip to content

Commit 632dd18

Browse files
authored
libbeat/publisher/pipeline: fix Client.Close (elastic#20124)
Set the "closing" variable to true, not false, upon signalling the waiter to close. (cherry picked from commit e7b42d8)
1 parent d2277b1 commit 632dd18

File tree

2 files changed

+93
-1
lines changed

2 files changed

+93
-1
lines changed

libbeat/publisher/pipeline/client.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ func (w *clientCloseWaiter) signalClose() {
271271
return
272272
}
273273

274-
w.closing.Store(false)
274+
w.closing.Store(true)
275275
if w.events.Load() == 0 {
276276
w.finishClose()
277277
return

libbeat/publisher/pipeline/client_test.go

+92
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,14 @@ import (
2121
"context"
2222
"sync"
2323
"testing"
24+
"time"
2425

2526
"github.com/elastic/beats/v7/libbeat/beat"
2627
"github.com/elastic/beats/v7/libbeat/logp"
2728
"github.com/elastic/beats/v7/libbeat/outputs"
29+
"github.com/elastic/beats/v7/libbeat/publisher"
2830
"github.com/elastic/beats/v7/libbeat/publisher/queue"
31+
"github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
2932
"github.com/elastic/beats/v7/libbeat/tests/resources"
3033
)
3134

@@ -113,3 +116,92 @@ func TestClient(t *testing.T) {
113116
}
114117
})
115118
}
119+
120+
func TestClientWaitClose(t *testing.T) {
121+
routinesChecker := resources.NewGoroutinesChecker()
122+
defer routinesChecker.Check(t)
123+
124+
makePipeline := func(settings Settings, qu queue.Queue) *Pipeline {
125+
p, err := New(beat.Info{},
126+
Monitors{},
127+
func(queue.ACKListener) (queue.Queue, error) { return qu, nil },
128+
outputs.Group{},
129+
settings,
130+
)
131+
if err != nil {
132+
panic(err)
133+
}
134+
135+
return p
136+
}
137+
if testing.Verbose() {
138+
logp.TestingSetup()
139+
}
140+
141+
q := memqueue.NewQueue(logp.L(), memqueue.Settings{Events: 1})
142+
pipeline := makePipeline(Settings{}, q)
143+
defer pipeline.Close()
144+
145+
t.Run("WaitClose blocks", func(t *testing.T) {
146+
client, err := pipeline.ConnectWith(beat.ClientConfig{
147+
WaitClose: 500 * time.Millisecond,
148+
})
149+
if err != nil {
150+
t.Fatal(err)
151+
}
152+
defer client.Close()
153+
154+
// Send an event which never gets acknowledged.
155+
client.Publish(beat.Event{})
156+
157+
closed := make(chan struct{})
158+
go func() {
159+
defer close(closed)
160+
client.Close()
161+
}()
162+
163+
select {
164+
case <-closed:
165+
t.Fatal("expected Close to wait for event acknowledgement")
166+
case <-time.After(100 * time.Millisecond):
167+
}
168+
169+
select {
170+
case <-closed:
171+
case <-time.After(10 * time.Second):
172+
t.Fatal("expected Close to stop waiting after WaitClose elapses")
173+
}
174+
})
175+
176+
t.Run("ACKing events unblocks WaitClose", func(t *testing.T) {
177+
client, err := pipeline.ConnectWith(beat.ClientConfig{
178+
WaitClose: time.Minute,
179+
})
180+
if err != nil {
181+
t.Fatal(err)
182+
}
183+
defer client.Close()
184+
185+
// Send an event which gets acknowledged immediately.
186+
client.Publish(beat.Event{})
187+
output := newMockClient(func(batch publisher.Batch) error {
188+
batch.ACK()
189+
return nil
190+
})
191+
defer output.Close()
192+
pipeline.output.Set(outputs.Group{Clients: []outputs.Client{output}})
193+
defer pipeline.output.Set(outputs.Group{})
194+
195+
closed := make(chan struct{})
196+
go func() {
197+
defer close(closed)
198+
client.Close()
199+
}()
200+
201+
select {
202+
case <-closed:
203+
case <-time.After(10 * time.Second):
204+
t.Fatal("expected Close to stop waiting after event acknowledgement")
205+
}
206+
})
207+
}

0 commit comments

Comments
 (0)