Skip to content

Commit

Permalink
[Elasticsearch] Add Pipeline configuration
Browse files Browse the repository at this point in the history
Signed-off-by: Aleksandr Maus <[email protected]>
  • Loading branch information
aleksmaus committed Sep 1, 2024
1 parent 70f9afe commit 493d48a
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 33 deletions.
1 change: 1 addition & 0 deletions config_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ elasticsearch:
# hostport: "" # http://{domain or ip}:{port}, if not empty, Elasticsearch output is enabled
# index: "falco" # index (default: falco)
# type: "_doc"
# pipeline: "" # optional ingest pipeline name
# minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)
# suffix: "daily" # date suffix for index rotation : daily (default), monthly, annually, none
# mutualtls: false # if true, checkcert flag will be ignored (server cert will always be checked)
Expand Down
3 changes: 2 additions & 1 deletion docs/outputs/elasticsearch.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
| `elasticsearch.hostport` | `ELASTICSEARCH_HOSTPORT` | | http://{domain or ip}:{port}, if not empty, Elasticsearch output is **enabled** |
| `elasticsearch.index` | `ELASTICSEARCH_INDEX` | `falco` | Index |
| `elasticsearch.type` | `ELASTICSEARCH_TYPE` | `_doc` | Index |
| `elasticsearch.pipeline` | `ELASTICSEARCH_PIPELINE` | | Optional ingest pipeline name. Documentation: https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html |
| `elasticsearch.suffix` | `ELASTICSEARCH_SUFFIX` | `daily` | Date suffix for index rotation : `daily`, `monthly`, `annually`, `none` |
| `elasticsearch.apikey` | `ELASTICSEARCH_APIKEY` | | Use this APIKey to authenticate to Elasticsearch |
| `elasticsearch.apikey` | `ELASTICSEARCH_APIKEY` | | Use this APIKey to authenticate to Elasticsearch |
| `elasticsearch.username` | `ELASTICSEARCH_USERNAME` | | Use this username to authenticate to Elasticsearch |
| `elasticsearch.password` | `ELASTICSEARCH_PASSWORD` | | Use this password to authenticate to Elasticsearch |
| `elasticsearch.flattenfields` | `ELASTICSEARCH_FLATTENFIELDS` | `false` | Replace . by _ to avoid mapping conflicts, force to true if `createindextemplate=true` |
Expand Down
7 changes: 5 additions & 2 deletions outputs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,11 @@ const HttpPut = "PUT"

// Client communicates with the different API.
type Client struct {
OutputType string
EndpointURL *url.URL
OutputType string

// FIXME: This causes race condition if outputs overwrite this URL during requests from multiple go routines
EndpointURL *url.URL

ContentType string
ShutDownFunc func()
Config *types.Configuration
Expand Down
43 changes: 28 additions & 15 deletions outputs/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,34 +102,47 @@ func (c *Client) elasticsearchPost(index string, payload []byte, falcoPayloads .
return
}

c.EndpointURL = endpointURL

reqOpt := func(req *http.Request) {
if c.Config.Elasticsearch.ApiKey != "" {
req.Header.Set("Authorization", "APIKey "+c.Config.Elasticsearch.ApiKey)
}
reqOpts := []RequestOptionFunc{
// Set request headers
func(req *http.Request) {
if c.Config.Elasticsearch.ApiKey != "" {
req.Header.Set("Authorization", "APIKey "+c.Config.Elasticsearch.ApiKey)
}

if c.Config.Elasticsearch.Username != "" && c.Config.Elasticsearch.Password != "" {
req.SetBasicAuth(c.Config.Elasticsearch.Username, c.Config.Elasticsearch.Password)
}
if c.Config.Elasticsearch.Username != "" && c.Config.Elasticsearch.Password != "" {
req.SetBasicAuth(c.Config.Elasticsearch.Username, c.Config.Elasticsearch.Password)
}

for i, j := range c.Config.Elasticsearch.CustomHeaders {
req.Header.Set(i, j)
}
for i, j := range c.Config.Elasticsearch.CustomHeaders {
req.Header.Set(i, j)
}
},

// Set the final endpointURL
func(req *http.Request) {
// Append pipeline parameter to the URL if configured
if c.Config.Elasticsearch.Pipeline != "" {
query := endpointURL.Query()
query.Set("pipeline", c.Config.Elasticsearch.Pipeline)
endpointURL.RawQuery = query.Encode()
}
// Set request URL
req.URL = endpointURL
},
}

var response string
if c.Config.Elasticsearch.Batching.Enabled {
// Use PostWithResponse call when batching is enabled in order to capture response body on 200
res, err := c.PostWithResponse(payload, reqOpt)
res, err := c.PostWithResponse(payload, reqOpts...)
if err != nil {
response = err.Error()
} else {
response = res
}
} else {
// Use regular Post call, this avoid parsing response on http status 200
err = c.Post(payload, reqOpt)
err = c.Post(payload, reqOpts...)
if err != nil {
response = err.Error()
}
Expand Down Expand Up @@ -194,7 +207,7 @@ func (c *Client) elasticsearchPost(index string, payload []byte, falcoPayloads .
}
}
log.Printf("[INFO] : %v - %v\n", c.OutputType, "attempt to POST again the payload without the wrong field")
err = c.Post(payload, reqOpt)
err = c.Post(payload, reqOpts...)
if err != nil {
c.setElasticSearchErrorMetrics(sz)
return
Expand Down
33 changes: 18 additions & 15 deletions outputs/sumologic.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,24 @@ func (c *Client) SumoLogicPost(falcopayload types.FalcoPayload) {
return
}

c.EndpointURL = endpointURL

err = c.Post(falcopayload, func(req *http.Request) {
if c.Config.SumoLogic.SourceCategory != "" {
req.Header.Set("X-Sumo-Category", c.Config.SumoLogic.SourceCategory)
}

if c.Config.SumoLogic.SourceHost != "" {
req.Header.Set("X-Sumo-Host", c.Config.SumoLogic.SourceHost)
}

if c.Config.SumoLogic.Name != "" {
req.Header.Set("X-Sumo-Name", c.Config.SumoLogic.Name)
}
})
err = c.Post(falcopayload,
func(req *http.Request) {
if c.Config.SumoLogic.SourceCategory != "" {
req.Header.Set("X-Sumo-Category", c.Config.SumoLogic.SourceCategory)
}

if c.Config.SumoLogic.SourceHost != "" {
req.Header.Set("X-Sumo-Host", c.Config.SumoLogic.SourceHost)
}

if c.Config.SumoLogic.Name != "" {
req.Header.Set("X-Sumo-Name", c.Config.SumoLogic.Name)
}
},
func(req *http.Request) {
req.URL = endpointURL
},
)

if err != nil {
c.setSumoLogicErrorMetrics()
Expand Down
1 change: 1 addition & 0 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ type ElasticsearchOutputConfig struct {
HostPort string
Index string
Type string
Pipeline string
MinimumPriority string
Suffix string
Username string
Expand Down

0 comments on commit 493d48a

Please sign in to comment.