Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Composable ACKer #19632

Merged
merged 8 commits into from
Jul 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- `management.ConfigManager` has been renamed to `management.Manager`. {pull}19114[19114]
- `UpdateStatus` has been added to the `management.Manager` interface. {pull}19114[19114]
- Remove `common.MapStrPointer` parameter from `cfgfile.Runnerfactory` interface. {pull}19135[19135]
- Replace `ACKCount`, `ACKEvents`, and `ACKLastEvent` callbacks with `ACKHandler` and interface in `beat.ClientConfig`. {pull}19632[19632]
- Remove global ACK handler support via `SetACKHandler` from publisher pipeline. {pull}19632[19632]

==== Bugfixes

Expand Down
64 changes: 30 additions & 34 deletions filebeat/beater/acker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,11 @@ package beater

import (
"github.com/elastic/beats/v7/filebeat/input/file"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/logp"
)

// eventAcker handles publisher pipeline ACKs and forwards
// them to the registrar or directly to the stateless logger.
type eventACKer struct {
stateful statefulLogger
stateless statelessLogger
log *logp.Logger
}

type statefulLogger interface {
Published(states []file.State)
}
Expand All @@ -38,35 +32,37 @@ type statelessLogger interface {
Published(c int) bool
}

func newEventACKer(stateless statelessLogger, stateful statefulLogger) *eventACKer {
return &eventACKer{stateless: stateless, stateful: stateful, log: logp.NewLogger("acker")}
}
// eventAcker handles publisher pipeline ACKs and forwards
// them to the registrar or directly to the stateless logger.
func eventACKer(statelessOut statelessLogger, statefulOut statefulLogger) beat.ACKer {
log := logp.NewLogger("acker")

func (a *eventACKer) ackEvents(data []interface{}) {
stateless := 0
states := make([]file.State, 0, len(data))
for _, datum := range data {
if datum == nil {
stateless++
continue
}
return acker.EventPrivateReporter(func(_ int, data []interface{}) {
stateless := 0
states := make([]file.State, 0, len(data))
for _, datum := range data {
if datum == nil {
stateless++
continue
}

st, ok := datum.(file.State)
if !ok {
stateless++
continue
}
st, ok := datum.(file.State)
if !ok {
stateless++
continue
}

states = append(states, st)
}
states = append(states, st)
}

if len(states) > 0 {
a.log.Debugw("stateful ack", "count", len(states))
a.stateful.Published(states)
}
if len(states) > 0 {
log.Debugw("stateful ack", "count", len(states))
statefulOut.Published(states)
}

if stateless > 0 {
a.log.Debugw("stateless ack", "count", stateless)
a.stateless.Published(stateless)
}
if stateless > 0 {
log.Debugw("stateless ack", "count", stateless)
statelessOut.Published(stateless)
}
})
}
9 changes: 7 additions & 2 deletions filebeat/beater/acker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/filebeat/input/file"
"github.com/elastic/beats/v7/libbeat/beat"
)

type mockStatefulLogger struct {
Expand Down Expand Up @@ -78,9 +79,13 @@ func TestACKer(t *testing.T) {
sl := &mockStatelessLogger{}
sf := &mockStatefulLogger{}

h := newEventACKer(sl, sf)
h := eventACKer(sl, sf)

h.ackEvents(test.data)
for _, datum := range test.data {
h.AddEvent(beat.Event{Private: datum}, true)
}

h.ACKEvents(len(test.data))
assert.Equal(t, test.stateless, sl.count)
assert.Equal(t, test.stateful, sf.states)
})
Expand Down
24 changes: 14 additions & 10 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,16 +229,20 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
// Make sure all events that were published in
registrarChannel := newRegistrarLogger(registrar)

err = b.Publisher.SetACKHandler(beat.PipelineACKHandler{
ACKEvents: newEventACKer(finishedLogger, registrarChannel).ackEvents,
})
if err != nil {
logp.Err("Failed to install the registry with the publisher pipeline: %v", err)
return err
}

fb.pipeline = pipetool.WithDefaultGuarantees(b.Publisher, beat.GuaranteedSend)
fb.pipeline = withPipelineEventCounter(fb.pipeline, wgEvents)
// setup event counting for startup and a global common ACKer, such that all events will be
// routed to the reigstrar after they've been ACKed.
// Events with Private==nil or the type of private != file.State are directly
// forwarded to `finishedLogger`. Events from the `logs` input will first be forwarded
// to the registrar via `registrarChannel`, which finally forwards the events to finishedLogger as well.
// The finishedLogger decrements the counters in wgEvents after all events have been securely processed
// by the registry.
fb.pipeline = withPipelineEventCounter(b.Publisher, wgEvents)
fb.pipeline = pipetool.WithACKer(fb.pipeline, eventACKer(finishedLogger, registrarChannel))

// Filebeat by default required infinite retry. Let's configure this for all
// inputs by default. Inputs (and InputController) can overwrite the sending
// guarantees explicitly when connecting with the pipeline.
fb.pipeline = pipetool.WithDefaultGuarantees(fb.pipeline, beat.GuaranteedSend)

outDone := make(chan struct{}) // outDone closes down all active pipeline connections
pipelineConnector := channel.NewOutletFactory(outDone).Create
Expand Down
15 changes: 9 additions & 6 deletions filebeat/input/kafka/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/common/backoff"
"github.com/elastic/beats/v7/libbeat/common/kafka"
"github.com/elastic/beats/v7/libbeat/logp"
Expand Down Expand Up @@ -69,13 +70,15 @@ func NewInput(
}

out, err := connector.ConnectWith(cfg, beat.ClientConfig{
ACKEvents: func(events []interface{}) {
for _, event := range events {
if meta, ok := event.(eventMeta); ok {
meta.handler.ack(meta.message)
ACKHandler: acker.ConnectionOnly(
acker.EventPrivateReporter(func(_ int, events []interface{}) {
for _, event := range events {
if meta, ok := event.(eventMeta); ok {
meta.handler.ack(meta.message)
}
}
}
},
}),
),
CloseRef: doneChannelContext(inputContext.Done),
WaitClose: config.WaitClose,
})
Expand Down
11 changes: 6 additions & 5 deletions filebeat/input/v2/input-cursor/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/logp"
)

Expand Down Expand Up @@ -145,8 +146,8 @@ func (inp *managedInput) runSource(
}()

client, err := pipeline.ConnectWith(beat.ClientConfig{
CloseRef: ctx.Cancelation,
ACKEvents: newInputACKHandler(ctx.Logger),
CloseRef: ctx.Cancelation,
ACKHandler: newInputACKHandler(ctx.Logger),
})
if err != nil {
return err
Expand Down Expand Up @@ -174,8 +175,8 @@ func (inp *managedInput) createSourceID(s Source) string {
return fmt.Sprintf("%v::%v", inp.manager.Type, s.Name())
}

func newInputACKHandler(log *logp.Logger) func([]interface{}) {
return func(private []interface{}) {
func newInputACKHandler(log *logp.Logger) beat.ACKer {
return acker.EventPrivateReporter(func(acked int, private []interface{}) {
var n uint
var last int
for i := 0; i < len(private); i++ {
Expand All @@ -196,5 +197,5 @@ func newInputACKHandler(log *logp.Logger) func([]interface{}) {
return
}
private[last].(*updateOp).Execute(n)
}
})
}
22 changes: 7 additions & 15 deletions filebeat/input/v2/input-cursor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,24 +434,16 @@ func TestManager_InputsRun(t *testing.T) {
defer cancel()

// setup publishing pipeline and capture ACKer, so we can simulate progress in the Output
var acker func([]interface{})
var activeEventPrivate []interface{}

ackEvents := func(n int) {
data, rest := activeEventPrivate[:n], activeEventPrivate[n:]
activeEventPrivate = rest
acker(data)
}

var acker beat.ACKer
var wgACKer sync.WaitGroup
wgACKer.Add(1)
pipeline := &pubtest.FakeConnector{
ConnectFunc: func(cfg beat.ClientConfig) (beat.Client, error) {
defer wgACKer.Done()
acker = cfg.ACKEvents
acker = cfg.ACKHandler
return &pubtest.FakeClient{
PublishFunc: func(event beat.Event) {
activeEventPrivate = append(activeEventPrivate, event.Private)
acker.AddEvent(event, true)
},
}, nil
},
Expand All @@ -478,19 +470,19 @@ func TestManager_InputsRun(t *testing.T) {
require.Equal(t, nil, store.snapshot()["test::key"].Cursor)

// ACK first 2 events and check snapshot state
ackEvents(2)
acker.ACKEvents(2)
require.Equal(t, "test-cursor-state2", store.snapshot()["test::key"].Cursor)

// ACK 1 events and check snapshot state (3 events published)
ackEvents(1)
acker.ACKEvents(1)
require.Equal(t, "test-cursor-state3", store.snapshot()["test::key"].Cursor)

// ACK event without cursor update and check snapshot state not modified
ackEvents(1)
acker.ACKEvents(1)
require.Equal(t, "test-cursor-state3", store.snapshot()["test::key"].Cursor)

// ACK rest
ackEvents(3)
acker.ACKEvents(3)
require.Equal(t, "test-cursor-state6", store.snapshot()["test::key"].Cursor)
})
}
Expand Down
28 changes: 12 additions & 16 deletions journalbeat/beater/journalbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ import (
"github.com/elastic/beats/v7/journalbeat/input"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"

"github.com/elastic/beats/v7/journalbeat/config"
_ "github.com/elastic/beats/v7/journalbeat/include"
Expand Down Expand Up @@ -67,7 +69,7 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {

var inputs []*input.Input
for _, c := range config.Inputs {
i, err := input.New(c, b, done, cp.States())
i, err := input.New(c, b.Info, done, cp.States())
if err != nil {
return nil, err
}
Expand All @@ -91,34 +93,28 @@ func (bt *Journalbeat) Run(b *beat.Beat) error {
bt.logger.Info("journalbeat is running! Hit CTRL-C to stop it.")
defer bt.logger.Info("journalbeat is stopping")

err := bt.pipeline.SetACKHandler(beat.PipelineACKHandler{
ACKLastEvents: func(data []interface{}) {
for _, datum := range data {
if st, ok := datum.(checkpoint.JournalState); ok {
bt.checkpoint.PersistState(st)
}
}
},
})
if err != nil {
return err
}
defer bt.checkpoint.Shutdown()

pipeline := pipetool.WithACKer(b.Publisher, acker.LastEventPrivateReporter(func(_ int, private interface{}) {
if st, ok := private.(checkpoint.JournalState); ok {
bt.checkpoint.PersistState(st)
}
}))

var wg sync.WaitGroup
for _, i := range bt.inputs {
wg.Add(1)
go bt.runInput(i, &wg)
go bt.runInput(i, &wg, pipeline)
}

wg.Wait()

return nil
}

func (bt *Journalbeat) runInput(i *input.Input, wg *sync.WaitGroup) {
func (bt *Journalbeat) runInput(i *input.Input, wg *sync.WaitGroup, pipeline beat.Pipeline) {
defer wg.Done()
i.Run()
i.Run(pipeline)
}

// Stop stops the beat and its inputs.
Expand Down
15 changes: 7 additions & 8 deletions journalbeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/elastic/beats/v7/libbeat/processors/add_formatted_index"

"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"

"github.com/elastic/beats/v7/journalbeat/checkpoint"
Expand All @@ -38,7 +39,6 @@ type Input struct {
readers []*reader.Reader
done chan struct{}
config Config
pipeline beat.Pipeline
client beat.Client
states map[string]checkpoint.JournalState
logger *logp.Logger
Expand All @@ -49,7 +49,7 @@ type Input struct {
// New returns a new Inout
func New(
c *common.Config,
b *beat.Beat,
info beat.Info,
done chan struct{},
states map[string]checkpoint.JournalState,
) (*Input, error) {
Expand Down Expand Up @@ -104,7 +104,7 @@ func New(
readers = append(readers, r)
}

inputProcessors, err := processorsForInput(b.Info, config)
inputProcessors, err := processorsForInput(info, config)
if err != nil {
return nil, err
}
Expand All @@ -115,7 +115,6 @@ func New(
readers: readers,
done: done,
config: config,
pipeline: b.Publisher,
states: states,
logger: logger,
eventMeta: config.EventMetadata,
Expand All @@ -125,18 +124,18 @@ func New(

// Run connects to the output, collects entries from the readers
// and then publishes the events.
func (i *Input) Run() {
func (i *Input) Run(pipeline beat.Pipeline) {
var err error
i.client, err = i.pipeline.ConnectWith(beat.ClientConfig{
i.client, err = pipeline.ConnectWith(beat.ClientConfig{
PublishMode: beat.GuaranteedSend,
Processing: beat.ProcessingConfig{
EventMetadata: i.eventMeta,
Meta: nil,
Processor: i.processors,
},
ACKCount: func(n int) {
ACKHandler: acker.Counting(func(n int) {
i.logger.Debugw("journalbeat successfully published events", "event.count", n)
},
}),
})
if err != nil {
i.logger.Error("Error connecting to output: %v", err)
Expand Down
Loading