diff --git a/outputs/elasticsearch/api.go b/outputs/elasticsearch/api.go index 2e718343320..305e9bfe6e7 100644 --- a/outputs/elasticsearch/api.go +++ b/outputs/elasticsearch/api.go @@ -7,6 +7,8 @@ import ( "io/ioutil" "net/http" "net/url" + + "github.com/elastic/libbeat/logp" ) const ( @@ -86,7 +88,11 @@ func MakePath(index string, doc_type string, id string) (string, error) { } } else { if len(id) > 0 { - path = fmt.Sprintf("/%s/%s", index, id) + if len(index) > 0 { + path = fmt.Sprintf("/%s/%s", index, id) + } else { + path = fmt.Sprintf("/%s", id) + } } else { path = fmt.Sprintf("/%s", index) } @@ -94,6 +100,21 @@ func MakePath(index string, doc_type string, id string) (string, error) { return path, nil } +func ReadQueryResult(resp http.Response) (*QueryResult, error) { + + defer resp.Body.Close() + obj, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + var result QueryResult + err = json.Unmarshal(obj, &result) + if err != nil { + return nil, err + } + return &result, err +} + // Create a HTTP request to Elaticsearch func (es *Elasticsearch) Request(method string, url string, params map[string]string, body interface{}) (*http.Response, error) { @@ -148,6 +169,7 @@ func (es *Elasticsearch) Index(index string, doc_type string, id string, } else { method = "PUT" } + logp.Debug("output_elasticsearch", "method=%s path=%s", method, path) resp, err := es.Request(method, path, params, body) if err != nil { return nil, err @@ -177,17 +199,23 @@ func (es *Elasticsearch) Refresh(index string) (*QueryResult, error) { return nil, err } - defer resp.Body.Close() - obj, err := ioutil.ReadAll(resp.Body) + return ReadQueryResult(*resp) +} + +// Instantiate an index +func (es *Elasticsearch) CreateIndex(index string) (*QueryResult, error) { + + path, err := MakePath(index, "", "") if err != nil { return nil, err } - var result QueryResult - err = json.Unmarshal(obj, &result) + + resp, err := es.Request("PUT", path, nil, nil) if err != nil { return nil, 
err } - return &result, err + + return ReadQueryResult(*resp) } // Deletes a typed JSON document from a specific index based on its id. @@ -204,17 +232,7 @@ func (es *Elasticsearch) Delete(index string, doc_type string, id string, params return nil, err } - defer resp.Body.Close() - obj, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, err - } - var result QueryResult - err = json.Unmarshal(obj, &result) - if err != nil { - return nil, err - } - return &result, err + return ReadQueryResult(*resp) } // A search request can be executed purely using a URI by providing request parameters. diff --git a/outputs/elasticsearch/api_test.go b/outputs/elasticsearch/api_test.go index 09ea33f8ca5..eee83ec44f2 100644 --- a/outputs/elasticsearch/api_test.go +++ b/outputs/elasticsearch/api_test.go @@ -47,6 +47,22 @@ func TestMakePath(t *testing.T) { if path != "/twitter/_refresh" { t.Errorf("Wrong path created: %s", path) } + + path, err = MakePath("", "", "_bulk") + if err != nil { + t.Errorf("Fail to create path: %s", err) + } + if path != "/_bulk" { + t.Errorf("Wrong path created: %s", path) + } + path, err = MakePath("twitter", "", "") + if err != nil { + t.Errorf("Fail to create path: %s", err) + } + if path != "/twitter" { + t.Errorf("Wrong path created: %s", path) + } + } func TestIndex(t *testing.T) { @@ -56,7 +72,7 @@ func TestIndex(t *testing.T) { } if testing.Short() { - t.Skip("Skipping topology tests in short mode, because they require Elasticsearch") + t.Skip("Skipping in short mode, because it requires Elasticsearch") } es := NewElasticsearch("http://localhost:9200") diff --git a/outputs/elasticsearch/bulkapi.go b/outputs/elasticsearch/bulkapi.go new file mode 100644 index 00000000000..d0fae3dc54a --- /dev/null +++ b/outputs/elasticsearch/bulkapi.go @@ -0,0 +1,63 @@ +package elasticsearch + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "time" + + "github.com/elastic/libbeat/common" +) + +type BulkMsg struct { + Ts 
time.Time + Event common.MapStr +} + +func (es *Elasticsearch) Bulk(index string, doc_type string, + params map[string]string, body chan interface{}) (*QueryResult, error) { + + path, err := MakePath(index, doc_type, "_bulk") + if err != nil { + return nil, err + } + + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + for obj := range body { + enc.Encode(obj) + } + + url := es.Url + path + if len(params) > 0 { + url = url + "?" + UrlEncode(params) + } + + req, err := http.NewRequest("POST", url, &buf) + if err != nil { + return nil, err + } + + resp, err := es.client.Do(req) + if err != nil { + return nil, err + } + + defer resp.Body.Close() + obj, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + var result QueryResult + err = json.Unmarshal(obj, &result) + if err != nil { + return nil, err + } + + if resp.StatusCode > 299 { + return &result, fmt.Errorf("ES returned an error: %s", resp.Status) + } + return &result, err +} diff --git a/outputs/elasticsearch/bulkapi_test.go b/outputs/elasticsearch/bulkapi_test.go new file mode 100644 index 00000000000..7d3e5202f69 --- /dev/null +++ b/outputs/elasticsearch/bulkapi_test.go @@ -0,0 +1,158 @@ +package elasticsearch + +import ( + "fmt" + "os" + "testing" + + "github.com/elastic/libbeat/logp" +) + +func TestBulk(t *testing.T) { + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"elasticsearch"}) + } + if testing.Short() { + t.Skip("Skipping in short mode, because it requires Elasticsearch") + } + es := NewElasticsearch("http://localhost:9200") + index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid()) + + ops := []map[string]interface{}{ + map[string]interface{}{ + "index": map[string]interface{}{ + "_index": index, + "_type": "type1", + "_id": "1", + }, + }, + map[string]interface{}{ + "field1": "value1", + }, + } + + body := make(chan interface{}, 10) + for _, op := range ops { + body <- op + } + close(body) + + params := map[string]string{ + "refresh": 
"true", + } + _, err := es.Bulk(index, "type1", params, body) + if err != nil { + t.Errorf("Bulk() returned error: %s", err) + } + + params = map[string]string{ + "q": "field1:value1", + } + result, err := es.SearchUri(index, "type1", params) + if err != nil { + t.Errorf("SearchUri() returns an error: %s", err) + } + if result.Hits.Total != 1 { + t.Errorf("Wrong number of search results: %d", result.Hits.Total) + } + + _, err = es.Delete(index, "", "", nil) + if err != nil { + t.Errorf("Delete() returns error: %s", err) + } +} + +func TestBulkMoreOperations(t *testing.T) { + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"elasticsearch"}) + } + if testing.Short() { + t.Skip("Skipping in short mode, because it requires Elasticsearch") + } + es := NewElasticsearch("http://localhost:9200") + index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid()) + + ops := []map[string]interface{}{ + map[string]interface{}{ + "index": map[string]interface{}{ + "_index": index, + "_type": "type1", + "_id": "1", + }, + }, + map[string]interface{}{ + "field1": "value1", + }, + map[string]interface{}{ + "delete": map[string]interface{}{ + "_index": index, + "_type": "type1", + "_id": "2", + }, + }, + map[string]interface{}{ + "create": map[string]interface{}{ + "_index": index, + "_type": "type1", + "_id": "3", + }, + }, + map[string]interface{}{ + "field1": "value3", + }, + map[string]interface{}{ + "update": map[string]interface{}{ + "_id": "1", + "_index": index, + "_type": "type1", + }, + }, + map[string]interface{}{ + "doc": map[string]interface{}{ + "field2": "value2", + }, + }, + } + + body := make(chan interface{}, 10) + for _, op := range ops { + body <- op + } + close(body) + + params := map[string]string{ + "refresh": "true", + } + resp, err := es.Bulk(index, "type1", params, body) + if err != nil { + t.Errorf("Bulk() returned error: %s [%s]", err, resp) + return + } + + params = map[string]string{ + "q": "field1:value3", + } + result, err 
:= es.SearchUri(index, "type1", params) + if err != nil { + t.Errorf("SearchUri() returns an error: %s", err) + } + if result.Hits.Total != 1 { + t.Errorf("Wrong number of search results: %d", result.Hits.Total) + } + + params = map[string]string{ + "q": "field2:value2", + } + result, err = es.SearchUri(index, "type1", params) + if err != nil { + t.Errorf("SearchUri() returns an error: %s", err) + } + if result.Hits.Total != 1 { + t.Errorf("Wrong number of search results: %d", result.Hits.Total) + } + + _, err = es.Delete(index, "", "", nil) + if err != nil { + t.Errorf("Delete() returns error: %s", err) + } +} diff --git a/outputs/elasticsearch/output.go b/outputs/elasticsearch/output.go index ccab903100d..d89644ebc75 100644 --- a/outputs/elasticsearch/output.go +++ b/outputs/elasticsearch/output.go @@ -15,8 +15,11 @@ type ElasticsearchOutput struct { Index string TopologyExpire int Conn *Elasticsearch + FlushInterval time.Duration + BulkMaxSize int - TopologyMap map[string]string + TopologyMap map[string]string + sendingQueue chan BulkMsg } type PublishedTopology struct { @@ -51,15 +54,28 @@ func (out *ElasticsearchOutput) Init(config outputs.MothershipConfig, topology_e out.TopologyExpire = topology_expire /*sec*/ * 1000 // millisec } + out.FlushInterval = 1000 * time.Millisecond + if config.Flush_interval != nil { + out.FlushInterval = time.Duration(*config.Flush_interval) * time.Millisecond + } + out.BulkMaxSize = 10000 + if config.BulkMaxSize != nil { + out.BulkMaxSize = *config.BulkMaxSize + } + err := out.EnableTTL() if err != nil { logp.Err("Fail to set _ttl mapping: %s", err) return err } + out.sendingQueue = make(chan BulkMsg, 1000) + go out.SendMessagesGoroutine() + logp.Info("[ElasticsearchOutput] Using Elasticsearch %s", url) logp.Info("[ElasticsearchOutput] Using index pattern [%s-]YYYY.MM.DD", out.Index) logp.Info("[ElasticsearchOutput] Topology expires after %ds", out.TopologyExpire/1000) + logp.Info("[ElasticsearchOutput] Flush interval %s", 
out.FlushInterval) return nil } @@ -88,6 +104,49 @@ func (out *ElasticsearchOutput) GetNameByIP(ip string) string { return name } +func (out *ElasticsearchOutput) SendMessagesGoroutine() { + flushChannel := make(<-chan time.Time) + + if out.FlushInterval > 0 { + flushTicker := time.NewTicker(out.FlushInterval) + flushChannel = flushTicker.C + } + + bulkChannel := make(chan interface{}, out.BulkMaxSize) + + for { + select { + case msg := <-out.sendingQueue: + index := fmt.Sprintf("%s-%d.%02d.%02d", out.Index, msg.Ts.Year(), msg.Ts.Month(), msg.Ts.Day()) + if out.FlushInterval > 0 { + logp.Debug("output_elasticsearch", "Insert bulk messages") + bulkChannel <- map[string]interface{}{ + "index": map[string]interface{}{ + "_index": index, + "_type": msg.Event["type"].(string), + }, + } + bulkChannel <- msg.Event + } else { + logp.Debug("output_elasticsearch", "Insert a single event") + _, err := out.Conn.Index(index, msg.Event["type"].(string), "", nil, msg.Event) + if err != nil { + logp.Err("Fail to index or update: %s", err) + } + } + case _ = <-flushChannel: + close(bulkChannel) + go func(channel chan interface{}) { + _, err := out.Conn.Bulk("", "", nil, channel) + if err != nil { + logp.Err("Fail to perform many index operations in a single API call: %s", err) + } + }(bulkChannel) + bulkChannel = make(chan interface{}, out.BulkMaxSize) + } + } +} + // Each shipper publishes a list of IPs together with its name to Elasticsearch func (out *ElasticsearchOutput) PublishIPs(name string, localAddrs []string) error { logp.Debug("output_elasticsearch", "Publish IPs %s with expiration time %d", localAddrs, out.TopologyExpire) @@ -151,8 +210,9 @@ func (out *ElasticsearchOutput) UpdateLocalTopologyMap() { // Publish an event func (out *ElasticsearchOutput) PublishEvent(ts time.Time, event common.MapStr) error { - index := fmt.Sprintf("%s-%d.%02d.%02d", out.Index, ts.Year(), ts.Month(), ts.Day()) - _, err := out.Conn.Index(index, event["type"].(string), "", nil, event) - 
logp.Debug("output_elasticsearch", "Publish event") - return err + out.sendingQueue <- BulkMsg{Ts: ts, Event: event} + + //_, err := out.Conn.Index(index, event["type"].(string), "", nil, event) + logp.Debug("output_elasticsearch", "Publish event: %s", event) + return nil } diff --git a/outputs/elasticsearch/output_test.go b/outputs/elasticsearch/output_test.go index 31f407e980b..d9d2f4fe421 100644 --- a/outputs/elasticsearch/output_test.go +++ b/outputs/elasticsearch/output_test.go @@ -3,6 +3,7 @@ package elasticsearch import ( "fmt" "os" + "strconv" "testing" "time" @@ -14,21 +15,22 @@ import ( const elasticsearchAddr = "localhost" const elasticsearchPort = 9200 -func createElasticsearchConnection() ElasticsearchOutput { +func createElasticsearchConnection(flush_interval int) ElasticsearchOutput { index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid()) var elasticsearchOutput ElasticsearchOutput elasticsearchOutput.Init(outputs.MothershipConfig{ - Enabled: true, - Save_topology: true, - Host: elasticsearchAddr, - Port: elasticsearchPort, - Username: "", - Password: "", - Path: "", - Index: index, - Protocol: "", + Enabled: true, + Save_topology: true, + Host: elasticsearchAddr, + Port: elasticsearchPort, + Username: "", + Password: "", + Path: "", + Index: index, + Protocol: "", + Flush_interval: &flush_interval, }, 10) return elasticsearchOutput @@ -42,9 +44,9 @@ func TestTopologyInES(t *testing.T) { logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"topology", "output_elasticsearch"}) } - elasticsearchOutput1 := createElasticsearchConnection() - elasticsearchOutput2 := createElasticsearchConnection() - elasticsearchOutput3 := createElasticsearchConnection() + elasticsearchOutput1 := createElasticsearchConnection(0) + elasticsearchOutput2 := createElasticsearchConnection(0) + elasticsearchOutput3 := createElasticsearchConnection(0) elasticsearchOutput1.PublishIPs("proxy1", []string{"10.1.0.4"}) elasticsearchOutput2.PublishIPs("proxy2", 
[]string{"10.1.0.9", @@ -80,9 +82,13 @@ func TestOneEvent(t *testing.T) { if testing.Short() { t.Skip("Skipping events publish in short mode, because they require Elasticsearch") } + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"elasticsearch", "output_elasticsearch"}) + } + ts := time.Now() - elasticsearchOutput := createElasticsearchConnection() + elasticsearchOutput := createElasticsearchConnection(0) event := common.MapStr{} event["type"] = "redis" @@ -92,50 +98,62 @@ func TestOneEvent(t *testing.T) { event["dst_port"] = 6379 event["src_ip"] = "192.168.22.2" event["src_port"] = 6378 - event["agent"] = "appserver1" + event["shipper"] = "appserver1" r := common.MapStr{} r["request"] = "MGET key1" r["response"] = "value1" index := fmt.Sprintf("%s-%d.%02d.%02d", elasticsearchOutput.Index, ts.Year(), ts.Month(), ts.Day()) + logp.Debug("output_elasticsearch", "index = %s", index) - es := NewElasticsearch("http://localhost:9200") - - if es == nil { - t.Errorf("Failed to create Elasticsearch connection") - } err := elasticsearchOutput.PublishEvent(ts, event) if err != nil { t.Errorf("Failed to publish the event: %s", err) } - es.Refresh(index) + // give control to the other goroutine, otherwise the refresh happens + // before the insert. We should find a better solution for this. 
+ time.Sleep(100 * time.Millisecond) + + _, err = elasticsearchOutput.Conn.Refresh(index) + if err != nil { + t.Errorf("Failed to refresh: %s", err) + } + + defer func() { + _, err = elasticsearchOutput.Conn.Delete(index, "", "", nil) + if err != nil { + t.Errorf("Failed to delete index: %s", err) + } + }() params := map[string]string{ - "q": "agent:appserver1", + "q": "shipper:appserver1", } - resp, err := es.SearchUri(index, "", params) + resp, err := elasticsearchOutput.Conn.SearchUri(index, "", params) if err != nil { - t.Errorf("Failed to query elasticsearch: %s", err) + t.Errorf("Failed to query elasticsearch for index(%s): %s", index, err) + return } + logp.Debug("output_elasticsearch", "resp = %s", resp) if resp.Hits.Total != 1 { - t.Errorf("Too many results") + t.Errorf("Wrong number of results: %d", resp.Hits.Total) } - _, err = es.Delete(index, "", "", nil) - if err != nil { - t.Errorf("Failed to delete index: %s", err) - } } func TestEvents(t *testing.T) { if testing.Short() { t.Skip("Skipping events publish in short mode, because they require Elasticsearch") } + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"topology", "output_elasticsearch"}) + } + ts := time.Now() - elasticsearchOutput := createElasticsearchConnection() + elasticsearchOutput := createElasticsearchConnection(0) event := common.MapStr{} event["type"] = "redis" @@ -145,7 +163,7 @@ func TestEvents(t *testing.T) { event["dst_port"] = 6379 event["src_ip"] = "192.168.22.2" event["src_port"] = 6378 - event["agent"] = "appserver1" + event["shipper"] = "appserver1" r := common.MapStr{} r["request"] = "MGET key1" r["response"] = "value1" @@ -153,11 +171,6 @@ func TestEvents(t *testing.T) { index := fmt.Sprintf("%s-%d.%02d.%02d", elasticsearchOutput.Index, ts.Year(), ts.Month(), ts.Day()) - es := NewElasticsearch("http://localhost:9200") - - if es == nil { - t.Errorf("Failed to create Elasticsearch connection") - } err := elasticsearchOutput.PublishEvent(ts, 
event) if err != nil { t.Errorf("Failed to publish the event: %s", err) } @@ -173,24 +186,93 @@ func TestEvents(t *testing.T) { t.Errorf("Failed to publish the event: %s", err) } - es.Refresh(index) + // give control to the other goroutine, otherwise the refresh happens + // before the insert. We should find a better solution for this. + time.Sleep(100 * time.Millisecond) + + elasticsearchOutput.Conn.Refresh(index) params := map[string]string{ - "q": "agent:appserver1", + "q": "shipper:appserver1", } - resp, err := es.SearchUri(index, "", params) + defer func() { + _, err = elasticsearchOutput.Conn.Delete(index, "", "", nil) + if err != nil { + t.Errorf("Failed to delete index: %s", err) + } + }() + + resp, err := elasticsearchOutput.Conn.SearchUri(index, "", params) if err != nil { t.Errorf("Failed to query elasticsearch: %s", err) } if resp.Hits.Total != 2 { - t.Errorf("Too many results") + t.Errorf("Wrong number of results: %d", resp.Hits.Total) } +} + +func TestBulkEvents(t *testing.T) { + if testing.Short() { + t.Skip("Skipping events publish in short mode, because they require Elasticsearch") + } + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"topology", "output_elasticsearch"}) + } + + ts := time.Now() + + elasticsearchOutput := createElasticsearchConnection(50) + + index := fmt.Sprintf("%s-%d.%02d.%02d", elasticsearchOutput.Index, ts.Year(), ts.Month(), ts.Day()) + + for i := 0; i < 10; i++ { + + event := common.MapStr{} + event["type"] = "redis" + event["status"] = "OK" + event["responsetime"] = 34 + event["dst_ip"] = "192.168.21.1" + event["dst_port"] = 6379 + event["src_ip"] = "192.168.22.2" + event["src_port"] = 6378 + event["shipper"] = "appserver" + strconv.Itoa(i) + r := common.MapStr{} + r["request"] = "MGET key" + strconv.Itoa(i) + r["response"] = "value" + strconv.Itoa(i) + event["redis"] = r + + err := elasticsearchOutput.PublishEvent(ts, event) + if err != nil { + t.Errorf("Failed to publish the event: %s", err) 
+ } - _, err = es.Delete(index, "", "", nil) - if err != nil { - t.Errorf("Failed to delete index: %s", err) } + // give control to the other goroutine, otherwise the refresh happens + // before the insert. We should find a better solution for this. + time.Sleep(200 * time.Millisecond) + + elasticsearchOutput.Conn.Refresh(index) + + params := map[string]string{ + "q": "type:redis", + } + + defer func() { + _, err := elasticsearchOutput.Conn.Delete(index, "", "", nil) + if err != nil { + t.Errorf("Failed to delete index: %s", err) + } + }() + + resp, err := elasticsearchOutput.Conn.SearchUri(index, "", params) + + if err != nil { + t.Errorf("Failed to query elasticsearch: %s", err) + } + if resp.Hits.Total != 10 { + t.Errorf("Wrong number of results: %d", resp.Hits.Total) + } } diff --git a/outputs/outputs.go b/outputs/outputs.go index c0d0c49e6e5..2d8383c4a0b 100644 --- a/outputs/outputs.go +++ b/outputs/outputs.go @@ -24,7 +24,8 @@ type MothershipConfig struct { Rotate_every_kb int Number_of_files int DataType string - Flush_interval int + Flush_interval *int + BulkMaxSize *int } // Functions to be exported by a output plugin diff --git a/outputs/redis/redis.go b/outputs/redis/redis.go index 764f236dc8a..c69f0600217 100644 --- a/outputs/redis/redis.go +++ b/outputs/redis/redis.go @@ -75,12 +75,12 @@ func (out *RedisOutput) Init(config outputs.MothershipConfig, topology_expire in } out.FlushInterval = 1000 * time.Millisecond - if config.Flush_interval != 0 { - if config.Flush_interval < 0 { + if config.Flush_interval != nil { + if *config.Flush_interval < 0 { out.flush_immediatelly = true logp.Warn("Flushing to REDIS on each push, performance migh be affected") } else { - out.FlushInterval = time.Duration(config.Flush_interval) * time.Millisecond + out.FlushInterval = time.Duration(*config.Flush_interval) * time.Millisecond } }