From af4c4673835e27c3a58a4fa39fae90875cfcccb4 Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Mon, 6 Jul 2020 19:43:31 +0200 Subject: [PATCH] Composable ACKer (#19632) This change replaces the ACK handler functions with a single interface that makes it easier to combine ACK handlers. The global ACK handler is removed from the pipeline, requiring Beats to wrap and compose per input ACK handlers with their own ones. Although the PR is quite big, the main difference is that the `ACKCount`, `ACKEvents`, and `ACKLastEvents` handlers have been replaced by a single interface (`beat.ACKer`). The original ACKer implementations from `libbeat/publisher/pipeline/acker.go` and `libbeat/publisher/pipeline/client_acker.go` have been moved `libbeat/common/acker`. The former private implementation is now exposed as Helpers for writing and combining ACK handlers. Support for global ACK handlers has been removed. The `acker.Combine` and `acker.ConnectionOnly` are the only new additions to the code base. The global ACK handler support was introduced for filebeat, that did require some support for combine events from multiple inputs before applying state updates. With the introduction of the v2 input API this requirement will go away, as per input type managers are responsible for handling state update and ACKs. In order to run old and new architecture in parallel, we need to combine ACK handling from input managers, existing input, custom registrar ACKer in filebeat, and event counting support (also via ACK handling) for shutdown. Exposing the interface and providing combinators (acker.Combine) for merging ACK handlers into one helps with the integration. The v2 Input API gives implementors more flexibility in how to handle event publishing, coordination, and state handling shall be implemented. With the original ACK support the callbacks have been deregistered the moment inputs are stopped automatically. But for cursor based inputs we need to continue handling ACKs, even after the input is gone. The interface and helpers provide greater control over ACK handling after shutdown, which is required for the journald, winlog, and file/log inputs. --- CHANGELOG-developer.next.asciidoc | 2 + filebeat/beater/acker.go | 64 ++- filebeat/beater/acker_test.go | 9 +- filebeat/beater/filebeat.go | 24 +- filebeat/input/kafka/input.go | 15 +- filebeat/input/v2/input-cursor/input.go | 11 +- .../input/v2/input-cursor/manager_test.go | 22 +- journalbeat/beater/journalbeat.go | 28 +- journalbeat/input/input.go | 15 +- libbeat/beat/pipeline.go | 73 ++- libbeat/common/acker/acker.go | 341 ++++++++++++ libbeat/common/acker/acker_test.go | 250 +++++++++ libbeat/publisher/pipeline/acker.go | 499 ------------------ libbeat/publisher/pipeline/acker_test.go | 53 -- libbeat/publisher/pipeline/client.go | 113 +++- libbeat/publisher/pipeline/client_ack.go | 118 ----- libbeat/publisher/pipeline/config.go | 16 +- libbeat/publisher/pipeline/consumer.go | 39 +- libbeat/publisher/pipeline/nilpipeline.go | 27 +- libbeat/publisher/pipeline/pipeline.go | 92 +--- libbeat/publisher/pipeline/pipeline_ack.go | 323 ------------ libbeat/publisher/pipeline/stress/gen.go | 5 +- libbeat/publisher/pipetool/pipetool.go | 12 + libbeat/publisher/testing/testing.go | 4 - metricbeat/cmd/test/modules.go | 5 - winlogbeat/beater/eventlogger.go | 14 +- winlogbeat/beater/winlogbeat.go | 8 - .../pipelinemanager/clientLogReader.go | 5 +- x-pack/filebeat/input/googlepubsub/input.go | 21 +- .../input/googlepubsub/pubsub_test.go | 35 +- x-pack/filebeat/input/o365audit/input.go | 19 +- x-pack/filebeat/input/s3/input.go | 15 +- .../functionbeat/function/core/sync_client.go | 58 +- .../function/core/sync_client_test.go | 100 +--- 34 files changed, 955 insertions(+), 1480 deletions(-) create mode 100644 libbeat/common/acker/acker.go create mode 100644 libbeat/common/acker/acker_test.go delete mode 100644 libbeat/publisher/pipeline/acker.go delete mode 100644 libbeat/publisher/pipeline/acker_test.go delete mode 100644 libbeat/publisher/pipeline/client_ack.go delete mode 100644 libbeat/publisher/pipeline/pipeline_ack.go diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 2449f89403a..20b7c8f8a58 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -49,6 +49,8 @@ The list below covers the major changes between 7.0.0-rc2 and master only. - `management.ConfigManager` has been renamed to `management.Manager`. {pull}19114[19114] - `UpdateStatus` has been added to the `management.Manager` interface. {pull}19114[19114] - Remove `common.MapStrPointer` parameter from `cfgfile.Runnerfactory` interface. {pull}19135[19135] +- Replace `ACKCount`, `ACKEvents`, and `ACKLastEvent` callbacks with `ACKHandler` and interface in `beat.ClientConfig`. {pull}19632[19632] +- Remove global ACK handler support via `SetACKHandler` from publisher pipeline. {pull}19632[19632] ==== Bugfixes diff --git a/filebeat/beater/acker.go b/filebeat/beater/acker.go index 35ef0ca0beb..c24cd7af13d 100644 --- a/filebeat/beater/acker.go +++ b/filebeat/beater/acker.go @@ -19,17 +19,11 @@ package beater import ( "github.com/elastic/beats/v7/filebeat/input/file" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common/acker" "github.com/elastic/beats/v7/libbeat/logp" ) -// eventAcker handles publisher pipeline ACKs and forwards -// them to the registrar or directly to the stateless logger. -type eventACKer struct { - stateful statefulLogger - stateless statelessLogger - log *logp.Logger -} - type statefulLogger interface { Published(states []file.State) } @@ -38,35 +32,37 @@ type statelessLogger interface { Published(c int) bool } -func newEventACKer(stateless statelessLogger, stateful statefulLogger) *eventACKer { - return &eventACKer{stateless: stateless, stateful: stateful, log: logp.NewLogger("acker")} -} +// eventAcker handles publisher pipeline ACKs and forwards +// them to the registrar or directly to the stateless logger. +func eventACKer(statelessOut statelessLogger, statefulOut statefulLogger) beat.ACKer { + log := logp.NewLogger("acker") -func (a *eventACKer) ackEvents(data []interface{}) { - stateless := 0 - states := make([]file.State, 0, len(data)) - for _, datum := range data { - if datum == nil { - stateless++ - continue - } + return acker.EventPrivateReporter(func(_ int, data []interface{}) { + stateless := 0 + states := make([]file.State, 0, len(data)) + for _, datum := range data { + if datum == nil { + stateless++ + continue + } - st, ok := datum.(file.State) - if !ok { - stateless++ - continue - } + st, ok := datum.(file.State) + if !ok { + stateless++ + continue + } - states = append(states, st) - } + states = append(states, st) + } - if len(states) > 0 { - a.log.Debugw("stateful ack", "count", len(states)) - a.stateful.Published(states) - } + if len(states) > 0 { + log.Debugw("stateful ack", "count", len(states)) + statefulOut.Published(states) + } - if stateless > 0 { - a.log.Debugw("stateless ack", "count", stateless) - a.stateless.Published(stateless) - } + if stateless > 0 { + log.Debugw("stateless ack", "count", stateless) + statelessOut.Published(stateless) + } + }) } diff --git a/filebeat/beater/acker_test.go b/filebeat/beater/acker_test.go index 2d195c8aabd..19a8476bbf5 100644 --- a/filebeat/beater/acker_test.go +++ b/filebeat/beater/acker_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/elastic/beats/v7/filebeat/input/file" + "github.com/elastic/beats/v7/libbeat/beat" ) type mockStatefulLogger struct { @@ -78,9 +79,13 @@ func TestACKer(t *testing.T) { sl := &mockStatelessLogger{} sf := &mockStatefulLogger{} - h := newEventACKer(sl, sf) + h := eventACKer(sl, sf) - h.ackEvents(test.data) + for _, datum := range test.data { + h.AddEvent(beat.Event{Private: datum}, true) + } + + h.ACKEvents(len(test.data)) assert.Equal(t, test.stateless, sl.count) assert.Equal(t, test.stateful, sf.states) }) diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 7de7b5d541f..9bdfab36b73 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -229,16 +229,20 @@ func (fb *Filebeat) Run(b *beat.Beat) error { // Make sure all events that were published in registrarChannel := newRegistrarLogger(registrar) - err = b.Publisher.SetACKHandler(beat.PipelineACKHandler{ - ACKEvents: newEventACKer(finishedLogger, registrarChannel).ackEvents, - }) - if err != nil { - logp.Err("Failed to install the registry with the publisher pipeline: %v", err) - return err - } - - fb.pipeline = pipetool.WithDefaultGuarantees(b.Publisher, beat.GuaranteedSend) - fb.pipeline = withPipelineEventCounter(fb.pipeline, wgEvents) + // setup event counting for startup and a global common ACKer, such that all events will be + // routed to the reigstrar after they've been ACKed. + // Events with Private==nil or the type of private != file.State are directly + // forwarded to `finishedLogger`. Events from the `logs` input will first be forwarded + // to the registrar via `registrarChannel`, which finally forwards the events to finishedLogger as well. + // The finishedLogger decrements the counters in wgEvents after all events have been securely processed + // by the registry. + fb.pipeline = withPipelineEventCounter(b.Publisher, wgEvents) + fb.pipeline = pipetool.WithACKer(fb.pipeline, eventACKer(finishedLogger, registrarChannel)) + + // Filebeat by default required infinite retry. Let's configure this for all + // inputs by default. Inputs (and InputController) can overwrite the sending + // guarantees explicitly when connecting with the pipeline. + fb.pipeline = pipetool.WithDefaultGuarantees(fb.pipeline, beat.GuaranteedSend) outDone := make(chan struct{}) // outDone closes down all active pipeline connections pipelineConnector := channel.NewOutletFactory(outDone).Create diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index b29ae5354d5..add3c664224 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -31,6 +31,7 @@ import ( "github.com/elastic/beats/v7/filebeat/input" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/acker" "github.com/elastic/beats/v7/libbeat/common/backoff" "github.com/elastic/beats/v7/libbeat/common/kafka" "github.com/elastic/beats/v7/libbeat/logp" @@ -69,13 +70,15 @@ func NewInput( } out, err := connector.ConnectWith(cfg, beat.ClientConfig{ - ACKEvents: func(events []interface{}) { - for _, event := range events { - if meta, ok := event.(eventMeta); ok { - meta.handler.ack(meta.message) + ACKHandler: acker.ConnectionOnly( + acker.EventPrivateReporter(func(_ int, events []interface{}) { + for _, event := range events { + if meta, ok := event.(eventMeta); ok { + meta.handler.ack(meta.message) + } } - } - }, + }), + ), CloseRef: doneChannelContext(inputContext.Done), WaitClose: config.WaitClose, }) diff --git a/filebeat/input/v2/input-cursor/input.go b/filebeat/input/v2/input-cursor/input.go index a768823d9f6..2efc9ea93a8 100644 --- a/filebeat/input/v2/input-cursor/input.go +++ b/filebeat/input/v2/input-cursor/input.go @@ -30,6 +30,7 @@ import ( input "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common/acker" "github.com/elastic/beats/v7/libbeat/logp" ) @@ -145,8 +146,8 @@ func (inp *managedInput) runSource( }() client, err := pipeline.ConnectWith(beat.ClientConfig{ - CloseRef: ctx.Cancelation, - ACKEvents: newInputACKHandler(ctx.Logger), + CloseRef: ctx.Cancelation, + ACKHandler: newInputACKHandler(ctx.Logger), }) if err != nil { return err @@ -174,8 +175,8 @@ func (inp *managedInput) createSourceID(s Source) string { return fmt.Sprintf("%v::%v", inp.manager.Type, s.Name()) } -func newInputACKHandler(log *logp.Logger) func([]interface{}) { - return func(private []interface{}) { +func newInputACKHandler(log *logp.Logger) beat.ACKer { + return acker.EventPrivateReporter(func(acked int, private []interface{}) { var n uint var last int for i := 0; i < len(private); i++ { @@ -196,5 +197,5 @@ func newInputACKHandler(log *logp.Logger) func([]interface{}) { return } private[last].(*updateOp).Execute(n) - } + }) } diff --git a/filebeat/input/v2/input-cursor/manager_test.go b/filebeat/input/v2/input-cursor/manager_test.go index 3e7aefb64bd..cb4d813ec5e 100644 --- a/filebeat/input/v2/input-cursor/manager_test.go +++ b/filebeat/input/v2/input-cursor/manager_test.go @@ -434,24 +434,16 @@ func TestManager_InputsRun(t *testing.T) { defer cancel() // setup publishing pipeline and capture ACKer, so we can simulate progress in the Output - var acker func([]interface{}) - var activeEventPrivate []interface{} - - ackEvents := func(n int) { - data, rest := activeEventPrivate[:n], activeEventPrivate[n:] - activeEventPrivate = rest - acker(data) - } - + var acker beat.ACKer var wgACKer sync.WaitGroup wgACKer.Add(1) pipeline := &pubtest.FakeConnector{ ConnectFunc: func(cfg beat.ClientConfig) (beat.Client, error) { defer wgACKer.Done() - acker = cfg.ACKEvents + acker = cfg.ACKHandler return &pubtest.FakeClient{ PublishFunc: func(event beat.Event) { - activeEventPrivate = append(activeEventPrivate, event.Private) + acker.AddEvent(event, true) }, }, nil }, @@ -478,19 +470,19 @@ func TestManager_InputsRun(t *testing.T) { require.Equal(t, nil, store.snapshot()["test::key"].Cursor) // ACK first 2 events and check snapshot state - ackEvents(2) + acker.ACKEvents(2) require.Equal(t, "test-cursor-state2", store.snapshot()["test::key"].Cursor) // ACK 1 events and check snapshot state (3 events published) - ackEvents(1) + acker.ACKEvents(1) require.Equal(t, "test-cursor-state3", store.snapshot()["test::key"].Cursor) // ACK event without cursor update and check snapshot state not modified - ackEvents(1) + acker.ACKEvents(1) require.Equal(t, "test-cursor-state3", store.snapshot()["test::key"].Cursor) // ACK rest - ackEvents(3) + acker.ACKEvents(3) require.Equal(t, "test-cursor-state6", store.snapshot()["test::key"].Cursor) }) } diff --git a/journalbeat/beater/journalbeat.go b/journalbeat/beater/journalbeat.go index a2dcab6d7b6..6694dbd8a9a 100644 --- a/journalbeat/beater/journalbeat.go +++ b/journalbeat/beater/journalbeat.go @@ -27,8 +27,10 @@ import ( "github.com/elastic/beats/v7/journalbeat/input" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/acker" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/publisher/pipetool" "github.com/elastic/beats/v7/journalbeat/config" _ "github.com/elastic/beats/v7/journalbeat/include" @@ -67,7 +69,7 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { var inputs []*input.Input for _, c := range config.Inputs { - i, err := input.New(c, b, done, cp.States()) + i, err := input.New(c, b.Info, done, cp.States()) if err != nil { return nil, err } @@ -91,24 +93,18 @@ func (bt *Journalbeat) Run(b *beat.Beat) error { bt.logger.Info("journalbeat is running! Hit CTRL-C to stop it.") defer bt.logger.Info("journalbeat is stopping") - err := bt.pipeline.SetACKHandler(beat.PipelineACKHandler{ - ACKLastEvents: func(data []interface{}) { - for _, datum := range data { - if st, ok := datum.(checkpoint.JournalState); ok { - bt.checkpoint.PersistState(st) - } - } - }, - }) - if err != nil { - return err - } defer bt.checkpoint.Shutdown() + pipeline := pipetool.WithACKer(b.Publisher, acker.LastEventPrivateReporter(func(_ int, private interface{}) { + if st, ok := private.(checkpoint.JournalState); ok { + bt.checkpoint.PersistState(st) + } + })) + var wg sync.WaitGroup for _, i := range bt.inputs { wg.Add(1) - go bt.runInput(i, &wg) + go bt.runInput(i, &wg, pipeline) } wg.Wait() @@ -116,9 +112,9 @@ func (bt *Journalbeat) Run(b *beat.Beat) error { return nil } -func (bt *Journalbeat) runInput(i *input.Input, wg *sync.WaitGroup) { +func (bt *Journalbeat) runInput(i *input.Input, wg *sync.WaitGroup, pipeline beat.Pipeline) { defer wg.Done() - i.Run() + i.Run(pipeline) } // Stop stops the beat and its inputs. diff --git a/journalbeat/input/input.go b/journalbeat/input/input.go index 368c2f52c13..f8cbf1fbf73 100644 --- a/journalbeat/input/input.go +++ b/journalbeat/input/input.go @@ -23,6 +23,7 @@ import ( "github.com/elastic/beats/v7/libbeat/processors/add_formatted_index" + "github.com/elastic/beats/v7/libbeat/common/acker" "github.com/elastic/beats/v7/libbeat/common/fmtstr" "github.com/elastic/beats/v7/journalbeat/checkpoint" @@ -38,7 +39,6 @@ type Input struct { readers []*reader.Reader done chan struct{} config Config - pipeline beat.Pipeline client beat.Client states map[string]checkpoint.JournalState logger *logp.Logger @@ -49,7 +49,7 @@ type Input struct { // New returns a new Inout func New( c *common.Config, - b *beat.Beat, + info beat.Info, done chan struct{}, states map[string]checkpoint.JournalState, ) (*Input, error) { @@ -104,7 +104,7 @@ func New( readers = append(readers, r) } - inputProcessors, err := processorsForInput(b.Info, config) + inputProcessors, err := processorsForInput(info, config) if err != nil { return nil, err } @@ -115,7 +115,6 @@ func New( readers: readers, done: done, config: config, - pipeline: b.Publisher, states: states, logger: logger, eventMeta: config.EventMetadata, @@ -125,18 +124,18 @@ func New( // Run connects to the output, collects entries from the readers // and then publishes the events. -func (i *Input) Run() { +func (i *Input) Run(pipeline beat.Pipeline) { var err error - i.client, err = i.pipeline.ConnectWith(beat.ClientConfig{ + i.client, err = pipeline.ConnectWith(beat.ClientConfig{ PublishMode: beat.GuaranteedSend, Processing: beat.ProcessingConfig{ EventMetadata: i.eventMeta, Meta: nil, Processor: i.processors, }, - ACKCount: func(n int) { + ACKHandler: acker.Counting(func(n int) { i.logger.Debugw("journalbeat successfully published events", "event.count", n) - }, + }), }) if err != nil { i.logger.Error("Error connecting to output: %v", err) diff --git a/libbeat/beat/pipeline.go b/libbeat/beat/pipeline.go index 9d6fbf04e86..699c96d8b62 100644 --- a/libbeat/beat/pipeline.go +++ b/libbeat/beat/pipeline.go @@ -23,17 +23,15 @@ import ( "github.com/elastic/beats/v7/libbeat/common" ) +// Pipeline provides access to libbeat event publishing by creating a Client +// instance. type Pipeline interface { - PipelineConnector - SetACKHandler(PipelineACKHandler) error -} - -// PipelineConnector creates a publishing Client. This is typically backed by a Pipeline. -type PipelineConnector interface { ConnectWith(ClientConfig) (Client, error) Connect() (Client, error) } +type PipelineConnector = Pipeline + // Client holds a connection to the beats publisher pipeline type Client interface { Publish(Event) @@ -56,28 +54,38 @@ type ClientConfig struct { // is configured WaitClose time.Duration + // Configure ACK callback. + ACKHandler ACKer + // Events configures callbacks for common client callbacks Events ClientEventer +} - // ACK handler strategies. - // Note: ack handlers are run in another go-routine owned by the publisher pipeline. - // They should not block for to long, to not block the internal buffers for - // too long (buffers can only be freed after ACK has been processed). - // Note: It's not supported to configure multiple ack handler types. Use at - // most one. - - // ACKCount reports the number of published events recently acknowledged - // by the pipeline. - ACKCount func(int) - - // ACKEvents reports the events private data of recently acknowledged events. - // Note: The slice passed must be copied if the events are to be processed - // after the handler returns. - ACKEvents func([]interface{}) - - // ACKLastEvent reports the last ACKed event out of a batch of ACKed events only. - // Only the events 'Private' field will be reported. - ACKLastEvent func(interface{}) +// ACKer can be registered with a Client when connecting to the pipeline. +// The ACKer will be informed when events are added or dropped by the processors, +// and when an event has been ACKed by the outputs. +// +// Due to event publishing and ACKing are asynchronous operations, the +// operations on ACKer are normally executed in different go routines. ACKers +// are required to be multi-threading safe. +type ACKer interface { + // AddEvent informs the ACKer that a new event has been send to the client. + // AddEvent is called after the processors have handled the event. If the + // event has been dropped by the processor `published` will be set to true. + // This allows the ACKer to do some bookeeping for dropped events. + AddEvent(event Event, published bool) + + // ACK Events from the output and pipeline queue are forwarded to ACKEvents. + // The number of reported events only matches the known number of events downstream. + // ACKers might need to keep track of dropped events by themselves. + ACKEvents(n int) + + // Close informs the ACKer that the Client used to publish to the pipeline has been closed. + // No new events should be published anymore. The ACKEvents method still will be actively called + // as long as there are pending events for the client in the pipeline. The Close signal can be used + // to supress any ACK event propagation if required. + // Close might be called from another go-routine than AddEvent and ACKEvents. + Close() } // CloseRef allows users to close the client asynchronously. @@ -128,21 +136,6 @@ type ClientEventer interface { DroppedOnPublish(Event) // event has been dropped, while waiting for the queue } -// PipelineACKHandler configures some pipeline-wide event ACK handler. -type PipelineACKHandler struct { - // ACKCount reports the number of published events recently acknowledged - // by the pipeline. - ACKCount func(int) - - // ACKEvents reports the events recently acknowledged by the pipeline. - // Only the events 'Private' field will be reported. - ACKEvents func([]interface{}) - - // ACKLastEvent reports the last ACKed event per pipeline client. - // Only the events 'Private' field will be reported. - ACKLastEvents func([]interface{}) -} - type ProcessorList interface { Processor All() []Processor diff --git a/libbeat/common/acker/acker.go b/libbeat/common/acker/acker.go new file mode 100644 index 00000000000..59a25513a7e --- /dev/null +++ b/libbeat/common/acker/acker.go @@ -0,0 +1,341 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package acker + +import ( + "sync" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common/atomic" +) + +// Nil creates an ACKer that does nothing. +func Nil() beat.ACKer { + return nilACKer{} +} + +type nilACKer struct{} + +func (nilACKer) AddEvent(event beat.Event, published bool) {} +func (nilACKer) ACKEvents(n int) {} +func (nilACKer) Close() {} + +// RawCounting reports the number of ACKed events as has been reported by the outputs or queue. +// The ACKer does not keep track of dropped events. Events after the client has +// been closed will still be reported. +func RawCounting(fn func(int)) beat.ACKer { + return countACKer(fn) +} + +type countACKer func(int) + +func (countACKer) AddEvent(_ beat.Event, _ bool) {} +func (fn countACKer) ACKEvents(n int) { fn(n) } +func (countACKer) Close() {} + +// TrackingCounter keeps track of published and dropped events. It reports +// the number of acked events from the queue in the 'acked' argument and the +// total number of events published via the Client in the 'total' argument. +// The TrackingCountACKer keeps track of the order of events being send and events being acked. +// If N events have been acked by the output, then `total` will include all events dropped in between +// the last forwarded N events and the 'tail' of dropped events. For example (X = send, D = dropped): +// +// index: 0 1 2 3 4 5 6 7 8 9 10 11 +// event: X X D D X D D X D X X X +// +// If the output ACKs 3 events, then all events from index 0 to 6 will be reported because: +// - the drop sequence for events 2 and 3 is inbetween the number of forwarded and ACKed events +// - events 5-6 have been dropped as well, but event 7 is not ACKed yet +// +// If there is no event currently tracked by this ACKer and the next event is dropped by the processors, +// then `fn` will be called immediately with acked=0 and total=1. +func TrackingCounter(fn func(acked, total int)) beat.ACKer { + a := &trackingACKer{fn: fn} + init := &gapInfo{} + a.lst.head = init + a.lst.tail = init + return a +} + +// Counting returns an ACK count for all events a client has tried to publish. +// The ACKer keeps track of dropped events as well, and adjusts the ACK from the outputs accordingly. +func Counting(fn func(n int)) beat.ACKer { + return TrackingCounter(func(_ int, total int) { + fn(total) + }) +} + +type trackingACKer struct { + fn func(acked, total int) + events atomic.Uint32 + lst gapList +} + +type gapList struct { + sync.Mutex + head, tail *gapInfo +} + +type gapInfo struct { + sync.Mutex + next *gapInfo + send, dropped int +} + +func (a *trackingACKer) AddEvent(_ beat.Event, published bool) { + a.events.Inc() + if published { + a.addPublishedEvent() + } else { + a.addDropEvent() + } +} + +// addPublishedEvent increments the 'send' counter in the current gapInfo +// element in the tail of the list. If events have been dropped, we append a +// new empty gapInfo element. +func (a *trackingACKer) addPublishedEvent() { + a.lst.Lock() + + current := a.lst.tail + current.Lock() + if current.dropped > 0 { + tmp := &gapInfo{} + tmp.Lock() + + a.lst.tail.next = tmp + a.lst.tail = tmp + current.Unlock() + current = tmp + } + a.lst.Unlock() + + current.send++ + current.Unlock() +} + +// addDropEvent increments the 'dropped' counter in the gapInfo element in the +// tail of the list. The callback will be run with total=1 and acked=0 if the +// acker state is empty and no events have been send yet. +func (a *trackingACKer) addDropEvent() { + a.lst.Lock() + + current := a.lst.tail + current.Lock() + + if current.send == 0 && current.next == nil { + // send can only be 0 if no no events/gaps present yet + if a.lst.head != a.lst.tail { + panic("gap list expected to be empty") + } + + a.fn(0, 1) + a.lst.Unlock() + current.Unlock() + + a.events.Dec() + return + } + + a.lst.Unlock() + current.dropped++ + current.Unlock() +} + +func (a *trackingACKer) ACKEvents(n int) { + var ( + total = 0 + acked = n + emptyLst bool + ) + + for n > 0 { + if emptyLst { + panic("too many events acked") + } + + a.lst.Lock() + current := a.lst.head + current.Lock() + + // advance list if we detect that the current head will be completely consumed + // by this ACK event. + if n >= current.send { + next := current.next + emptyLst = next == nil + if !emptyLst { + // advance list all event in current entry have been send and list as + // more then 1 gapInfo entry. If only 1 entry is present, list item will be + // reset and reused + a.lst.head = next + } + } + // hand over lock list-entry, so ACK handler and producer can operate + // on potentially different list ends + a.lst.Unlock() + + if n < current.send { + current.send -= n + total += n + n = 0 + } else { + total += current.send + current.dropped + n -= current.send + current.dropped = 0 + current.send = 0 + } + current.Unlock() + } + + a.events.Sub(uint32(total)) + a.fn(acked, total) +} + +func (a *trackingACKer) Close() {} + +// EventPrivateReporter reports all private fields from all events that have +// been published or removed. +// +// The EventPrivateFieldsACKer keeps track of the order of events being send +// and events being acked. If N events have been acked by the output, then +// `total` will include all events dropped in between the last forwarded N +// events and the 'tail' of dropped events. For example (X = send, D = +// dropped): +// +// index: 0 1 2 3 4 5 6 7 8 9 10 11 +// event: X X D D X D D X D X X X +// +// If the output ACKs 3 events, then all events from index 0 to 6 will be reported because: +// - the drop sequence for events 2 and 3 is inbetween the number of forwarded and ACKed events +// - events 5-6 have been dropped as well, but event 7 is not ACKed yet +func EventPrivateReporter(fn func(acked int, data []interface{})) beat.ACKer { + a := &eventDataACKer{fn: fn} + a.ACKer = TrackingCounter(a.onACK) + return a +} + +type eventDataACKer struct { + beat.ACKer + mu sync.Mutex + data []interface{} + fn func(acked int, data []interface{}) +} + +func (a *eventDataACKer) AddEvent(event beat.Event, published bool) { + a.mu.Lock() + a.data = append(a.data, event.Private) + a.mu.Unlock() + a.ACKer.AddEvent(event, published) +} + +func (a *eventDataACKer) onACK(acked, total int) { + if total == 0 { + return + } + + a.mu.Lock() + data := a.data[:total] + a.data = a.data[total:] + a.mu.Unlock() + + if len(data) > 0 { + a.fn(acked, data) + } +} + +// LastEventPrivateReporter reports only the 'latest' published and acked +// event if a batch of events have been ACKed. +func LastEventPrivateReporter(fn func(acked int, data interface{})) beat.ACKer { + ignored := 0 + return EventPrivateReporter(func(acked int, data []interface{}) { + for i := len(data) - 1; i >= 0; i-- { + if d := data[i]; d != nil { + fn(ignored+acked, d) + ignored = 0 + return + } + } + + // complete batch has been ignored due to missing data -> add count + ignored += acked + }) +} + +// Combine forwards events to a list of ackers. +func Combine(as ...beat.ACKer) beat.ACKer { + return ackerList(as) +} + +type ackerList []beat.ACKer + +func (l ackerList) AddEvent(event beat.Event, published bool) { + for _, a := range l { + a.AddEvent(event, published) + } +} + +func (l ackerList) ACKEvents(n int) { + for _, a := range l { + a.ACKEvents(n) + } +} + +func (l ackerList) Close() { + for _, a := range l { + a.Close() + } +} + +// ConnectionOnly ensures that the given ACKer is only used for as long as the +// pipeline Client is active. Once the Client is closed, the ACKer will drop +// its internal state and no more ACK events will be processed. +func ConnectionOnly(a beat.ACKer) beat.ACKer { + return &clientOnlyACKer{acker: a} +} + +type clientOnlyACKer struct { + mu sync.Mutex + acker beat.ACKer +} + +func (a *clientOnlyACKer) AddEvent(event beat.Event, published bool) { + a.mu.Lock() + defer a.mu.Unlock() + if sub := a.acker; sub != nil { + sub.AddEvent(event, published) + } +} + +func (a *clientOnlyACKer) ACKEvents(n int) { + a.mu.Lock() + sub := a.acker + a.mu.Unlock() + if sub != nil { + sub.ACKEvents(n) + } +} + +func (a *clientOnlyACKer) Close() { + a.mu.Lock() + sub := a.acker + a.acker = nil // drop the internal ACKer on Close and allow the runtime to gc accumulated state. + a.mu.Unlock() + if sub != nil { + sub.Close() + } +} diff --git a/libbeat/common/acker/acker_test.go b/libbeat/common/acker/acker_test.go new file mode 100644 index 00000000000..2f02ed5f673 --- /dev/null +++ b/libbeat/common/acker/acker_test.go @@ -0,0 +1,250 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package acker + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/beat" +) + +type fakeACKer struct { + AddEventFunc func(event beat.Event, published bool) + ACKEventsFunc func(n int) + CloseFunc func() +} + +func TestNil(t *testing.T) { + acker := Nil() + require.NotNil(t, acker) + + // check acker can be used without panic: + acker.AddEvent(beat.Event{}, false) + acker.AddEvent(beat.Event{}, true) + acker.ACKEvents(3) + acker.Close() +} + +func TestCounting(t *testing.T) { + t.Run("ack count is passed through", func(t *testing.T) { + var n int + acker := RawCounting(func(acked int) { n = acked }) + acker.ACKEvents(3) + require.Equal(t, 3, n) + }) +} + +func TestTracking(t *testing.T) { + t.Run("dropped event is acked immediately if empty", func(t *testing.T) { + var acked, total int + TrackingCounter(func(a, t int) { acked, total = a, t }).AddEvent(beat.Event{}, false) + require.Equal(t, 0, acked) + require.Equal(t, 1, total) + }) + + t.Run("no dropped events", func(t *testing.T) { + var acked, total int + acker := TrackingCounter(func(a, t int) { acked, total = a, t }) + acker.AddEvent(beat.Event{}, true) + acker.AddEvent(beat.Event{}, true) + acker.ACKEvents(2) + require.Equal(t, 2, acked) + require.Equal(t, 2, total) + }) + + t.Run("acking published includes dropped events in middle", func(t *testing.T) { + var acked, total int + acker := TrackingCounter(func(a, t int) { acked, total = a, t }) + acker.AddEvent(beat.Event{}, true) + acker.AddEvent(beat.Event{}, false) + acker.AddEvent(beat.Event{}, false) + acker.AddEvent(beat.Event{}, true) + acker.ACKEvents(2) + require.Equal(t, 2, acked) + require.Equal(t, 4, total) + }) + + t.Run("acking published includes dropped events at end of ACK interval", func(t *testing.T) { + var acked, total int + acker := TrackingCounter(func(a, t int) { acked, total = a, t }) + acker.AddEvent(beat.Event{}, true) + acker.AddEvent(beat.Event{}, true) + acker.AddEvent(beat.Event{}, false) + acker.AddEvent(beat.Event{}, false) + acker.AddEvent(beat.Event{}, true) + acker.ACKEvents(2) + require.Equal(t, 2, acked) + require.Equal(t, 4, total) + }) + + t.Run("partial ACKs", func(t *testing.T) { + var acked, total int + acker := TrackingCounter(func(a, t int) { acked, total = a, t }) + acker.AddEvent(beat.Event{}, true) + acker.AddEvent(beat.Event{}, true) + acker.AddEvent(beat.Event{}, true) + acker.AddEvent(beat.Event{}, true) + acker.AddEvent(beat.Event{}, false) + acker.AddEvent(beat.Event{}, true) + acker.AddEvent(beat.Event{}, true) + + acker.ACKEvents(2) + require.Equal(t, 2, acked) + require.Equal(t, 2, total) + + acker.ACKEvents(2) + require.Equal(t, 2, acked) + require.Equal(t, 3, total) + }) +} + +func TestEventPrivateReporter(t *testing.T) { + t.Run("dropped event is acked immediately if empty", func(t *testing.T) { + var acked int + var data []interface{} + acker := EventPrivateReporter(func(a int, d []interface{}) { acked, data = a, d }) + acker.AddEvent(beat.Event{Private: 1}, false) + require.Equal(t, 0, acked) + require.Equal(t, []interface{}{1}, data) + }) + + t.Run("no dropped events", func(t *testing.T) { + var acked int + var data []interface{} + acker := EventPrivateReporter(func(a int, d []interface{}) { acked, data = a, d }) + acker.AddEvent(beat.Event{Private: 1}, true) + acker.AddEvent(beat.Event{Private: 2}, true) + acker.AddEvent(beat.Event{Private: 3}, true) + acker.ACKEvents(3) + require.Equal(t, 3, acked) + require.Equal(t, []interface{}{1, 2, 3}, data) + }) + + t.Run("private of dropped events is included", func(t *testing.T) { + var acked int + var data []interface{} + acker := EventPrivateReporter(func(a int, d []interface{}) { acked, data = a, d }) + acker.AddEvent(beat.Event{Private: 1}, true) + acker.AddEvent(beat.Event{Private: 2}, false) + acker.AddEvent(beat.Event{Private: 3}, true) + acker.ACKEvents(2) + require.Equal(t, 2, acked) + require.Equal(t, []interface{}{1, 2, 3}, data) + }) +} + +func TestLastEventPrivateReporter(t *testing.T) { + t.Run("dropped event with private is acked immediately if empty", func(t *testing.T) { + var acked int + var datum interface{} + acker := LastEventPrivateReporter(func(a int, d interface{}) { acked, datum = a, d }) + acker.AddEvent(beat.Event{Private: 1}, false) + require.Equal(t, 0, acked) + require.Equal(t, 1, datum) + }) + + t.Run("dropped event without private is ignored", func(t *testing.T) { + var called bool + acker := LastEventPrivateReporter(func(_ int, _ interface{}) { called = true }) + acker.AddEvent(beat.Event{Private: nil}, false) + require.False(t, called) + }) + + t.Run("no dropped events", func(t *testing.T) { + var acked int + var data interface{} + acker := LastEventPrivateReporter(func(a int, d interface{}) { acked, data = a, d }) + acker.AddEvent(beat.Event{Private: 1}, true) + acker.AddEvent(beat.Event{Private: 2}, true) + acker.AddEvent(beat.Event{Private: 3}, true) + acker.ACKEvents(3) + require.Equal(t, 3, acked) + require.Equal(t, 3, data) + }) +} + +func TestCombine(t *testing.T) { + t.Run("AddEvent distributes", func(t *testing.T) { + var a1, a2 int + acker := Combine(countACKerOps(&a1, nil, nil), countACKerOps(&a2, nil, nil)) + acker.AddEvent(beat.Event{}, true) + require.Equal(t, 1, a1) + require.Equal(t, 1, a2) + }) + + t.Run("ACKEvents distributes", func(t *testing.T) { + var a1, a2 int + acker := Combine(countACKerOps(nil, &a1, nil), countACKerOps(nil, &a2, nil)) + acker.ACKEvents(1) + require.Equal(t, 1, a1) + require.Equal(t, 1, a2) + }) + + t.Run("Close distributes", func(t *testing.T) { + var c1, c2 int + acker := Combine(countACKerOps(nil, nil, &c1), countACKerOps(nil, nil, &c2)) + acker.Close() + require.Equal(t, 1, c1) + require.Equal(t, 1, c2) + }) +} + +func TestConnectionOnly(t *testing.T) { + t.Run("passes ACKs if not closed", func(t *testing.T) { + var n int + acker := ConnectionOnly(RawCounting(func(acked int) { n = acked })) + acker.ACKEvents(3) + require.Equal(t, 3, n) + }) + + t.Run("ignores ACKs after close", func(t *testing.T) { + var n int + acker := ConnectionOnly(RawCounting(func(acked int) { n = acked })) + acker.Close() + acker.ACKEvents(3) + require.Equal(t, 0, n) + }) +} + +func countACKerOps(add, acked, close *int) beat.ACKer { + return &fakeACKer{ + AddEventFunc: func(_ beat.Event, _ bool) { *add++ }, + ACKEventsFunc: func(_ int) { *acked++ }, + CloseFunc: func() { *close++ }, + } +} + +func (f *fakeACKer) AddEvent(event beat.Event, published bool) { + if f.AddEventFunc != nil { + f.AddEventFunc(event, published) + } +} + +func (f *fakeACKer) ACKEvents(n int) { + if f.ACKEventsFunc != nil { + f.ACKEventsFunc(n) + } +} + +func (f *fakeACKer) Close() { + if f.CloseFunc != nil { + f.CloseFunc() + } +} diff --git a/libbeat/publisher/pipeline/acker.go b/libbeat/publisher/pipeline/acker.go deleted file mode 100644 index ad0b0366416..00000000000 --- a/libbeat/publisher/pipeline/acker.go +++ /dev/null @@ -1,499 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package pipeline - -import ( - "sync" - "time" - - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/atomic" -) - -// acker is used to account for published and non-published events to be ACKed -// to the beats client. -// All pipeline and client ACK handling support is provided by acker instances. -type acker interface { - close() - wait() - addEvent(event beat.Event, published bool) bool - ackEvents(int) -} - -// emptyACK ignores any ACK signals and events. -type emptyACK struct{} - -var nilACKer acker = (*emptyACK)(nil) - -func (*emptyACK) close() {} -func (*emptyACK) wait() {} -func (*emptyACK) addEvent(_ beat.Event, _ bool) bool { return true } -func (*emptyACK) ackEvents(_ int) {} - -type ackerFn struct { - Close func() - AddEvent func(beat.Event, bool) bool - AckEvents func(int) -} - -func (a *ackerFn) close() { a.Close() } -func (a *ackerFn) addEvent(e beat.Event, b bool) bool { return a.AddEvent(e, b) } -func (a *ackerFn) ackEvents(n int) { a.AckEvents(n) } - -// countACK is used when broker ACK events can be simply forwarded to the -// producers ACKCount callback. -// The countACK is only applicable if no processors are configured. -// ACKs for closed clients will be ignored. -type countACK struct { - pipeline *Pipeline - fn func(total, acked int) -} - -func newCountACK(pipeline *Pipeline, fn func(total, acked int)) *countACK { - a := &countACK{fn: fn, pipeline: pipeline} - return a -} - -func (a *countACK) close() {} -func (a *countACK) wait() {} -func (a *countACK) addEvent(_ beat.Event, _ bool) bool { return true } -func (a *countACK) ackEvents(n int) { - if a.pipeline.ackActive.Load() { - a.fn(n, n) - } -} - -// gapCountACK returns event ACKs to the producer, taking account for dropped events. -// Events being dropped by processors will always be ACKed with the last batch ACKed -// by the broker. This way clients waiting for ACKs can expect all processed -// events being always ACKed. -type gapCountACK struct { - pipeline *Pipeline - - fn func(total int, acked int) - - done chan struct{} - - drop chan struct{} - acks chan int - - events atomic.Uint32 - lst gapList -} - -type gapList struct { - sync.Mutex - head, tail *gapInfo -} - -type gapInfo struct { - sync.Mutex - next *gapInfo - send, dropped int -} - -func newGapCountACK(pipeline *Pipeline, fn func(total, acked int)) *gapCountACK { - a := &gapCountACK{} - a.init(pipeline, fn) - return a -} - -func (a *gapCountACK) init(pipeline *Pipeline, fn func(int, int)) { - *a = gapCountACK{ - pipeline: pipeline, - fn: fn, - done: make(chan struct{}), - drop: make(chan struct{}), - acks: make(chan int, 1), - } - - init := &gapInfo{} - a.lst.head = init - a.lst.tail = init - - go a.ackLoop() -} - -func (a *gapCountACK) ackLoop() { - // close channels, as no more events should be ACKed: - // - once pipeline is closed - // - all events of the closed client have been acked/processed by the pipeline - - acks, drop := a.acks, a.drop - closing := false - - for { - select { - case <-a.done: - closing = true - a.done = nil - if a.events.Load() == 0 { - // stop worker, if all events accounted for have been ACKed. - // If new events are added after this acker won't handle them, which may - // result in duplicates - return - } - - case <-a.pipeline.ackDone: - return - - case n := <-acks: - empty := a.handleACK(n) - if empty && closing && a.events.Load() == 0 { - // stop worker, if and only if all events accounted for have been ACKed - return - } - - case <-drop: - // TODO: accumulate multiple drop events + flush count with timer - a.events.Sub(1) - a.fn(1, 0) - } - } -} - -func (a *gapCountACK) handleACK(n int) bool { - // collect items and compute total count from gapList - - var ( - total = 0 - acked = n - emptyLst bool - ) - - for n > 0 { - if emptyLst { - panic("too many events acked") - } - - a.lst.Lock() - current := a.lst.head - current.Lock() - - if n >= current.send { - nxt := current.next - emptyLst = nxt == nil - if !emptyLst { - // advance list all event in current entry have been send and list as - // more then 1 gapInfo entry. If only 1 entry is present, list item will be - // reset and reused - a.lst.head = nxt - } - } - - // hand over lock list-entry, so ACK handler and producer can operate - // on potentially different list ends - a.lst.Unlock() - - if n < current.send { - current.send -= n - total += n - n = 0 - } else { - total += current.send + current.dropped - n -= current.send - current.dropped = 0 - current.send = 0 - } - current.Unlock() - } - - a.events.Sub(uint32(total)) - a.fn(total, acked) - return emptyLst -} - -func (a *gapCountACK) close() { - // close client only, pipeline itself still can handle pending ACKs - close(a.done) -} - -func (a *gapCountACK) wait() {} - -func (a *gapCountACK) addEvent(_ beat.Event, published bool) bool { - // if gapList is empty and event is being dropped, forward drop event to ack - // loop worker: - - a.events.Inc() - if !published { - a.addDropEvent() - } else { - a.addPublishedEvent() - } - - return true -} - -func (a *gapCountACK) addDropEvent() { - a.lst.Lock() - - current := a.lst.tail - current.Lock() - - if current.send == 0 && current.next == nil { - // send can only be 0 if no no events/gaps present yet - if a.lst.head != a.lst.tail { - panic("gap list expected to be empty") - } - - current.Unlock() - a.lst.Unlock() - - a.drop <- struct{}{} - } else { - a.lst.Unlock() - - current.dropped++ - current.Unlock() - } -} - -func (a *gapCountACK) addPublishedEvent() { - // event is publisher -> add a new gap list entry if gap is present in current - // gapInfo - - a.lst.Lock() - - current := a.lst.tail - current.Lock() - - if current.dropped > 0 { - tmp := &gapInfo{} - a.lst.tail.next = tmp - a.lst.tail = tmp - - current.Unlock() - tmp.Lock() - current = tmp - } - - a.lst.Unlock() - - current.send++ - current.Unlock() -} - -func (a *gapCountACK) ackEvents(n int) { - select { - case <-a.pipeline.ackDone: // pipeline is closing down -> ignore event - a.acks = nil - case a.acks <- n: // send ack event to worker - } -} - -// boundGapCountACK guards a gapCountACK instance by bounding the maximum number of -// active events. -// As beats might accumulate state while waiting for ACK, the boundGapCountACK blocks -// if too many events have been filtered out by processors. -type boundGapCountACK struct { - active bool - fn func(total, acked int) - - acker gapCountACK - sema *sema -} - -func newBoundGapCountACK( - pipeline *Pipeline, - sema *sema, - fn func(total, acked int), -) *boundGapCountACK { - a := &boundGapCountACK{active: true, sema: sema, fn: fn} - a.acker.init(pipeline, a.onACK) - return a -} - -func (a *boundGapCountACK) close() { a.acker.close() } -func (a *boundGapCountACK) wait() { a.acker.wait() } - -func (a *boundGapCountACK) addEvent(event beat.Event, published bool) bool { - a.sema.inc() - return a.acker.addEvent(event, published) -} - -func (a *boundGapCountACK) ackEvents(n int) { a.acker.ackEvents(n) } -func (a *boundGapCountACK) onACK(total, acked int) { - a.sema.release(total) - a.fn(total, acked) -} - -// eventDataACK reports all dropped and ACKed events private fields. -// An instance of eventDataACK requires a counting ACKer (boundGapCountACK or countACK), -// for accounting for potentially dropped events. -type eventDataACK struct { - mutex sync.Mutex - - acker acker - pipeline *Pipeline - - // TODO: replace with more efficient dynamic sized ring-buffer? - data []interface{} - fn func(data []interface{}, acked int) -} - -func newEventACK( - pipeline *Pipeline, - canDrop bool, - sema *sema, - fn func([]interface{}, int), -) *eventDataACK { - a := &eventDataACK{pipeline: pipeline, fn: fn} - a.acker = makeCountACK(pipeline, canDrop, sema, a.onACK) - - return a -} - -func makeCountACK(pipeline *Pipeline, canDrop bool, sema *sema, fn func(int, int)) acker { - if canDrop { - return newBoundGapCountACK(pipeline, sema, fn) - } - return newCountACK(pipeline, fn) -} - -func (a *eventDataACK) close() { a.acker.close() } - -func (a *eventDataACK) wait() { a.acker.wait() } - -func (a *eventDataACK) addEvent(event beat.Event, published bool) bool { - a.mutex.Lock() - active := a.pipeline.ackActive.Load() - if active { - a.data = append(a.data, event.Private) - } - a.mutex.Unlock() - - if active { - return a.acker.addEvent(event, published) - } - return false -} - -func (a *eventDataACK) ackEvents(n int) { a.acker.ackEvents(n) } -func (a *eventDataACK) onACK(total, acked int) { - n := total - - a.mutex.Lock() - data := a.data[:n] - a.data = a.data[n:] - a.mutex.Unlock() - - if len(data) > 0 && a.pipeline.ackActive.Load() { - a.fn(data, acked) - } -} - -// waitACK keeps track of events being produced and ACKs for events. -// On close waitACK will wait for pending events to be ACKed by the broker. -// The acker continues the closing operation if all events have been published -// or the maximum configured sleep time has been reached. -type waitACK struct { - acker acker - - signalAll chan struct{} // ack loop notifies `close` that all events have been acked - signalDone chan struct{} // shutdown handler telling `wait` that shutdown has been completed - waitClose time.Duration - - active atomic.Bool - - // number of active events - events atomic.Uint64 - - afterClose func() -} - -func newWaitACK(acker acker, timeout time.Duration, afterClose func()) *waitACK { - return &waitACK{ - acker: acker, - signalAll: make(chan struct{}, 1), - signalDone: make(chan struct{}), - waitClose: timeout, - active: atomic.MakeBool(true), - afterClose: afterClose, - } -} - -func (a *waitACK) close() { - a.active.Store(false) - - if a.events.Load() == 0 { - a.finishClose() - return - } - - // start routine to propagate shutdown signals or timeouts to anyone - // being blocked in wait. - go func() { - defer a.finishClose() - - select { - case <-a.signalAll: - case <-time.After(a.waitClose): - } - }() -} - -func (a *waitACK) finishClose() { - a.acker.close() - a.afterClose() - close(a.signalDone) -} - -func (a *waitACK) wait() { - <-a.signalDone -} - -func (a *waitACK) addEvent(event beat.Event, published bool) bool { - if published { - a.events.Inc() - } - return a.acker.addEvent(event, published) -} - -func (a *waitACK) ackEvents(n int) { - // return ACK signal to upper layers - a.acker.ackEvents(n) - a.releaseEvents(n) -} - -func (a *waitACK) releaseEvents(n int) { - value := a.events.Sub(uint64(n)) - if value != 0 { - return - } - - // send done signal, if close is waiting - if !a.active.Load() { - a.signalAll <- struct{}{} - } -} - -// closeACKer simply wraps any other acker. It calls a custom function after -// the underlying acker has been closed. -type closeACKer struct { - acker - afterClose func() -} - -func newCloseACKer(a acker, fn func()) acker { - return &closeACKer{acker: a, afterClose: fn} -} - -func (a closeACKer) close() { - a.acker.close() - a.afterClose() -} diff --git a/libbeat/publisher/pipeline/acker_test.go b/libbeat/publisher/pipeline/acker_test.go deleted file mode 100644 index 632c5a5d24c..00000000000 --- a/libbeat/publisher/pipeline/acker_test.go +++ /dev/null @@ -1,53 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package pipeline - -import ( - "reflect" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestNewCountACK(t *testing.T) { - dummyPipeline := &Pipeline{} - dummyFunc := func(total, acked int) {} - cAck := newCountACK(dummyPipeline, dummyFunc) - assert.Equal(t, dummyPipeline, cAck.pipeline) - assert.Equal(t, reflect.ValueOf(dummyFunc).Pointer(), reflect.ValueOf(cAck.fn).Pointer()) -} - -func TestMakeCountACK(t *testing.T) { - dummyPipeline := &Pipeline{} - dummyFunc := func(total, acked int) {} - dummySema := &sema{} - tests := []struct { - canDrop bool - sema *sema - fn func(total, acked int) - pipeline *Pipeline - expectedOutputType reflect.Value - }{ - {canDrop: false, sema: dummySema, fn: dummyFunc, pipeline: dummyPipeline, expectedOutputType: reflect.ValueOf(&countACK{})}, - {canDrop: true, sema: dummySema, fn: dummyFunc, pipeline: dummyPipeline, expectedOutputType: reflect.ValueOf(&boundGapCountACK{})}, - } - for _, test := range tests { - output := makeCountACK(test.pipeline, test.canDrop, test.sema, test.fn) - assert.Equal(t, test.expectedOutputType.String(), reflect.ValueOf(output).String()) - } -} diff --git a/libbeat/publisher/pipeline/client.go b/libbeat/publisher/pipeline/client.go index d8413d97b30..07b40f276fc 100644 --- a/libbeat/publisher/pipeline/client.go +++ b/libbeat/publisher/pipeline/client.go @@ -19,6 +19,7 @@ package pipeline import ( "sync" + "time" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/atomic" @@ -37,7 +38,8 @@ type client struct { processors beat.Processor producer queue.Producer mutex sync.Mutex - acker acker + acker beat.ACKer + waiter *clientCloseWaiter eventFlags publisher.EventFlags canDrop bool @@ -52,6 +54,15 @@ type client struct { eventer beat.ClientEventer } +type clientCloseWaiter struct { + events atomic.Uint32 + closing atomic.Bool + + signalAll chan struct{} // ack loop notifies `close` that all events have been acked + signalDone chan struct{} // shutdown handler telling `wait` that shutdown has been completed + waitClose time.Duration +} + func (c *client) PublishAll(events []beat.Event) { c.mutex.Lock() defer c.mutex.Unlock() @@ -99,13 +110,7 @@ func (c *client) publish(e beat.Event) { e = *event } - open := c.acker.addEvent(e, publish) - if !open { - // client is closing down -> report event as dropped and return - c.onDroppedOnPublish(e) - return - } - + c.acker.AddEvent(e, publish) if !publish { c.onFilteredOut(e) return @@ -143,34 +148,30 @@ func (c *client) Close() error { // first stop ack handling. ACK handler might block on wait (with timeout), waiting // for pending events to be ACKed. - c.doClose() - log.Debug("client: wait for acker to finish") - c.acker.wait() - log.Debug("client: acker shut down") - return nil -} - -func (c *client) doClose() { c.closeOnce.Do(func() { close(c.done) - log := c.logger() - c.isOpen.Store(false) c.onClosing() log.Debug("client: closing acker") - c.acker.close() // this must trigger a direct/indirect call to 'unlink' + c.waiter.signalClose() + c.waiter.wait() + + c.acker.Close() + log.Debug("client: done closing acker") + + log.Debug("client: unlink from queue") + c.unlink() + log.Debug("client: done unlink") }) + return nil } -// unlink is the final step of closing a client. It must be executed only after -// it is guaranteed that the underlying acker has been closed and will not -// accept any new publish or ACK events. -// This method is normally registered with the ACKer and triggered by it. +// unlink is the final step of closing a client. It cancells the connect of the +// client as producer to the queue. func (c *client) unlink() { log := c.logger() - log.Debug("client: done closing acker") n := c.producer.Cancel() // close connection to queue log.Debugf("client: cancelled %v events", n) @@ -233,3 +234,67 @@ func (c *client) onDroppedOnPublish(e beat.Event) { c.eventer.DroppedOnPublish(e) } } + +func newClientCloseWaiter(timeout time.Duration) *clientCloseWaiter { + return &clientCloseWaiter{ + signalAll: make(chan struct{}, 1), + signalDone: make(chan struct{}), + waitClose: timeout, + } +} + +func (w *clientCloseWaiter) AddEvent(_ beat.Event, published bool) { + if published { + w.events.Inc() + } +} + +func (w *clientCloseWaiter) ACKEvents(n int) { + value := w.events.Sub(uint32(n)) + if value != 0 { + return + } + + // send done signal, if close is waiting + if w.closing.Load() { + w.signalAll <- struct{}{} + } +} + +// The Close signal from the pipeline is ignored. Instead the client +// explicitely uses `signalClose` and `wait` before it continues with the +// closing sequence. +func (w *clientCloseWaiter) Close() {} + +func (w *clientCloseWaiter) signalClose() { + if w == nil { + return + } + + w.closing.Store(false) + if w.events.Load() == 0 { + w.finishClose() + return + } + + // start routine to propagate shutdown signals or timeouts to anyone + // being blocked in wait. + go func() { + defer w.finishClose() + + select { + case <-w.signalAll: + case <-time.After(w.waitClose): + } + }() +} + +func (w *clientCloseWaiter) finishClose() { + close(w.signalDone) +} + +func (w *clientCloseWaiter) wait() { + if w != nil { + <-w.signalDone + } +} diff --git a/libbeat/publisher/pipeline/client_ack.go b/libbeat/publisher/pipeline/client_ack.go deleted file mode 100644 index cbd8fc9bb2d..00000000000 --- a/libbeat/publisher/pipeline/client_ack.go +++ /dev/null @@ -1,118 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package pipeline - -import ( - "time" - - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/atomic" -) - -type clientACKer struct { - acker - active atomic.Bool -} - -func (p *Pipeline) makeACKer( - canDrop bool, - cfg *beat.ClientConfig, - waitClose time.Duration, - afterClose func(), -) acker { - var ( - bld = p.ackBuilder - acker acker - ) - - sema := p.eventSema - switch { - case cfg.ACKCount != nil: - acker = bld.createCountACKer(canDrop, sema, cfg.ACKCount) - case cfg.ACKEvents != nil: - acker = bld.createEventACKer(canDrop, sema, cfg.ACKEvents) - case cfg.ACKLastEvent != nil: - cb := lastEventACK(cfg.ACKLastEvent) - acker = bld.createEventACKer(canDrop, sema, cb) - default: - if waitClose <= 0 { - acker = bld.createPipelineACKer(canDrop, sema) - } else { - acker = bld.createCountACKer(canDrop, sema, func(_ int) {}) - } - } - - if waitClose <= 0 { - return newCloseACKer(acker, afterClose) - } - return newWaitACK(acker, waitClose, afterClose) -} - -func lastEventACK(fn func(interface{})) func([]interface{}) { - return func(events []interface{}) { - fn(events[len(events)-1]) - } -} - -func (a *clientACKer) lift(acker acker) { - a.active = atomic.MakeBool(true) - a.acker = acker -} - -func (a *clientACKer) Active() bool { - return a.active.Load() -} - -func (a *clientACKer) close() { - a.active.Store(false) - a.acker.close() -} - -func (a *clientACKer) addEvent(event beat.Event, published bool) bool { - if a.active.Load() { - return a.acker.addEvent(event, published) - } - return false -} - -func (a *clientACKer) ackEvents(n int) { - a.acker.ackEvents(n) -} - -func buildClientCountACK( - pipeline *Pipeline, - canDrop bool, - sema *sema, - mk func(*clientACKer) func(int, int), -) acker { - guard := &clientACKer{} - cb := mk(guard) - guard.lift(makeCountACK(pipeline, canDrop, sema, cb)) - return guard -} - -func buildClientEventACK( - pipeline *Pipeline, - canDrop bool, - sema *sema, - mk func(*clientACKer) func([]interface{}, int), -) acker { - guard := &clientACKer{} - guard.lift(newEventACK(pipeline, canDrop, sema, mk(guard))) - return guard -} diff --git a/libbeat/publisher/pipeline/config.go b/libbeat/publisher/pipeline/config.go index 775e75d95e2..12633ff5b3e 100644 --- a/libbeat/publisher/pipeline/config.go +++ b/libbeat/publisher/pipeline/config.go @@ -48,23 +48,9 @@ func validateClientConfig(c *beat.ClientConfig) error { return fmt.Errorf("unknown publish mode %v", m) } - fnCount := 0 - countPtr := func(b bool) { - if b { - fnCount++ - } - } - - countPtr(c.ACKCount != nil) - countPtr(c.ACKEvents != nil) - countPtr(c.ACKLastEvent != nil) - if fnCount > 1 { - return fmt.Errorf("At most one of ACKCount, ACKEvents, ACKLastEvent can be configured") - } - // ACK handlers can not be registered DropIfFull is set, as dropping events // due to full broker can not be accounted for in the clients acker. - if fnCount != 0 && withDrop { + if c.ACKHandler != nil && withDrop { return errors.New("ACK handlers with DropIfFull mode not supported") } diff --git a/libbeat/publisher/pipeline/consumer.go b/libbeat/publisher/pipeline/consumer.go index a5c4a97e25a..20e2bf7ebc9 100644 --- a/libbeat/publisher/pipeline/consumer.go +++ b/libbeat/publisher/pipeline/consumer.go @@ -18,6 +18,9 @@ package pipeline import ( + "errors" + "sync" + "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/publisher/queue" @@ -29,13 +32,12 @@ import ( // is receiving cancelled batches from outputs to be closed on output reloading. type eventConsumer struct { logger *logp.Logger - done chan struct{} - - ctx *batchContext + ctx *batchContext pause atomic.Bool wait atomic.Bool sig chan consumerSignal + wg sync.WaitGroup queue queue.Queue consumer queue.Consumer @@ -55,8 +57,11 @@ const ( sigConsumerCheck consumerEventTag = iota sigConsumerUpdateOutput sigConsumerUpdateInput + sigStop ) +var errStopped = errors.New("stopped") + func newEventConsumer( log *logp.Logger, queue queue.Queue, @@ -64,7 +69,6 @@ func newEventConsumer( ) *eventConsumer { c := &eventConsumer{ logger: log, - done: make(chan struct{}), sig: make(chan consumerSignal, 3), out: nil, @@ -74,13 +78,19 @@ func newEventConsumer( } c.pause.Store(true) - go c.loop(c.consumer) + + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.loop(c.consumer) + }() return c } func (c *eventConsumer) close() { c.consumer.Close() - close(c.done) + c.sig <- consumerSignal{tag: sigStop} + c.wg.Wait() } func (c *eventConsumer) sigWait() { @@ -142,8 +152,11 @@ func (c *eventConsumer) loop(consumer queue.Consumer) { paused = true ) - handleSignal := func(sig consumerSignal) { + handleSignal := func(sig consumerSignal) error { switch sig.tag { + case sigStop: + return errStopped + case sigConsumerCheck: case sigConsumerUpdateOutput: @@ -159,6 +172,7 @@ func (c *eventConsumer) loop(consumer queue.Consumer) { } else { out = nil } + return nil } for { @@ -182,17 +196,18 @@ func (c *eventConsumer) loop(consumer queue.Consumer) { select { case sig := <-c.sig: - handleSignal(sig) + if err := handleSignal(sig); err != nil { + return + } continue default: } select { - case <-c.done: - log.Debug("stop pipeline event consumer") - return case sig := <-c.sig: - handleSignal(sig) + if err := handleSignal(sig); err != nil { + return + } case out <- batch: batch = nil if paused { diff --git a/libbeat/publisher/pipeline/nilpipeline.go b/libbeat/publisher/pipeline/nilpipeline.go index cf1b276db91..6d699dff27b 100644 --- a/libbeat/publisher/pipeline/nilpipeline.go +++ b/libbeat/publisher/pipeline/nilpipeline.go @@ -24,10 +24,8 @@ import ( type nilPipeline struct{} type nilClient struct { - eventer beat.ClientEventer - ackCount func(int) - ackEvents func([]interface{}) - ackLastEvent func(interface{}) + eventer beat.ClientEventer + acker beat.ACKer } var _nilPipeline = (*nilPipeline)(nil) @@ -44,10 +42,8 @@ func (p *nilPipeline) Connect() (beat.Client, error) { func (p *nilPipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { return &nilClient{ - eventer: cfg.Events, - ackCount: cfg.ACKCount, - ackEvents: cfg.ACKEvents, - ackLastEvent: cfg.ACKLastEvent, + eventer: cfg.Events, + acker: cfg.ACKHandler, }, nil } @@ -61,18 +57,11 @@ func (c *nilClient) PublishAll(events []beat.Event) { return } - if c.ackLastEvent != nil { - c.ackLastEvent(events[L-1].Private) - } - if c.ackEvents != nil { - tmp := make([]interface{}, L) - for i := range events { - tmp[i] = events[i].Private + if c.acker != nil { + for _, event := range events { + c.acker.AddEvent(event, true) } - c.ackEvents(tmp) - } - if c.ackCount != nil { - c.ackCount(L) + c.acker.ACKEvents(len(events)) } } diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index 2e6d5b7b77e..80b439d96e6 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -21,13 +21,13 @@ package pipeline import ( - "errors" "reflect" "sync" "time" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/acker" "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/logp" @@ -71,11 +71,7 @@ type Pipeline struct { waitCloser *waitCloser // pipeline ack - ackMode pipelineACKMode - ackActive atomic.Bool - ackDone chan struct{} - ackBuilder ackBuilder - eventSema *sema + eventSema *sema // closeRef signal propagation support guardStartSigPropagation sync.Once @@ -128,7 +124,6 @@ type pipelineEventer struct { observer queueObserver waitClose *waitCloser - cb *pipelineEventCB } type waitCloser struct { @@ -162,8 +157,6 @@ func New( waitCloseTimeout: settings.WaitClose, processors: settings.Processors, } - p.ackBuilder = &pipelineEmptyACK{p} - p.ackActive = atomic.MakeBool(true) if monitors.Metrics != nil { p.observer = newMetricsObserver(monitors.Metrics) @@ -197,45 +190,6 @@ func New( return p, nil } -// SetACKHandler sets a global ACK handler on all events published to the pipeline. -// SetACKHandler must be called before any connection is made. -func (p *Pipeline) SetACKHandler(handler beat.PipelineACKHandler) error { - p.eventer.mutex.Lock() - defer p.eventer.mutex.Unlock() - - if !p.eventer.modifyable { - return errors.New("can not set ack handler on already active pipeline") - } - - // TODO: check only one type being configured - - cb, err := newPipelineEventCB(handler) - if err != nil { - return err - } - - if cb == nil { - p.ackBuilder = &pipelineEmptyACK{p} - p.eventer.cb = nil - return nil - } - - p.eventer.cb = cb - if cb.mode == countACKMode { - p.ackBuilder = &pipelineCountACK{ - pipeline: p, - cb: cb.onCounts, - } - } else { - p.ackBuilder = &pipelineEventsACK{ - pipeline: p, - cb: cb.onEvents, - } - } - - return nil -} - // Close stops the pipeline, outputs and queue. // If WaitClose with WaitOnPipelineClose mode is configured, Close will block // for a duration of WaitClose, if there are still active events in the pipeline. @@ -292,9 +246,8 @@ func (p *Pipeline) Connect() (beat.Client, error) { // If not set otherwise the defaut publish mode is OutputChooses. func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { var ( - canDrop bool - dropOnCancel bool - eventFlags publisher.EventFlags + canDrop bool + eventFlags publisher.EventFlags ) err := validateClientConfig(&cfg) @@ -309,7 +262,6 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { switch cfg.PublishMode { case beat.GuaranteedSend: eventFlags = publisher.GuaranteedSend - dropOnCancel = true case beat.DropIfFull: canDrop = true } @@ -343,12 +295,9 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { reportEvents: reportEvents, } - acker := p.makeACKer(processors != nil, &cfg, waitClose, client.unlink) - producerCfg := queue.ProducerConfig{ - // Cancel events from queue if acker is configured - // and no pipeline-wide ACK handler is registered. - DropOnCancel: dropOnCancel && acker != nil && p.eventer.cb == nil, - } + ackHandler := cfg.ACKHandler + + producerCfg := queue.ProducerConfig{} if reportEvents || cfg.Events != nil { producerCfg.OnDrop = func(event beat.Event) { @@ -361,13 +310,27 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { } } - if acker != nil { - producerCfg.ACK = acker.ackEvents + var waiter *clientCloseWaiter + if waitClose > 0 { + waiter = newClientCloseWaiter(waitClose) + } + + if waiter != nil { + if ackHandler == nil { + ackHandler = waiter + } else { + ackHandler = acker.Combine(waiter, ackHandler) + } + } + + if ackHandler != nil { + producerCfg.ACK = ackHandler.ACKEvents } else { - acker = newCloseACKer(nilACKer, client.unlink) + ackHandler = acker.Nil() } - client.acker = acker + client.acker = ackHandler + client.waiter = waiter client.producer = p.queue.Producer(producerCfg) p.observer.clientConnected() @@ -429,7 +392,7 @@ func (p *Pipeline) runSignalPropagation() { isSig := (chosen & 1) == 1 if isSig { client := clients[i] - client.doClose() + client.Close() } // remove: @@ -470,9 +433,6 @@ func (e *pipelineEventer) OnACK(n int) { if wc := e.waitClose; wc != nil { wc.dec(n) } - if e.cb != nil { - e.cb.reportQueueACK(n) - } } func (e *waitCloser) inc() { diff --git a/libbeat/publisher/pipeline/pipeline_ack.go b/libbeat/publisher/pipeline/pipeline_ack.go deleted file mode 100644 index e9efb390f4f..00000000000 --- a/libbeat/publisher/pipeline/pipeline_ack.go +++ /dev/null @@ -1,323 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package pipeline - -import ( - "errors" - - "github.com/elastic/beats/v7/libbeat/beat" -) - -type ackBuilder interface { - createPipelineACKer(canDrop bool, sema *sema) acker - createCountACKer(canDrop bool, sema *sema, fn func(int)) acker - createEventACKer(canDrop bool, sema *sema, fn func([]interface{})) acker -} - -type pipelineEmptyACK struct { - pipeline *Pipeline -} - -func (b *pipelineEmptyACK) createPipelineACKer(canDrop bool, sema *sema) acker { - return nilACKer -} - -func (b *pipelineEmptyACK) createCountACKer(canDrop bool, sema *sema, fn func(int)) acker { - return buildClientCountACK(b.pipeline, canDrop, sema, func(guard *clientACKer) func(int, int) { - return func(total, acked int) { - if guard.Active() { - fn(total) - } - } - }) -} - -func (b *pipelineEmptyACK) createEventACKer( - canDrop bool, - sema *sema, - fn func([]interface{}), -) acker { - return buildClientEventACK(b.pipeline, canDrop, sema, func(guard *clientACKer) func([]interface{}, int) { - return func(events []interface{}, acked int) { - if guard.Active() { - fn(events) - } - } - }) -} - -type pipelineCountACK struct { - pipeline *Pipeline - cb func(int, int) -} - -func (b *pipelineCountACK) createPipelineACKer(canDrop bool, sema *sema) acker { - return makeCountACK(b.pipeline, canDrop, sema, b.cb) -} - -func (b *pipelineCountACK) createCountACKer(canDrop bool, sema *sema, fn func(int)) acker { - return buildClientCountACK(b.pipeline, canDrop, sema, func(guard *clientACKer) func(int, int) { - return func(total, acked int) { - b.cb(total, acked) - if guard.Active() { - fn(total) - } - } - }) -} - -func (b *pipelineCountACK) createEventACKer( - canDrop bool, - sema *sema, - fn func([]interface{}), -) acker { - return buildClientEventACK(b.pipeline, canDrop, sema, func(guard *clientACKer) func([]interface{}, int) { - return func(data []interface{}, acked int) { - b.cb(len(data), acked) - if guard.Active() { - fn(data) - } - } - }) -} - -type pipelineEventsACK struct { - pipeline *Pipeline - cb func([]interface{}, int) -} - -func (b *pipelineEventsACK) createPipelineACKer(canDrop bool, sema *sema) acker { - return newEventACK(b.pipeline, canDrop, sema, b.cb) -} - -func (b *pipelineEventsACK) createCountACKer(canDrop bool, sema *sema, fn func(int)) acker { - return buildClientEventACK(b.pipeline, canDrop, sema, func(guard *clientACKer) func([]interface{}, int) { - return func(data []interface{}, acked int) { - b.cb(data, acked) - if guard.Active() { - fn(len(data)) - } - } - }) -} - -func (b *pipelineEventsACK) createEventACKer(canDrop bool, sema *sema, fn func([]interface{})) acker { - return buildClientEventACK(b.pipeline, canDrop, sema, func(guard *clientACKer) func([]interface{}, int) { - return func(data []interface{}, acked int) { - b.cb(data, acked) - if guard.Active() { - fn(data) - } - } - }) -} - -// pipelineEventCB internally handles active ACKs in the pipeline. -// It receives ACK events from the queue and the individual clients. -// Once the queue returns an ACK to the pipelineEventCB, the worker loop will collect -// events from all clients having published events in the last batch of events -// being ACKed. -// the PipelineACKHandler will be notified, once all events being ACKed -// (including dropped events) have been collected. Only one ACK-event is handled -// at a time. The pipeline global and clients ACK handler will be blocked for the time -// an ACK event is being processed. -type pipelineEventCB struct { - done chan struct{} - - acks chan int - - events chan eventsDataMsg - droppedEvents chan eventsDataMsg - - mode pipelineACKMode - handler beat.PipelineACKHandler -} - -type eventsDataMsg struct { - data []interface{} - total, acked int - sig chan struct{} -} - -type pipelineACKMode uint8 - -const ( - noACKMode pipelineACKMode = iota - countACKMode - eventsACKMode - lastEventsACKMode -) - -func newPipelineEventCB(handler beat.PipelineACKHandler) (*pipelineEventCB, error) { - mode := noACKMode - if handler.ACKCount != nil { - mode = countACKMode - } - if handler.ACKEvents != nil { - if mode != noACKMode { - return nil, errors.New("only one callback can be set") - } - mode = eventsACKMode - } - if handler.ACKLastEvents != nil { - if mode != noACKMode { - return nil, errors.New("only one callback can be set") - } - mode = lastEventsACKMode - } - - // yay, no work - if mode == noACKMode { - return nil, nil - } - - cb := &pipelineEventCB{ - acks: make(chan int), - mode: mode, - handler: handler, - events: make(chan eventsDataMsg), - droppedEvents: make(chan eventsDataMsg), - } - go cb.worker() - return cb, nil -} - -func (p *pipelineEventCB) close() { - close(p.done) -} - -// reportEvents sends a batch of ACKed events to the ACKer. -// The events array contains send and dropped events. The `acked` counters -// indicates the total number of events acked by the pipeline. -// That is, the number of dropped events is given by `len(events) - acked`. -// A client can report events with acked=0, iff the client has no waiting events -// in the pipeline (ACK ordering requirements) -// -// Note: the call blocks, until the ACK handler has collected all active events -// from all clients. This ensure an ACK event being fully 'captured' -// by the pipeline, before receiving/processing another ACK event. -// In the meantime the queue has the chance of batching-up more ACK events, -// such that only one ACK event is being reported to the pipeline handler -func (p *pipelineEventCB) onEvents(data []interface{}, acked int) { - p.pushMsg(eventsDataMsg{data: data, total: len(data), acked: acked}) -} - -func (p *pipelineEventCB) onCounts(total, acked int) { - p.pushMsg(eventsDataMsg{total: total, acked: acked}) -} - -func (p *pipelineEventCB) pushMsg(msg eventsDataMsg) { - if msg.acked == 0 { - p.droppedEvents <- msg - } else { - msg.sig = make(chan struct{}) - p.events <- msg - <-msg.sig - } -} - -// Starts a new ACKed event. -func (p *pipelineEventCB) reportQueueACK(acked int) { - p.acks <- acked -} - -func (p *pipelineEventCB) worker() { - defer close(p.acks) - defer close(p.events) - defer close(p.droppedEvents) - - for { - select { - case count := <-p.acks: - exit := p.collect(count) - if exit { - return - } - - // short circuit dropped events, but have client block until all events - // have been processed by pipeline ack handler - case msg := <-p.droppedEvents: - p.reportEventsData(msg.data, msg.total) - if msg.sig != nil { - close(msg.sig) - } - - case <-p.done: - return - } - } -} - -func (p *pipelineEventCB) collect(count int) (exit bool) { - var ( - signalers []chan struct{} - data []interface{} - acked int - total int - ) - - for acked < count { - var msg eventsDataMsg - select { - case msg = <-p.events: - case msg = <-p.droppedEvents: - case <-p.done: - exit = true - return - } - - if msg.sig != nil { - signalers = append(signalers, msg.sig) - } - total += msg.total - acked += msg.acked - - if count-acked < 0 { - panic("ack count mismatch") - } - - switch p.mode { - case eventsACKMode: - data = append(data, msg.data...) - - case lastEventsACKMode: - if L := len(msg.data); L > 0 { - data = append(data, msg.data[L-1]) - } - } - } - - // signal clients we processed all active ACKs, as reported by queue - for _, sig := range signalers { - close(sig) - } - p.reportEventsData(data, total) - return -} - -func (p *pipelineEventCB) reportEventsData(data []interface{}, total int) { - // report ACK back to the beat - switch p.mode { - case countACKMode: - p.handler.ACKCount(total) - case eventsACKMode: - p.handler.ACKEvents(data) - case lastEventsACKMode: - p.handler.ACKLastEvents(data) - } -} diff --git a/libbeat/publisher/pipeline/stress/gen.go b/libbeat/publisher/pipeline/stress/gen.go index 149278304ea..71bedc70fdc 100644 --- a/libbeat/publisher/pipeline/stress/gen.go +++ b/libbeat/publisher/pipeline/stress/gen.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/acker" "github.com/elastic/beats/v7/libbeat/common/atomic" ) @@ -68,9 +69,9 @@ func generate( logger := logp.NewLogger("publisher_pipeline_stress_generate") if config.ACK { - settings.ACKCount = func(n int) { + settings.ACKHandler = acker.Counting(func(n int) { logger.Infof("Pipeline client (%v) ACKS; %v", id, n) - } + }) } if m := config.PublishMode; m != "" { diff --git a/libbeat/publisher/pipetool/pipetool.go b/libbeat/publisher/pipetool/pipetool.go index af734f1d97a..46e77da3b1f 100644 --- a/libbeat/publisher/pipetool/pipetool.go +++ b/libbeat/publisher/pipetool/pipetool.go @@ -20,6 +20,7 @@ package pipetool import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/acker" ) // connectEditPipeline modifies the client configuration using edit before calling @@ -86,6 +87,17 @@ func WithDefaultGuarantees(pipeline beat.PipelineConnector, mode beat.PublishMod }) } +func WithACKer(pipeline beat.PipelineConnector, a beat.ACKer) beat.PipelineConnector { + return WithClientConfigEdit(pipeline, func(cfg beat.ClientConfig) (beat.ClientConfig, error) { + if h := cfg.ACKHandler; h != nil { + cfg.ACKHandler = acker.Combine(a, h) + } else { + cfg.ACKHandler = a + } + return cfg, nil + }) +} + // WithClientWrapper calls wrap on beat.Client instance, after a successful // call to `pipeline.Connect` or `pipeline.ConnectWith`. The wrap function can // wrap the client to provide additional functionality. diff --git a/libbeat/publisher/testing/testing.go b/libbeat/publisher/testing/testing.go index 401aa833c9d..5b5e592d69e 100644 --- a/libbeat/publisher/testing/testing.go +++ b/libbeat/publisher/testing/testing.go @@ -44,10 +44,6 @@ func (pub *TestPublisher) ConnectWith(_ beat.ClientConfig) (beat.Client, error) return pub.client, nil } -func (pub *TestPublisher) SetACKHandler(_ beat.PipelineACKHandler) error { - panic("Not supported") -} - func NewChanClient(bufSize int) *ChanClient { return NewChanClientWith(make(chan beat.Event, bufSize)) } diff --git a/metricbeat/cmd/test/modules.go b/metricbeat/cmd/test/modules.go index 6bf312d17b9..335f3666f61 100644 --- a/metricbeat/cmd/test/modules.go +++ b/metricbeat/cmd/test/modules.go @@ -90,8 +90,3 @@ type publisher struct { func newPublisher() *publisher { return &publisher{pipeline.NewNilPipeline()} } - -// SetACKHandler is a dummy implementation of the ack handler for the test publisher. -func (*publisher) SetACKHandler(beat.PipelineACKHandler) error { - return nil -} diff --git a/winlogbeat/beater/eventlogger.go b/winlogbeat/beater/eventlogger.go index 6dc85d1e140..b17aa7d1394 100644 --- a/winlogbeat/beater/eventlogger.go +++ b/winlogbeat/beater/eventlogger.go @@ -23,10 +23,12 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/acker" "github.com/elastic/beats/v7/libbeat/common/fmtstr" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/beats/v7/libbeat/processors/add_formatted_index" + "github.com/elastic/beats/v7/libbeat/publisher/pipetool" "github.com/elastic/beats/v7/winlogbeat/checkpoint" "github.com/elastic/beats/v7/winlogbeat/eventlog" @@ -81,10 +83,10 @@ func (e *eventLogger) connect(pipeline beat.Pipeline) (beat.Client, error) { Processor: e.processors, KeepNull: e.keepNull, }, - ACKCount: func(n int) { + ACKHandler: acker.Counting(func(n int) { addPublished(api, n) logp.Info("EventLog[%s] successfully published %d events", api, n) - }, + }), }) } @@ -92,13 +94,17 @@ func (e *eventLogger) run( done <-chan struct{}, pipeline beat.Pipeline, state checkpoint.EventLogState, - acker *eventACKer, + eventACKer *eventACKer, ) { api := e.source // Initialize per event log metrics. initMetrics(api.Name()) + pipeline = pipetool.WithACKer(pipeline, acker.EventPrivateReporter(func(_ int, private []interface{}) { + eventACKer.ACKEvents(private) + })) + client, err := e.connect(pipeline) if err != nil { logp.Warn("EventLog[%s] Pipeline error. Failed to connect to publisher pipeline", @@ -155,7 +161,7 @@ func (e *eventLogger) run( continue } - acker.Add(len(records)) + eventACKer.Add(len(records)) for _, lr := range records { client.Publish(lr.ToEvent()) } diff --git a/winlogbeat/beater/winlogbeat.go b/winlogbeat/beater/winlogbeat.go index f057830592c..1215e0cadd8 100644 --- a/winlogbeat/beater/winlogbeat.go +++ b/winlogbeat/beater/winlogbeat.go @@ -133,14 +133,6 @@ func (eb *Winlogbeat) Run(b *beat.Beat) error { // Initialize metrics. initMetrics("total") - // setup global event ACK handler - err := eb.pipeline.SetACKHandler(beat.PipelineACKHandler{ - ACKEvents: acker.ACKEvents, - }) - if err != nil { - return err - } - var wg sync.WaitGroup for _, log := range eb.eventLogs { state, _ := persistedState[log.source.Name()] diff --git a/x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go b/x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go index 9d87cc32a1e..62d54c88b6a 100644 --- a/x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go +++ b/x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go @@ -14,6 +14,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/acker" helper "github.com/elastic/beats/v7/libbeat/common/docker" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/x-pack/dockerlogbeat/pipereader" @@ -38,9 +39,9 @@ func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereade WaitClose: 0, } clientLogger := logp.NewLogger("clientLogReader") - settings.ACKCount = func(n int) { + settings.ACKHandler = acker.Counting(func(n int) { clientLogger.Debugf("Pipeline client ACKS; %v", n) - } + }) settings.PublishMode = beat.DefaultGuarantees client, err := pipeline.ConnectWith(settings) if err != nil { diff --git a/x-pack/filebeat/input/googlepubsub/input.go b/x-pack/filebeat/input/googlepubsub/input.go index 1f445304f38..a9e1b9f7873 100644 --- a/x-pack/filebeat/input/googlepubsub/input.go +++ b/x-pack/filebeat/input/googlepubsub/input.go @@ -19,6 +19,7 @@ import ( "github.com/elastic/beats/v7/filebeat/input" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/acker" "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/common/useragent" "github.com/elastic/beats/v7/libbeat/logp" @@ -92,16 +93,18 @@ func NewInput( // Build outlet for events. in.outlet, err = connector.ConnectWith(cfg, beat.ClientConfig{ - ACKEvents: func(privates []interface{}) { - for _, priv := range privates { - if msg, ok := priv.(*pubsub.Message); ok { - msg.Ack() - in.ackedCount.Inc() - } else { - in.log.Error("Failed ACKing pub/sub event") + ACKHandler: acker.ConnectionOnly( + acker.EventPrivateReporter(func(_ int, privates []interface{}) { + for _, priv := range privates { + if msg, ok := priv.(*pubsub.Message); ok { + msg.Ack() + in.ackedCount.Inc() + } else { + in.log.Error("Failed ACKing pub/sub event") + } } - } - }, + }), + ), }) if err != nil { return nil, err diff --git a/x-pack/filebeat/input/googlepubsub/pubsub_test.go b/x-pack/filebeat/input/googlepubsub/pubsub_test.go index 709cc488fc7..caf4e82e698 100644 --- a/x-pack/filebeat/input/googlepubsub/pubsub_test.go +++ b/x-pack/filebeat/input/googlepubsub/pubsub_test.go @@ -217,7 +217,7 @@ func runTest(t *testing.T, cfg *common.Config, run func(client *pubsub.Client, i runTestWithACKer(t, cfg, ackEvent, run) } -func runTestWithACKer(t *testing.T, cfg *common.Config, acker acker, run func(client *pubsub.Client, input *pubsubInput, out *stubOutleter, t *testing.T)) { +func runTestWithACKer(t *testing.T, cfg *common.Config, onEvent eventHandler, run func(client *pubsub.Client, input *pubsubInput, out *stubOutleter, t *testing.T)) { if !isInDockerIntegTestEnv() { // Don't test goroutines when using our compose.EnsureUp. defer resources.NewGoroutinesChecker().Check(t) @@ -233,7 +233,7 @@ func runTestWithACKer(t *testing.T, cfg *common.Config, acker acker, run func(cl defer close(inputCtx.Done) // Stub outlet for receiving events generated by the input. - eventOutlet := newStubOutlet(acker) + eventOutlet := newStubOutlet(onEvent) defer eventOutlet.Close() connector := channel.ConnectorFunc(func(_ *common.Config, cliCfg beat.ClientConfig) (channel.Outleter, error) { @@ -257,37 +257,32 @@ func newInputContext() input.Context { } } -type acker func(beat.Event, beat.ClientConfig) bool +type eventHandler func(beat.Event, beat.ClientConfig) bool type stubOutleter struct { sync.Mutex - cond *sync.Cond - done bool - Events []beat.Event - clientCfg beat.ClientConfig - acker acker + cond *sync.Cond + done bool + Events []beat.Event + clientCfg beat.ClientConfig + eventHandler eventHandler } -func newStubOutlet(acker acker) *stubOutleter { +func newStubOutlet(onEvent eventHandler) *stubOutleter { o := &stubOutleter{ - acker: acker, + eventHandler: onEvent, } o.cond = sync.NewCond(o) return o } func ackEvent(ev beat.Event, cfg beat.ClientConfig) bool { - switch { - case cfg.ACKCount != nil: - cfg.ACKCount(1) - case cfg.ACKEvents != nil: - evs := [1]interface{}{ev.Private} - cfg.ACKEvents(evs[:]) - case cfg.ACKLastEvent != nil: - cfg.ACKLastEvent(ev.Private) - default: + if cfg.ACKHandler == nil { return false } + + cfg.ACKHandler.AddEvent(ev, true) + cfg.ACKHandler.ACKEvents(1) return true } @@ -327,7 +322,7 @@ func (o *stubOutleter) Done() <-chan struct{} { return nil } func (o *stubOutleter) OnEvent(event beat.Event) bool { o.Lock() defer o.Unlock() - acked := o.acker(event, o.clientCfg) + acked := o.eventHandler(event, o.clientCfg) if acked { o.Events = append(o.Events, event) o.cond.Broadcast() diff --git a/x-pack/filebeat/input/o365audit/input.go b/x-pack/filebeat/input/o365audit/input.go index b60d5d00455..6dbaa3ab2f6 100644 --- a/x-pack/filebeat/input/o365audit/input.go +++ b/x-pack/filebeat/input/o365audit/input.go @@ -17,6 +17,7 @@ import ( "github.com/elastic/beats/v7/filebeat/input" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/acker" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" "github.com/elastic/beats/v7/libbeat/common/useragent" "github.com/elastic/beats/v7/libbeat/logp" @@ -83,15 +84,17 @@ func newInput( var out channel.Outleter out, err = connector.ConnectWith(cfg, beat.ClientConfig{ - ACKLastEvent: func(private interface{}) { - // Errors don't have a cursor. - if cursor, ok := private.(cursor); ok { - log.Debugf("ACKed cursor %+v", cursor) - if err := storage.Save(cursor); err != nil && err != errNoUpdate { - log.Errorf("Error saving state: %v", err) + ACKHandler: acker.ConnectionOnly( + acker.LastEventPrivateReporter(func(_ int, private interface{}) { + // Errors don't have a cursor. + if cursor, ok := private.(cursor); ok { + log.Debugf("ACKed cursor %+v", cursor) + if err := storage.Save(cursor); err != nil && err != errNoUpdate { + log.Errorf("Error saving state: %v", err) + } } - } - }, + }), + ), }) if err != nil { return nil, err diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index b8ebb4d755c..b5201686597 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -31,6 +31,7 @@ import ( "github.com/elastic/beats/v7/filebeat/input" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/acker" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" "github.com/elastic/beats/v7/libbeat/logp" awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" @@ -137,13 +138,15 @@ func NewInput(cfg *common.Config, connector channel.Connector, context input.Con } out, err := connector.ConnectWith(cfg, beat.ClientConfig{ - ACKEvents: func(privates []interface{}) { - for _, private := range privates { - if s3Context, ok := private.(*s3Context); ok { - s3Context.done() + ACKHandler: acker.ConnectionOnly( + acker.EventPrivateReporter(func(_ int, privates []interface{}) { + for _, private := range privates { + if s3Context, ok := private.(*s3Context); ok { + s3Context.done() + } } - } - }, + }), + ), }) if err != nil { return nil, err diff --git a/x-pack/functionbeat/function/core/sync_client.go b/x-pack/functionbeat/function/core/sync_client.go index f599567f710..cc1b0c37f57 100644 --- a/x-pack/functionbeat/function/core/sync_client.go +++ b/x-pack/functionbeat/function/core/sync_client.go @@ -8,7 +8,9 @@ import ( "sync" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common/acker" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/publisher/pipetool" ) // Client implements the interface used by all the functionbeat function, we only implement a synchronous @@ -49,32 +51,10 @@ func NewSyncClient(log *logp.Logger, pipeline beat.Pipeline, cfg beat.ClientConf } s := &SyncClient{log: log.Named("sync client")} - // Proxy any callbacks to the original client. - // - // Notes: it's not supported to have multiple callback defined, but to support any configuration - // we map all of them. - if cfg.ACKCount != nil { - s.ackCount = cfg.ACKCount - cfg.ACKCount = s.onACKCount - } - - if cfg.ACKEvents != nil { - s.ackEvents = cfg.ACKEvents - cfg.ACKEvents = s.onACKEvents - } - - if cfg.ACKLastEvent != nil { - s.ackLastEvent = cfg.ACKLastEvent - cfg.ACKLastEvent = nil - cfg.ACKEvents = s.onACKEvents - } - - // No calls is defined on the target on the config but we still need to track - // the ack to unblock. - hasACK := cfg.ACKCount != nil || cfg.ACKEvents != nil || cfg.ACKLastEvent != nil - if !hasACK { - cfg.ACKCount = s.onACKCount - } + pipeline = pipetool.WithACKer(pipeline, acker.TrackingCounter(func(_, total int) { + log.Debugf("ack callback receives with events count of %d", total) + s.onACK(total) + })) c, err := pipeline.ConnectWith(cfg) if err != nil { @@ -114,28 +94,6 @@ func (s *SyncClient) Wait() { s.wg.Wait() } -// AckEvents receives an array with all the event acked for this client. -func (s *SyncClient) onACKEvents(data []interface{}) { - s.log.Debugf("onACKEvents callback receives with events count of %d", len(data)) - count := len(data) - if count == 0 { - return - } - - s.onACKCount(count) - if s.ackEvents != nil { - s.ackEvents(data) - } - - if s.ackLastEvent != nil { - s.ackLastEvent(data[len(data)-1]) - } -} - -func (s *SyncClient) onACKCount(c int) { - s.log.Debugf("onACKCount callback receives with events count of %d", c) - s.wg.Add(c * -1) - if s.ackCount != nil { - s.ackCount(c) - } +func (s *SyncClient) onACK(n int) { + s.wg.Add(-1 * n) } diff --git a/x-pack/functionbeat/function/core/sync_client_test.go b/x-pack/functionbeat/function/core/sync_client_test.go index 623729525d0..4d5284eeb5c 100644 --- a/x-pack/functionbeat/function/core/sync_client_test.go +++ b/x-pack/functionbeat/function/core/sync_client_test.go @@ -49,15 +49,11 @@ func (d *dummyPipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) return d.client, nil } -func (d *dummyPipeline) SetACKHandler(ackhandler beat.PipelineACKHandler) error { - return nil -} - func TestSyncClient(t *testing.T) { receiver := func(c *dummyClient, sc *SyncClient) { select { case i := <-c.Received: - sc.onACKEvents(make([]interface{}, i)) + sc.onACK(i) return } } @@ -114,8 +110,8 @@ func TestSyncClient(t *testing.T) { select { case <-c.Received: // simulate multiple acks - sc.onACKEvents(make([]interface{}, 5)) - sc.onACKEvents(make([]interface{}, 5)) + sc.onACK(5) + sc.onACK(5) return } }(c, sc) @@ -127,93 +123,3 @@ func TestSyncClient(t *testing.T) { sc.Wait() }) } - -func TestCallbacksPropagation(t *testing.T) { - testCallback := func(done <-chan struct{}, config beat.ClientConfig, events []beat.Event) { - c := newDummyClient() - - pipeline := newDummyPipeline(c) - sc, err := NewSyncClient(nil, pipeline, config) - if !assert.NoError(t, err) { - return - } - defer sc.Close() - - go func(c *dummyClient, sc *SyncClient, events []beat.Event) { - select { - case <-c.Received: - elements := make([]interface{}, len(events)) - for i, e := range events { - elements[i] = e.Private - } - sc.onACKEvents(elements) - return - } - }(c, sc, events) - - err = sc.PublishAll(events) - if !assert.NoError(t, err) { - return - } - - sc.Wait() - select { - case <-done: - } - } - - t.Run("propagate ACKCount", func(t *testing.T) { - done := make(chan struct{}) - - callback := func(count int) { - assert.Equal(t, 2, count) - close(done) - } - - clientConfig := beat.ClientConfig{ - ACKCount: callback, - } - - testCallback(done, clientConfig, make([]beat.Event, 2)) - }) - - t.Run("propagate ACKEvents", func(t *testing.T) { - done := make(chan struct{}) - - callback := func(data []interface{}) { - assert.Equal(t, 2, len(data)) - close(done) - } - - clientConfig := beat.ClientConfig{ - ACKEvents: callback, - } - - testCallback(done, clientConfig, make([]beat.Event, 2)) - }) - - t.Run("propagate ACKLastEvent", func(t *testing.T) { - done := make(chan struct{}) - - type s struct{ test string } - - semaphore := &s{test: "hello"} - - events := []beat.Event{ - beat.Event{}, - beat.Event{ - Private: semaphore, - }, - } - callback := func(data interface{}) { - assert.Equal(t, semaphore, data) - close(done) - } - - clientConfig := beat.ClientConfig{ - ACKLastEvent: callback, - } - - testCallback(done, clientConfig, events) - }) -}