From 0b389cbd3456a2a2f43694f4a18f9b70cb769950 Mon Sep 17 00:00:00 2001
From: Monica Sarbu
Date: Tue, 19 May 2015 15:23:43 +0300
Subject: [PATCH] Avoid filling the channel up to its maximum (#10)

The problem appears when the bulkChannel reaches its maximum size: the
goroutine that writes to it blocks and no more data is consumed from the
sending queue. To fix the problem, make sure the bulkChannel never reaches
its maximum size by flushing the buffered data before the channel is full.
---
 outputs/elasticsearch/output.go      | 25 +++++++++++-----
 outputs/elasticsearch/output_test.go | 44 +++++++++++++++++-----------
 2 files changed, 44 insertions(+), 25 deletions(-)

diff --git a/outputs/elasticsearch/output.go b/outputs/elasticsearch/output.go
index 96a9ece7cd4..fc168c44fa0 100644
--- a/outputs/elasticsearch/output.go
+++ b/outputs/elasticsearch/output.go
@@ -100,6 +100,16 @@ func (out *ElasticsearchOutput) GetNameByIP(ip string) string {
     return name
 }
 
+func (out *ElasticsearchOutput) InsertBulkMessage(bulkChannel chan interface{}) {
+    close(bulkChannel)
+    go func(channel chan interface{}) {
+        _, err := out.Conn.Bulk("", "", nil, channel)
+        if err != nil {
+            logp.Err("Fail to perform many index operations in a single API call: %s", err)
+        }
+    }(bulkChannel)
+}
+
 func (out *ElasticsearchOutput) SendMessagesGoroutine() {
     flushChannel := make(<-chan time.Time)
 
@@ -115,7 +125,12 @@ func (out *ElasticsearchOutput) SendMessagesGoroutine() {
         case msg := <-out.sendingQueue:
             index := fmt.Sprintf("%s-%d.%02d.%02d", out.Index, msg.Ts.Year(), msg.Ts.Month(), msg.Ts.Day())
             if out.FlushInterval > 0 {
-                logp.Debug("output_elasticsearch", "Insert bulk messages.")
+                logp.Debug("output_elasticsearch", "Insert %d bulk messages.", len(bulkChannel))
+                if len(bulkChannel)+2 > out.BulkMaxSize {
+                    logp.Debug("output_elasticsearch", "Channel size reached. Calling bulk")
Calling bulk") + out.InsertBulkMessage(bulkChannel) + bulkChannel = make(chan interface{}, out.BulkMaxSize) + } bulkChannel <- map[string]interface{}{ "index": map[string]interface{}{ "_index": index, @@ -131,13 +146,7 @@ func (out *ElasticsearchOutput) SendMessagesGoroutine() { } } case _ = <-flushChannel: - close(bulkChannel) - go func(channel chan interface{}) { - _, err := out.Conn.Bulk("", "", nil, channel) - if err != nil { - logp.Err("Fail to perform many index operations in a single API call: %s", err) - } - }(bulkChannel) + out.InsertBulkMessage(bulkChannel) bulkChannel = make(chan interface{}, out.BulkMaxSize) } } diff --git a/outputs/elasticsearch/output_test.go b/outputs/elasticsearch/output_test.go index 0344f631361..b89e1b5cd6a 100644 --- a/outputs/elasticsearch/output_test.go +++ b/outputs/elasticsearch/output_test.go @@ -15,7 +15,7 @@ import ( const elasticsearchAddr = "localhost" const elasticsearchPort = 9200 -func createElasticsearchConnection(flush_interval int) ElasticsearchOutput { +func createElasticsearchConnection(flush_interval int, bulk_size int) ElasticsearchOutput { index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid()) @@ -31,6 +31,7 @@ func createElasticsearchConnection(flush_interval int) ElasticsearchOutput { Index: index, Protocol: "", Flush_interval: &flush_interval, + BulkMaxSize: &bulk_size, }, 10) return elasticsearchOutput @@ -44,9 +45,9 @@ func TestTopologyInES(t *testing.T) { logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"topology", "output_elasticsearch"}) } - elasticsearchOutput1 := createElasticsearchConnection(0) - elasticsearchOutput2 := createElasticsearchConnection(0) - elasticsearchOutput3 := createElasticsearchConnection(0) + elasticsearchOutput1 := createElasticsearchConnection(0, 0) + elasticsearchOutput2 := createElasticsearchConnection(0, 0) + elasticsearchOutput3 := createElasticsearchConnection(0, 0) elasticsearchOutput1.PublishIPs("proxy1", []string{"10.1.0.4"}) elasticsearchOutput2.PublishIPs("proxy2", []string{"10.1.0.9", @@ -88,7 +89,7 @@ func TestOneEvent(t *testing.T) { ts := time.Now() - elasticsearchOutput := createElasticsearchConnection(0) + elasticsearchOutput := createElasticsearchConnection(0, 0) event := common.MapStr{} event["type"] = "redis" @@ -153,7 +154,7 @@ func TestEvents(t *testing.T) { ts := time.Now() - elasticsearchOutput := createElasticsearchConnection(0) + elasticsearchOutput := createElasticsearchConnection(0, 0) event := common.MapStr{} event["type"] = "redis" @@ -213,18 +214,8 @@ func TestEvents(t *testing.T) { } } -func TestBulkEvents(t *testing.T) { - if testing.Short() { - t.Skip("Skipping events publish in short mode, because they require Elasticsearch") - } - if testing.Verbose() { - logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"topology", "output_elasticsearch"}) - } - +func test_bulk_with_params(t *testing.T, elasticsearchOutput ElasticsearchOutput) { ts := time.Now() - - elasticsearchOutput := createElasticsearchConnection(50) - index := fmt.Sprintf("%s-%d.%02d.%02d", elasticsearchOutput.Index, ts.Year(), ts.Month(), ts.Day()) for i := 0; i < 10; i++ { @@ -271,8 +262,27 @@ func TestBulkEvents(t *testing.T) { if err != nil { t.Errorf("Failed to query elasticsearch: %s", err) + return } if resp.Hits.Total != 10 { t.Errorf("Wrong number of results: %d", resp.Hits.Total) } } + +func TestBulkEvents(t *testing.T) { + if testing.Short() { + t.Skip("Skipping events publish in short mode, because they require Elasticsearch") + } + if testing.Verbose() { + 
+        logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"topology", "output_elasticsearch", "elasticsearch"})
+    }
+
+    elasticsearchOutput := createElasticsearchConnection(50, 2)
+    test_bulk_with_params(t, elasticsearchOutput)
+
+    elasticsearchOutput = createElasticsearchConnection(50, 1000)
+    test_bulk_with_params(t, elasticsearchOutput)
+
+    elasticsearchOutput = createElasticsearchConnection(50, 5)
+    test_bulk_with_params(t, elasticsearchOutput)
+}
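
Note (not part of the patch): the fix above boils down to flushing the buffered
bulkChannel before a send could fill it and block the sending goroutine. Below is
a minimal standalone Go sketch of that pattern. The names maxBulkSize, flush and
the use of sync.WaitGroup/fmt.Println are illustrative assumptions only; in the
real output the closed channel is handed to Conn.Bulk instead. Only the
len(channel)+2 headroom check and the close-then-replace of the channel mirror
the patch.

// flush_before_full.go - sketch of flushing a buffered channel before it fills.
package main

import (
	"fmt"
	"sync"
)

const maxBulkSize = 5 // stands in for out.BulkMaxSize

// flush drains everything buffered in ch; it stands in for the Conn.Bulk call.
// The caller must have closed ch already so the range loop terminates.
func flush(ch chan interface{}, wg *sync.WaitGroup) {
	defer wg.Done()
	for item := range ch {
		fmt.Println("bulk item:", item)
	}
}

func main() {
	var wg sync.WaitGroup
	bulkChannel := make(chan interface{}, maxBulkSize)

	for i := 0; i < 12; i++ {
		// Each event contributes two entries (action header + document),
		// hence the +2 headroom check mirrored from the patch.
		if len(bulkChannel)+2 > maxBulkSize {
			close(bulkChannel)
			wg.Add(1)
			go flush(bulkChannel, &wg)
			// Start a fresh batch so the producer never blocks on a full channel.
			bulkChannel = make(chan interface{}, maxBulkSize)
		}
		bulkChannel <- map[string]interface{}{"index": i}
		bulkChannel <- map[string]interface{}{"doc": fmt.Sprintf("event-%d", i)}
	}

	// Flush whatever is left in the final, partially filled batch.
	close(bulkChannel)
	wg.Add(1)
	go flush(bulkChannel, &wg)
	wg.Wait()
}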