Alternative implementation for closing a beat.Client
This change allows a beat.Client instance to be closed indirectly, by
signaling instead of calling Close(). Close() should still be called on
shutdown, so that the runner using the client for publishing blocks and
keeps its resources intact while a configured WaitClose runs.

The CloseRef interface implements a subset of context.Context, which can
be used to control shutdown. For example, filebeat inputs normally run in
a loop: reading messages, transforming them into events, and publishing
them to libbeat via a beat.Client instance. If the input accepts a
context.Context for cancellation, the run loop follows this code
pattern:

    func newInput(...) (*input, error) {
        // configure input

        // create a context that is cancelled in the `Close` method
        ctx, cancelFn := ...

        return &input{
            ctx:    ctx,
            cancel: cancelFn,
            ...
        }, nil
    }

    func (in *input) Start() {
        in.wg.Add(1)
        go func() {
            defer in.wg.Done()
            in.Run()
        }()
    }

    func (in *input) Run() {
        reader := ... // init reader for collecting the raw data
        defer reader.Close()

        outlet := connector.ConnectWith(beat.ClientConfig{
            // underlying beat.Client will be closed if ctx is cancelled
            CloseRef: in.ctx,

            Processing: ...,

            // give pipeline and ACKer a chance to ACK some in-flight events during shutdown
            WaitClose: ...,
            ACKEvents: func(private []interface{}) {
                for _, p := range private {
                    reader.ACK(p)
                }
            },
        })

        // this blocks until all events have been ACKed or for a duration of WaitClose
        defer outlet.Close()

        for err := in.ctx.Err(); err == nil; err = in.ctx.Err() {
            // Read returns an error if ctx is cancelled
            message, err := reader.Read(in.ctx, ...)
            if err != nil {
                ...
            }

            // OnEvent blocks if the queue is full, but unblocks if ctx is cancelled
            outlet.OnEvent(makeEvent(message))
        }
    }

    func (in *input) Close() {
        // cancel the context:
        // -> reader and outlet unblock
        // -> run loop returns
        in.cancel()
        in.wg.Wait() // wait until the run loop has returned
    }
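
Because context.Context already provides Done() and Err(), a cancellable context can be passed as CloseRef directly. A minimal sketch of the wiring (the pipeline handle, the WaitClose value, and the error handling are illustrative only):

    ctx, cancel := context.WithCancel(context.Background())

    client, err := pipeline.ConnectWith(beat.ClientConfig{
        CloseRef:  ctx,             // client is closed once ctx is cancelled
        WaitClose: 5 * time.Second, // grace period for in-flight ACKs
    })
    ...

    cancel()       // signal: unblocks a Publish call stuck on a full queue
    client.Close() // still call Close on shutdown: it blocks until pending
                   // events are ACKed or WaitClose expires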
urso committed Jul 25, 2019
1 parent d5b8171 commit 794f8f5
Showing 5 changed files with 206 additions and 47 deletions.
9 changes: 9 additions & 0 deletions libbeat/beat/pipeline.go
@@ -48,6 +48,8 @@ type ClientConfig struct {

Processing ProcessingConfig

CloseRef CloseRef

// WaitClose sets the maximum duration to wait on ACK, if the client still has
// active non-acknowledged events in the publisher pipeline.
// WaitClose is only effective if one of ACKCount, ACKEvents and ACKLastEvents
@@ -78,6 +80,13 @@ type ClientConfig struct {
ACKLastEvent func(interface{})
}

// CloseRef allows users to close the client asynchronously.
// A CloseRef implements a subset of the functions required for context.Context.
type CloseRef interface {
Done() <-chan struct{}
Err() error
}

// ProcessingConfig provides additional event processing settings a client can
// pass to the publisher pipeline on Connect.
type ProcessingConfig struct {
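Any type providing Done() and Err() satisfies CloseRef, not just context.Context. For callers that do not want to depend on the context package, a hypothetical channel-backed implementation could look like this (a sketch; the type and error message are invented for illustration):

    // closeSignal is a minimal CloseRef implementation backed by a channel.
    type closeSignal struct {
        done chan struct{}
        once sync.Once
    }

    func newCloseSignal() *closeSignal {
        return &closeSignal{done: make(chan struct{})}
    }

    // Close is safe to call multiple times; only the first call closes the channel.
    func (s *closeSignal) Close() { s.once.Do(func() { close(s.done) }) }

    func (s *closeSignal) Done() <-chan struct{} { return s.done }

    func (s *closeSignal) Err() error {
        select {
        case <-s.done:
            return errors.New("close signal triggered")
        default:
            return nil
        }
    }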
80 changes: 60 additions & 20 deletions libbeat/publisher/pipeline/acker.go
@@ -30,6 +30,7 @@ import (
// All pipeline and client ACK handling support is provided by acker instances.
type acker interface {
close()
wait()
addEvent(event beat.Event, published bool) bool
ackEvents(int)
}
@@ -40,6 +41,7 @@ type emptyACK struct{}
var nilACKer acker = (*emptyACK)(nil)

func (*emptyACK) close() {}
func (*emptyACK) wait() {}
func (*emptyACK) addEvent(_ beat.Event, _ bool) bool { return true }
func (*emptyACK) ackEvents(_ int) {}

@@ -68,6 +70,7 @@ func newCountACK(pipeline *Pipeline, fn func(total, acked int)) *countACK {
}

func (a *countACK) close() {}
func (a *countACK) wait() {}
func (a *countACK) addEvent(_ beat.Event, _ bool) bool { return true }
func (a *countACK) ackEvents(n int) {
if a.pipeline.ackActive.Load() {
@@ -220,6 +223,8 @@ func (a *gapCountACK) close() {
close(a.done)
}

func (a *gapCountACK) wait() {}

func (a *gapCountACK) addEvent(_ beat.Event, published bool) bool {
// if gapList is empty and event is being dropped, forward drop event to ack
// loop worker:
@@ -313,9 +318,8 @@ func newBoundGapCountACK(
return a
}

func (a *boundGapCountACK) close() {
a.acker.close()
}
func (a *boundGapCountACK) close() { a.acker.close() }
func (a *boundGapCountACK) wait() { a.acker.wait() }

func (a *boundGapCountACK) addEvent(event beat.Event, published bool) bool {
a.sema.inc()
@@ -361,9 +365,9 @@ func makeCountACK(pipeline *Pipeline, canDrop bool, sema *sema, fn func(int, int
return newCountACK(pipeline, fn)
}

func (a *eventDataACK) close() {
a.acker.close()
}
func (a *eventDataACK) close() { a.acker.close() }

func (a *eventDataACK) wait() { a.acker.wait() }

func (a *eventDataACK) addEvent(event beat.Event, published bool) bool {
a.mutex.Lock()
@@ -400,37 +404,57 @@ type waitACK struct {
type waitACK struct {
acker acker

signal chan struct{}
waitClose time.Duration
signalAll chan struct{} // ack loop notifies `close` that all events have been acked
signalDone chan struct{} // shutdown handler telling `wait` that shutdown has been completed
waitClose time.Duration

active atomic.Bool

// number of active events
events atomic.Uint64

afterClose func()
}

func newWaitACK(acker acker, timeout time.Duration) *waitACK {
func newWaitACK(acker acker, timeout time.Duration, afterClose func()) *waitACK {
return &waitACK{
acker: acker,
signal: make(chan struct{}, 1),
waitClose: timeout,
active: atomic.MakeBool(true),
acker: acker,
signalAll: make(chan struct{}, 1),
signalDone: make(chan struct{}),
waitClose: timeout,
active: atomic.MakeBool(true),
afterClose: afterClose,
}
}

func (a *waitACK) close() {
// TODO: wait for events

a.active.Store(false)
if a.events.Load() > 0 {

if a.events.Load() == 0 {
a.finishClose()
return
}

// start routine to propagate shutdown signals or timeouts to anyone
// being blocked in wait.
go func() {
defer a.finishClose()

select {
case <-a.signal:
case <-a.signalAll:
case <-time.After(a.waitClose):
}
}
}()
}

// close the underlying acker upon exit
func (a *waitACK) finishClose() {
a.acker.close()
a.afterClose()
close(a.signalDone)
}

func (a *waitACK) wait() {
<-a.signalDone
}

func (a *waitACK) addEvent(event beat.Event, published bool) bool {
@@ -454,6 +478,22 @@ func (a *waitACK) releaseEvents(n int) {
Expand All @@ -454,6 +478,22 @@ func (a *waitACK) releaseEvents(n int) {

// send done signal, if close is waiting
if !a.active.Load() {
a.signal <- struct{}{}
a.signalAll <- struct{}{}
}
}

// closeACKer simply wraps any other acker. It calls a custom function after
// the underlying acker has been closed.
type closeACKer struct {
acker
afterClose func()
}

func newCloseACKer(a acker, fn func()) acker {
return &closeACKer{acker: a, afterClose: fn}
}

func (a closeACKer) close() {
a.acker.close()
a.afterClose()
}
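
Stripped of the acker wrapping, the close/wait split above follows a reusable pattern: close marks shutdown and, if events are still pending, arms a goroutine that waits for either an all-ACKed signal or the timeout; wait blocks until that goroutine finishes. A simplified standalone sketch (assuming Go 1.19+ sync/atomic types; the real code uses libbeat's atomic package and signals through an inner acker):

    type waitCloser struct {
        pending  atomic.Int64  // number of not-yet-ACKed events
        closing  atomic.Bool   // set once close has started
        allAcked chan struct{} // ACK path -> close: everything drained
        finished chan struct{} // close -> wait: shutdown completed
        timeout  time.Duration
    }

    func newWaitCloser(timeout time.Duration) *waitCloser {
        return &waitCloser{
            allAcked: make(chan struct{}, 1), // buffered, so the signal survives until close reads it
            finished: make(chan struct{}),
            timeout:  timeout,
        }
    }

    func (w *waitCloser) add(n int64) { w.pending.Add(n) }

    func (w *waitCloser) ack(n int64) {
        if w.pending.Add(-n) == 0 && w.closing.Load() {
            select {
            case w.allAcked <- struct{}{}: // wake a close waiting for the drain
            default: // signal already pending
            }
        }
    }

    func (w *waitCloser) close() {
        w.closing.Store(true)
        if w.pending.Load() == 0 {
            close(w.finished)
            return
        }
        go func() {
            defer close(w.finished)
            select {
            case <-w.allAcked:            // all events ACKed in time
            case <-time.After(w.timeout): // grace period expired
            }
        }()
    }

    func (w *waitCloser) wait() { <-w.finished }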
40 changes: 28 additions & 12 deletions libbeat/publisher/pipeline/client.go
@@ -43,7 +43,11 @@ type client struct {
canDrop bool
reportEvents bool

isOpen atomic.Bool
// Open state, signaling, and sync primitives for coordinating client Close.
isOpen atomic.Bool // set to false during shutdown, such that no new events will be accepted anymore.
closeOnce sync.Once // closeOnce ensures that the client shutdown sequence is only executed once
closeRef beat.CloseRef // external closeRef for sending a signal that the client should be closed.
done chan struct{} // the done channel will be closed if the closeRef gets closed, or Close is run.

eventer beat.ClientEventer
}
@@ -135,23 +139,36 @@ func (c *client) publish(e beat.Event) {
}

func (c *client) Close() error {
// first stop ack handling. ACK handler might block (with timeout), waiting
// first stop ack handling. ACK handler might block on wait (with timeout), waiting
// for pending events to be ACKed.
c.doClose()
c.acker.wait()
return nil
}

log := c.logger()
func (c *client) doClose() {
c.closeOnce.Do(func() {
close(c.done)

if !c.isOpen.Swap(false) {
return nil // closed or already closing
}
log := c.logger()

c.onClosing()
c.isOpen.Store(false)
c.onClosing()

log.Debug("client: closing acker")
c.acker.close()
log.Debug("client: closing acker")
c.acker.close() // this must trigger a direct/indirect call to 'unlink'
})
}

// unlink is the final step of closing a client. It must be executed only after
// it is guaranteed that the underlying acker has been closed and will not
// accept any new publish or ACK events.
// This method is normally registered with the ACKer and triggered by it.
func (c *client) unlink() {
log := c.logger()
log.Debug("client: done closing acker")

// finally disconnect client from broker
n := c.producer.Cancel()
n := c.producer.Cancel() // close connection to queue
log.Debugf("client: cancelled %v events", n)

if c.reportEvents {
@@ -162,7 +179,6 @@ func (c *client) Close() error {
}

c.onClosed()
return nil
}

func (c *client) logger() *logp.Logger {
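The hunks above do not show how closeRef feeds into doClose; presumably the pipeline starts a small watcher goroutine when the client connects, along these lines (a guess at the wiring, not the committed code):

    // hypothetical wiring, e.g. in the pipeline's ConnectWith:
    if cfg.CloseRef != nil {
        go func() {
            select {
            case <-c.done:
                // Close() already ran; the watcher can exit
            case <-cfg.CloseRef.Done():
                c.Close() // safe: doClose is guarded by closeOnce
            }
        }()
    }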
5 changes: 3 additions & 2 deletions libbeat/publisher/pipeline/client_ack.go
@@ -33,6 +33,7 @@ func (p *Pipeline) makeACKer(
canDrop bool,
cfg *beat.ClientConfig,
waitClose time.Duration,
afterClose func(),
) acker {
var (
bld = p.ackBuilder
@@ -56,9 +57,9 @@ }
}

if waitClose <= 0 {
return acker
return newCloseACKer(acker, afterClose)
}
return newWaitACK(acker, waitClose)
return newWaitACK(acker, waitClose, afterClose)
}

func lastEventACK(fn func(interface{})) func([]interface{}) {
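The afterClose hook passed into makeACKer is what lets the client defer its unlink step until the acker chain has actually stopped: with WaitClose <= 0 the closeACKer decorator fires the hook synchronously from close, otherwise waitACK fires it from finishClose once events are drained or the timeout expires. A tiny illustration of the synchronous case (a hypothetical snippet using the names from the diff):

    // with WaitClose <= 0, the hook fires as soon as close returns
    done := false
    a := newCloseACKer(nilACKer, func() { done = true })
    a.close()
    // done == true here, so the client may unlink immediately after
    // acker.close() has returned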