diff --git a/outputs/elasticsearch/api.go b/outputs/elasticsearch/api.go index 305e9bfe6e75..02e8b08494d6 100644 --- a/outputs/elasticsearch/api.go +++ b/outputs/elasticsearch/api.go @@ -16,7 +16,9 @@ const ( ) type Elasticsearch struct { - Url string + Url string + Username string + Password string client *http.Client } @@ -55,7 +57,7 @@ func (r QueryResult) String() string { } // Create a connection to Elasticsearch -func NewElasticsearch(url string) *Elasticsearch { +func NewElasticsearch(url string, username string, password string) *Elasticsearch { es := Elasticsearch{ Url: DefaultElasticsearchUrl, client: &http.Client{}, @@ -63,6 +65,8 @@ func NewElasticsearch(url string) *Elasticsearch { if url != es.Url { es.Url = url } + es.Username = username + es.Password = password return &es } @@ -139,6 +143,11 @@ func (es *Elasticsearch) Request(method string, url string, return nil, err } + req.Header.Add("Accept", "application/json") + if es.Username != "" || es.Password != "" { + req.SetBasicAuth(es.Username, es.Password) + } + resp, err := es.client.Do(req) if err != nil { return nil, err diff --git a/outputs/elasticsearch/api_test.go b/outputs/elasticsearch/api_test.go index eee83ec44f23..49e7379d45e1 100644 --- a/outputs/elasticsearch/api_test.go +++ b/outputs/elasticsearch/api_test.go @@ -75,7 +75,7 @@ func TestIndex(t *testing.T) { t.Skip("Skipping in short mode, because it requires Elasticsearch") } - es := NewElasticsearch("http://localhost:9200") + es := NewElasticsearch("http://localhost:9200", "", "") index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid()) diff --git a/outputs/elasticsearch/bulkapi_test.go b/outputs/elasticsearch/bulkapi_test.go index 9edeb6112964..72b93060f2cb 100644 --- a/outputs/elasticsearch/bulkapi_test.go +++ b/outputs/elasticsearch/bulkapi_test.go @@ -15,7 +15,7 @@ func TestBulk(t *testing.T) { if testing.Short() { t.Skip("Skipping in short mode, because it requires Elasticsearch") } - es := NewElasticsearch("http://localhost:9200") + es := NewElasticsearch("http://localhost:9200", "", "") index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid()) ops := []map[string]interface{}{ @@ -69,7 +69,7 @@ func TestEmptyBulk(t *testing.T) { if testing.Short() { t.Skip("Skipping in short mode, because it requires Elasticsearch") } - es := NewElasticsearch("http://localhost:9200") + es := NewElasticsearch("http://localhost:9200", "", "") index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid()) body := make(chan interface{}, 10) @@ -85,7 +85,6 @@ func TestEmptyBulk(t *testing.T) { if resp != nil { t.Errorf("Unexpected response: %s", resp) } - } func TestBulkMoreOperations(t *testing.T) { @@ -95,7 +94,7 @@ func TestBulkMoreOperations(t *testing.T) { if testing.Short() { t.Skip("Skipping in short mode, because it requires Elasticsearch") } - es := NewElasticsearch("http://localhost:9200") + es := NewElasticsearch("http://localhost:9200", "", "") index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid()) ops := []map[string]interface{}{ diff --git a/outputs/elasticsearch/output.go b/outputs/elasticsearch/output.go index 121d0f93666d..96a9ece7cd49 100644 --- a/outputs/elasticsearch/output.go +++ b/outputs/elasticsearch/output.go @@ -30,18 +30,14 @@ type PublishedTopology struct { // Initialize Elasticsearch as output func (out *ElasticsearchOutput) Init(config outputs.MothershipConfig, topology_expire int) error { - url := fmt.Sprintf("http://%s:%d", config.Host, config.Port) - con := NewElasticsearch(url) - out.Conn = con + if len(config.Protocol) == 0 { + config.Protocol = "http" + } - // TODO: - //api.Username = config.Username - //api.Password = config.Password - //api.BasePath = config.Path + url := fmt.Sprintf("%s://%s:%d%s", config.Protocol, config.Host, config.Port, config.Path) - //if config.Protocol != "" { - // api.Protocol = config.Protocol - //} + con := NewElasticsearch(url, config.Username, config.Password) + out.Conn = con if config.Index != "" { out.Index = config.Index diff --git a/outputs/elasticsearch/output_test.go b/outputs/elasticsearch/output_test.go index d9d2f4fe4214..0344f6313617 100644 --- a/outputs/elasticsearch/output_test.go +++ b/outputs/elasticsearch/output_test.go @@ -113,7 +113,7 @@ func TestOneEvent(t *testing.T) { // give control to the other goroutine, otherwise the refresh happens // before the refresh. We should find a better solution for this. - time.Sleep(100 * time.Millisecond) + time.Sleep(200 * time.Millisecond) _, err = elasticsearchOutput.Conn.Refresh(index) if err != nil { @@ -188,7 +188,7 @@ func TestEvents(t *testing.T) { // give control to the other goroutine, otherwise the refresh happens // before the refresh. We should find a better solution for this. - time.Sleep(100 * time.Millisecond) + time.Sleep(200 * time.Millisecond) elasticsearchOutput.Conn.Refresh(index)