diff --git a/outputs/elasticsearch/output.go b/outputs/elasticsearch/output.go index 96a9ece7cd49..fc168c44fa04 100644 --- a/outputs/elasticsearch/output.go +++ b/outputs/elasticsearch/output.go @@ -100,6 +100,16 @@ func (out *ElasticsearchOutput) GetNameByIP(ip string) string { return name } +func (out *ElasticsearchOutput) InsertBulkMessage(bulkChannel chan interface{}) { + close(bulkChannel) + go func(channel chan interface{}) { + _, err := out.Conn.Bulk("", "", nil, channel) + if err != nil { + logp.Err("Fail to perform many index operations in a single API call: %s", err) + } + }(bulkChannel) +} + func (out *ElasticsearchOutput) SendMessagesGoroutine() { flushChannel := make(<-chan time.Time) @@ -115,7 +125,12 @@ func (out *ElasticsearchOutput) SendMessagesGoroutine() { case msg := <-out.sendingQueue: index := fmt.Sprintf("%s-%d.%02d.%02d", out.Index, msg.Ts.Year(), msg.Ts.Month(), msg.Ts.Day()) if out.FlushInterval > 0 { - logp.Debug("output_elasticsearch", "Insert bulk messages.") + logp.Debug("output_elasticsearch", "Insert %d bulk messages.", len(bulkChannel)) + if len(bulkChannel)+2 > out.BulkMaxSize { + logp.Debug("output_elasticsearch", "Channel size reached. Calling bulk") + out.InsertBulkMessage(bulkChannel) + bulkChannel = make(chan interface{}, out.BulkMaxSize) + } bulkChannel <- map[string]interface{}{ "index": map[string]interface{}{ "_index": index, @@ -131,13 +146,7 @@ func (out *ElasticsearchOutput) SendMessagesGoroutine() { } } case _ = <-flushChannel: - close(bulkChannel) - go func(channel chan interface{}) { - _, err := out.Conn.Bulk("", "", nil, channel) - if err != nil { - logp.Err("Fail to perform many index operations in a single API call: %s", err) - } - }(bulkChannel) + out.InsertBulkMessage(bulkChannel) bulkChannel = make(chan interface{}, out.BulkMaxSize) } } diff --git a/outputs/elasticsearch/output_test.go b/outputs/elasticsearch/output_test.go index 0344f6313617..b89e1b5cd6a9 100644 --- a/outputs/elasticsearch/output_test.go +++ b/outputs/elasticsearch/output_test.go @@ -15,7 +15,7 @@ import ( const elasticsearchAddr = "localhost" const elasticsearchPort = 9200 -func createElasticsearchConnection(flush_interval int) ElasticsearchOutput { +func createElasticsearchConnection(flush_interval int, bulk_size int) ElasticsearchOutput { index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid()) @@ -31,6 +31,7 @@ func createElasticsearchConnection(flush_interval int) ElasticsearchOutput { Index: index, Protocol: "", Flush_interval: &flush_interval, + BulkMaxSize: &bulk_size, }, 10) return elasticsearchOutput @@ -44,9 +45,9 @@ func TestTopologyInES(t *testing.T) { logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"topology", "output_elasticsearch"}) } - elasticsearchOutput1 := createElasticsearchConnection(0) - elasticsearchOutput2 := createElasticsearchConnection(0) - elasticsearchOutput3 := createElasticsearchConnection(0) + elasticsearchOutput1 := createElasticsearchConnection(0, 0) + elasticsearchOutput2 := createElasticsearchConnection(0, 0) + elasticsearchOutput3 := createElasticsearchConnection(0, 0) elasticsearchOutput1.PublishIPs("proxy1", []string{"10.1.0.4"}) elasticsearchOutput2.PublishIPs("proxy2", []string{"10.1.0.9", @@ -88,7 +89,7 @@ func TestOneEvent(t *testing.T) { ts := time.Now() - elasticsearchOutput := createElasticsearchConnection(0) + elasticsearchOutput := createElasticsearchConnection(0, 0) event := common.MapStr{} event["type"] = "redis" @@ -153,7 +154,7 @@ func TestEvents(t *testing.T) { ts := time.Now() - elasticsearchOutput := createElasticsearchConnection(0) + elasticsearchOutput := createElasticsearchConnection(0, 0) event := common.MapStr{} event["type"] = "redis" @@ -213,18 +214,8 @@ func TestEvents(t *testing.T) { } } -func TestBulkEvents(t *testing.T) { - if testing.Short() { - t.Skip("Skipping events publish in short mode, because they require Elasticsearch") - } - if testing.Verbose() { - logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"topology", "output_elasticsearch"}) - } - +func test_bulk_with_params(t *testing.T, elasticsearchOutput ElasticsearchOutput) { ts := time.Now() - - elasticsearchOutput := createElasticsearchConnection(50) - index := fmt.Sprintf("%s-%d.%02d.%02d", elasticsearchOutput.Index, ts.Year(), ts.Month(), ts.Day()) for i := 0; i < 10; i++ { @@ -271,8 +262,27 @@ func TestBulkEvents(t *testing.T) { if err != nil { t.Errorf("Failed to query elasticsearch: %s", err) + return } if resp.Hits.Total != 10 { t.Errorf("Wrong number of results: %d", resp.Hits.Total) } } + +func TestBulkEvents(t *testing.T) { + if testing.Short() { + t.Skip("Skipping events publish in short mode, because they require Elasticsearch") + } + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"topology", "output_elasticsearch", "elasticsearch"}) + } + + elasticsearchOutput := createElasticsearchConnection(50, 2) + test_bulk_with_params(t, elasticsearchOutput) + + elasticsearchOutput = createElasticsearchConnection(50, 1000) + test_bulk_with_params(t, elasticsearchOutput) + + elasticsearchOutput = createElasticsearchConnection(50, 5) + test_bulk_with_params(t, elasticsearchOutput) +}