Skip to content

Commit

Permalink
Merge pull request #649 from urso/enh/pub-queues
Browse files Browse the repository at this point in the history
Separate per event and bulk-event queues
  • Loading branch information
ruflin committed Jan 7, 2016
2 parents e22e626 + 6865d1a commit 4304cc7
Show file tree
Hide file tree
Showing 17 changed files with 103 additions and 42 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff]
*Affecting all Beats*
- Fix logging issue with file based output where newlines could be misplaced
during concurrent logging {pull}650[650]
- Reduce memory usage by separate queue sizes for single events and bulk events. {pull}649[649] {issue}516[516]

*Packetbeat*
- Fix setting direction to out and use its value to decide when dropping events if ignore_outgoing is enabled {pull}557[557]
Expand Down
3 changes: 3 additions & 0 deletions filebeat/etc/filebeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,9 @@ shipper:
# refresh_topology_freq. The default is 15 seconds.
#topology_expire: 15

# Internal queue size for single events in processing pipeline
#queue_size: 1000

# Configure local GeoIP database support.
# If no paths are configured geoip is disabled.
#geoip:
Expand Down
10 changes: 10 additions & 0 deletions libbeat/docs/shipperconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ shipper:
#paths:
# - "/usr/share/GeoIP/GeoLiteCity.dat"
# - "/usr/local/var/GeoIP/GeoLiteCity.dat"
------------------------------------------------------------------------------

==== Options
Expand Down Expand Up @@ -143,6 +144,15 @@ useful in case a Beat stops publishing its IP addresses. The IP addresses
are removed automatically from the topology map after expiration. The default
is 15 seconds.

===== queue_size

Configure the internal queue size for single events in the processing pipeline. The
default value is 1000.

===== bulk_queue_size

Configure the internal queue size for bulk events in the processing pipeline. The default value is 0.

===== geoip.paths

This configuration option is currently used by Packetbeat only.
Expand Down
3 changes: 3 additions & 0 deletions libbeat/etc/libbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ shipper:
# refresh_topology_freq. The default is 15 seconds.
#topology_expire: 15

# Internal queue size for single events in processing pipeline
#queue_size: 1000

# Configure local GeoIP database support.
# If no paths are configured geoip is disabled.
#geoip:
Expand Down
11 changes: 5 additions & 6 deletions libbeat/publisher/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,17 @@ const (
defaultBulkSize = 200
)

func newAsyncPublisher(pub *PublisherType) *asyncPublisher {

func newAsyncPublisher(pub *PublisherType, hwm, bulkHWM int) *asyncPublisher {
p := &asyncPublisher{pub: pub}
p.ws.Init()

var outputs []worker
for _, out := range pub.Output {
outputs = append(outputs, asyncOutputer(&p.ws, out))
outputs = append(outputs, asyncOutputer(&p.ws, hwm, bulkHWM, out))
}

p.outputs = outputs
p.messageWorker.init(&pub.wsPublisher, 1000, newPreprocessor(pub, p))
p.messageWorker.init(&pub.wsPublisher, hwm, bulkHWM, newPreprocessor(pub, p))
return p
}

Expand Down Expand Up @@ -67,7 +66,7 @@ func (p *asyncPublisher) PublishEvents(ctx context, events []common.MapStr) bool
return true
}

func asyncOutputer(ws *workerSignal, worker *outputWorker) worker {
func asyncOutputer(ws *workerSignal, hwm, bulkHWM int, worker *outputWorker) worker {
config := worker.config

flushInterval := defaultFlushInterval
Expand All @@ -89,5 +88,5 @@ func asyncOutputer(ws *workerSignal, worker *outputWorker) worker {

debug("create bulk processing worker (interval=%v, bulk size=%v)",
flushInterval, maxBulkSize)
return newBulkWorker(ws, 1000, worker, flushInterval, maxBulkSize)
return newBulkWorker(ws, hwm, bulkHWM, worker, flushInterval, maxBulkSize)
}
33 changes: 19 additions & 14 deletions libbeat/publisher/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type bulkWorker struct {
ws *workerSignal

queue chan message
bulkQueue chan message
flushTicker *time.Ticker

maxBatchSize int
Expand All @@ -20,14 +21,16 @@ type bulkWorker struct {
}

func newBulkWorker(
ws *workerSignal, hwm int, output worker,
ws *workerSignal, hwm int, bulkHWM int,
output worker,
flushInterval time.Duration,
maxBatchSize int,
) *bulkWorker {
b := &bulkWorker{
output: output,
ws: ws,
queue: make(chan message, hwm),
bulkQueue: make(chan message, bulkHWM),
flushTicker: time.NewTicker(flushInterval),
maxBatchSize: maxBatchSize,
events: make([]common.MapStr, 0, maxBatchSize),
Expand All @@ -40,7 +43,11 @@ func newBulkWorker(
}

func (b *bulkWorker) send(m message) {
b.queue <- m
if m.events == nil {
b.queue <- m
} else {
b.bulkQueue <- m
}
}

func (b *bulkWorker) run() {
Expand All @@ -51,16 +58,9 @@ func (b *bulkWorker) run() {
case <-b.ws.done:
return
case m := <-b.queue:
if m.event != nil { // single event
b.onEvent(m.context.signal, m.event)
} else { // batch of events
b.onEvents(m.context.signal, m.events)
}

// buffer full?
if len(b.events) == cap(b.events) {
b.publish()
}
b.onEvent(m.context.signal, m.event)
case m := <-b.bulkQueue:
b.onEvents(m.context.signal, m.events)
case <-b.flushTicker.C:
if len(b.events) > 0 {
b.publish()
Expand All @@ -74,18 +74,21 @@ func (b *bulkWorker) onEvent(signal outputs.Signaler, event common.MapStr) {
if signal != nil {
b.pending = append(b.pending, signal)
}

if len(b.events) == cap(b.events) {
b.publish()
}
}

func (b *bulkWorker) onEvents(signal outputs.Signaler, events []common.MapStr) {
for len(events) > 0 {
// split up bulk to match required bulk sizes.
// If input events have been split up bufferFull will be set and
// bulk request will be published.
bufferFull := false
spaceLeft := cap(b.events) - len(b.events)
consume := len(events)
bufferFull := spaceLeft <= consume
if spaceLeft < consume {
bufferFull = true
consume = spaceLeft
if signal != nil {
// creating cascading signaler chain for
Expand Down Expand Up @@ -113,6 +116,7 @@ func (b *bulkWorker) publish() {
context: context{
signal: outputs.NewCompositeSignaler(b.pending...),
},
event: nil,
events: b.events,
})

Expand All @@ -123,5 +127,6 @@ func (b *bulkWorker) publish() {
func (b *bulkWorker) shutdown() {
b.flushTicker.Stop()
stopQueue(b.queue)
stopQueue(b.bulkQueue)
b.ws.wg.Done()
}
7 changes: 4 additions & 3 deletions libbeat/publisher/bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const (
flushInterval time.Duration = 10 * time.Millisecond
maxBatchSize = 10
queueSize = 4 * maxBatchSize
bulkQueueSize = 1
)

// Send a single event to the bulkWorker and verify that the event
Expand All @@ -23,7 +24,7 @@ func TestBulkWorkerSendSingle(t *testing.T) {
}
ws := newWorkerSignal()
defer ws.stop()
bw := newBulkWorker(ws, queueSize, mh, flushInterval, maxBatchSize)
bw := newBulkWorker(ws, queueSize, bulkQueueSize, mh, flushInterval, maxBatchSize)

s := newTestSignaler()
m := testMessage(s, testEvent())
Expand All @@ -46,7 +47,7 @@ func TestBulkWorkerSendBatch(t *testing.T) {
}
ws := newWorkerSignal()
defer ws.stop()
bw := newBulkWorker(ws, queueSize, mh, time.Duration(time.Hour), maxBatchSize)
bw := newBulkWorker(ws, queueSize, 0, mh, time.Duration(time.Hour), maxBatchSize)

events := make([]common.MapStr, maxBatchSize)
for i := range events {
Expand Down Expand Up @@ -76,7 +77,7 @@ func TestBulkWorkerSendBatchGreaterThanMaxBatchSize(t *testing.T) {
}
ws := newWorkerSignal()
defer ws.stop()
bw := newBulkWorker(ws, queueSize, mh, flushInterval, maxBatchSize)
bw := newBulkWorker(ws, queueSize, 0, mh, flushInterval, maxBatchSize)

// Send
events := make([]common.MapStr, maxBatchSize+1)
Expand Down
6 changes: 3 additions & 3 deletions libbeat/publisher/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,16 +141,16 @@ func newTestPublisher(bulkSize int, response OutputResponse) *testPublisher {
ow.config.BulkMaxSize = &bulkSize
ow.handler = mh
ws := workerSignal{}
ow.messageWorker.init(&ws, 1000, mh)
ow.messageWorker.init(&ws, defaultChanSize, defaultBulkChanSize, mh)

pub := &PublisherType{
Output: []*outputWorker{ow},
wsOutput: ws,
}
pub.wsOutput.Init()
pub.wsPublisher.Init()
pub.syncPublisher = newSyncPublisher(pub)
pub.asyncPublisher = newAsyncPublisher(pub)
pub.syncPublisher = newSyncPublisher(pub, defaultChanSize, defaultBulkChanSize)
pub.asyncPublisher = newAsyncPublisher(pub, defaultChanSize, defaultBulkChanSize)
return &testPublisher{
pub: pub,
outputMsgHandler: mh,
Expand Down
4 changes: 2 additions & 2 deletions libbeat/publisher/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func newOutputWorker(
out outputs.Outputer,
ws *workerSignal,
hwm int,
bulkHWM int,
) *outputWorker {
maxBulkSize := defaultBulkSize
if config.BulkMaxSize != nil {
Expand All @@ -31,14 +32,13 @@ func newOutputWorker(
config: config,
maxBulkSize: maxBulkSize,
}
o.messageWorker.init(ws, hwm, o)
o.messageWorker.init(ws, hwm, bulkHWM, o)
return o
}

func (o *outputWorker) onStop() {}

func (o *outputWorker) onMessage(m message) {

if m.event != nil {
o.onEvent(&m.context, m.event)
} else {
Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestOutputWorker(t *testing.T) {
outputs.MothershipConfig{},
outputer,
newWorkerSignal(),
1)
1, 0)

ow.onStop() // Noop

Expand Down
26 changes: 23 additions & 3 deletions libbeat/publisher/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,22 @@ type ShipperConfig struct {
Topology_expire int
Tags []string
Geoip common.Geoip

// internal publisher queue sizes
QueueSize *int `yaml:"queue_size"`
BulkQueueSize *int `yaml:"bulk_queue_size"`
}

type Topology struct {
Name string `json:"name"`
Ip string `json:"ip"`
}

const (
defaultChanSize = 1000
defaultBulkChanSize = 0
)

func init() {
publishDisabled = flag.Bool("N", false, "Disable actual publishing for testing")
}
Expand Down Expand Up @@ -191,6 +200,16 @@ func (publisher *PublisherType) init(
logp.Info("Dry run mode. All output types except the file based one are disabled.")
}

hwm := defaultChanSize
if shipper.QueueSize != nil && *shipper.QueueSize > 0 {
hwm = *shipper.QueueSize
}

bulkHWM := defaultBulkChanSize
if shipper.BulkQueueSize != nil && *shipper.BulkQueueSize >= 0 {
bulkHWM = *shipper.BulkQueueSize
}

publisher.GeoLite = common.LoadGeoIPData(shipper.Geoip)

publisher.wsOutput.Init()
Expand All @@ -211,7 +230,8 @@ func (publisher *PublisherType) init(
debug("Create output worker")

outputers = append(outputers,
newOutputWorker(config, output, &publisher.wsOutput, 1000))
newOutputWorker(config, output, &publisher.wsOutput,
hwm, bulkHWM))

if !config.Save_topology {
continue
Expand Down Expand Up @@ -289,8 +309,8 @@ func (publisher *PublisherType) init(
go publisher.UpdateTopologyPeriodically()
}

publisher.asyncPublisher = newAsyncPublisher(publisher)
publisher.syncPublisher = newSyncPublisher(publisher)
publisher.asyncPublisher = newAsyncPublisher(publisher, hwm, bulkHWM)
publisher.syncPublisher = newSyncPublisher(publisher, hwm, bulkHWM)

return nil
}
4 changes: 2 additions & 2 deletions libbeat/publisher/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ type syncPublisher struct {

type syncClient func(message) bool

func newSyncPublisher(pub *PublisherType) *syncPublisher {
func newSyncPublisher(pub *PublisherType, hwm, bulkHWM int) *syncPublisher {
s := &syncPublisher{pub: pub}
s.messageWorker.init(&pub.wsPublisher, 1000, newPreprocessor(pub, s))
s.messageWorker.init(&pub.wsPublisher, hwm, bulkHWM, newPreprocessor(pub, s))
return s
}

Expand Down
Loading

0 comments on commit 4304cc7

Please sign in to comment.