Skip to content

Commit

Permalink
Support for specifying ingest pipelines in Elasticsearch (#735)
Browse files Browse the repository at this point in the history
Signed-off-by: Aleksandrov Aleksandr <[email protected]>
  • Loading branch information
aa1ex committed Jan 24, 2025
1 parent 43aea22 commit 0593d95
Show file tree
Hide file tree
Showing 8 changed files with 377 additions and 25 deletions.
15 changes: 15 additions & 0 deletions e2e/file_elasticsearch/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# e2e pipeline config: tail a file and ship events to Elasticsearch.
# The SOME_* placeholders are overwritten at runtime by the test's
# Configure step (see e2e/file_elasticsearch/file_elasticsearch.go).
pipelines:
  file_elasticsearch:
    input:
      type: file
      persistence_mode: async
      watching_dir: SOME_DIR    # replaced with a per-test temp dir
      offsets_file: SOME_FILE   # replaced with a temp offsets file
      offsets_op: reset
    output:
      type: elasticsearch
      endpoints:
        - http://localhost:9200 # replaced with the test's endpoint
      username: SOME_USERNAME   # replaced from the test config
      password: SOME_PASSWORD   # replaced from the test config
      index_format: SOME_INDEX  # replaced with a random index name
33 changes: 33 additions & 0 deletions e2e/file_elasticsearch/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# https://github.com/elastic/start-local/tree/main
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.17.0
    container_name: es-local-test
    volumes:
      - dev-elasticsearch:/usr/share/elasticsearch/data
    ports:
      # Host port 19200 maps to the container's 9200.
      - "127.0.0.1:19200:9200"
    environment:
      - discovery.type=single-node
      - ELASTIC_PASSWORD=elastic
      - xpack.security.enabled=true
      - xpack.security.http.ssl.enabled=false
      - xpack.license.self_generated.type=trial
      - xpack.ml.use_auto_machine_memory_percent=true
      - "_JAVA_OPTIONS=-XX:UseSVE=0"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    healthcheck:
      # The check runs inside the container, where Elasticsearch listens on
      # 9200 — the 19200 host mapping is not visible here.
      test:
        [
          "CMD-SHELL",
          "curl --output /dev/null --silent --head --fail -u elastic:elastic http://elasticsearch:9200",
        ]
      interval: 5s
      timeout: 5s
      retries: 10

volumes:
  dev-elasticsearch:
76 changes: 76 additions & 0 deletions e2e/file_elasticsearch/file_elasticsearch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package file_elasticsearch

import (
"fmt"
"math/rand"
"os"
"path"
"path/filepath"
"testing"
"time"

"github.com/ozontech/file.d/cfg"
"github.com/stretchr/testify/require"
)

// This test verifies that messages sent to Elasticsearch are correctly processed by the ingest pipeline
// and that each message is assigned a 'processed_at' field containing a timestamp.

// Config for file-elasticsearch plugin e2e test
type Config struct {
	Count    int    // number of log lines Send writes and Validate expects back
	Endpoint string // Elasticsearch base URL, e.g. http://localhost:19200
	Pipeline string // ingest pipeline ID; created in Configure
	Username string // basic-auth user; empty disables auth in the helpers
	Password string // basic-auth password
	dir      string // temp dir watched by the file input (set in Configure)
	index    string // randomly generated target index name (set in Configure)
}

// Configure fills in the runtime-only settings of the input and output
// plugins (temp dirs, random index name, credentials) and registers the
// ingest pipeline in Elasticsearch.
func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string) {
	c.dir = t.TempDir()
	offsetsPath := filepath.Join(t.TempDir(), "offsets.yaml")

	inputCfg := conf.Pipelines[pipelineName].Raw.Get("input")
	inputCfg.Set("watching_dir", c.dir)
	inputCfg.Set("filename_pattern", "messages.log")
	inputCfg.Set("offsets_file", offsetsPath)

	// A random suffix keeps indices from concurrent/previous runs apart.
	c.index = fmt.Sprintf("my-index-%d", rand.Intn(1000))

	outputCfg := conf.Pipelines[pipelineName].Raw.Get("output")
	outputCfg.Set("index_format", c.index)
	outputCfg.Set("pipeline", c.Pipeline)
	outputCfg.Set("username", c.Username)
	outputCfg.Set("password", c.Password)
	outputCfg.Set("endpoints", []string{c.Endpoint})

	require.NoError(t, createIngestPipeline(c.Endpoint, c.Pipeline, c.Username, c.Password))
}

// Send creates messages.log in the watched directory and writes Count
// newline-delimited JSON messages to it.
func (c *Config) Send(t *testing.T) {
	// filepath.Join (not path.Join) builds an OS-correct filesystem path.
	file, err := os.Create(filepath.Join(c.dir, "messages.log"))
	require.NoError(t, err)
	// Safety net in case a require below aborts the test early.
	defer func() { _ = file.Close() }()

	for i := 0; i < c.Count; i++ {
		_, err = file.WriteString("{\"message\":\"test\"}\n")
		require.NoError(t, err)
	}

	// Check Close explicitly: on a write path a failed close can mean
	// unflushed data, which would make the test flaky in a confusing way.
	require.NoError(t, file.Close())
}

// Validate waits until the index holds Count documents, then checks that the
// ingest pipeline stamped every document with a 'processed_at' field.
func (c *Config) Validate(t *testing.T) {
	const (
		retries = 10
		delay   = 250 * time.Millisecond
	)

	require.NoError(t,
		waitUntilIndexReady(c.Endpoint, c.index, c.Username, c.Password, c.Count, retries, delay))

	docs, err := getDocumentsFromIndex(c.Endpoint, c.index, c.Username, c.Password)
	require.NoError(t, err)
	require.Len(t, docs, c.Count)

	for _, doc := range docs {
		if _, found := doc["processed_at"]; !found {
			t.Errorf("doc %v doesn't have processed_at field", doc)
		}
	}
}
177 changes: 177 additions & 0 deletions e2e/file_elasticsearch/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package file_elasticsearch

import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"
)

// createIngestPipeline registers an ingest pipeline with a single "set"
// processor that stamps each document with a processed_at timestamp.
// Basic auth is attached only when both username and password are non-empty.
func createIngestPipeline(elasticURL, pipelineID, username, password string) error {
	url := fmt.Sprintf("%s/_ingest/pipeline/%s", elasticURL, pipelineID)

	pipelineBody := map[string]interface{}{
		"description": "test ingest pipeline",
		"processors": []interface{}{
			map[string]interface{}{
				"set": map[string]interface{}{
					"field": "processed_at",
					"value": "{{_ingest.timestamp}}",
				},
			},
		},
	}

	body, err := json.Marshal(pipelineBody)
	if err != nil {
		return fmt.Errorf("failed to marshal body: %w", err)
	}

	req, err := http.NewRequest(http.MethodPut, url, bytes.NewReader(body))
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}

	req.Header.Set("Content-Type", "application/json")
	if username != "" && password != "" {
		req.SetBasicAuth(username, password)
	}

	client := &http.Client{Timeout: time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to make HTTP request: %w", err)
	}
	// Close on every path; previously the body leaked if ReadAll failed.
	defer func() { _ = resp.Body.Close() }()

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("failed to read body response: %w", err)
	}

	if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
		return fmt.Errorf("unexpected status: %d, body: %s", resp.StatusCode, string(respBody))
	}

	return nil
}

// getDocumentsFromIndex runs a match_all search against indexName and
// returns the _source object of every hit.
func getDocumentsFromIndex(elasticURL, indexName, username, password string) ([]map[string]interface{}, error) {
	url := fmt.Sprintf("%s/%s/_search", elasticURL, indexName)

	body := []byte(`{"query":{"match_all":{}}}`)

	req, err := http.NewRequest(http.MethodPost, url, strings.NewReader(string(body)))
	if err != nil {
		return nil, fmt.Errorf("failed to create HTTP request: %w", err)
	}

	req.Header.Set("Content-Type", "application/json")
	if username != "" && password != "" {
		req.SetBasicAuth(username, password)
	}

	client := &http.Client{Timeout: time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("failed to make HTTP request: %w", err)
	}
	// Close on every path; previously the body leaked if ReadAll failed.
	defer func() { _ = resp.Body.Close() }()

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("error reading response: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status: %d, response: %s", resp.StatusCode, string(respBody))
	}

	var result map[string]interface{}
	if err := json.Unmarshal(respBody, &result); err != nil {
		return nil, fmt.Errorf("failed to decode response: %w", err)
	}

	hits, ok := result["hits"].(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("unexpected response structure")
	}

	var resultDocs []map[string]interface{}
	// A missing or non-array hits.hits yields an empty result rather than an
	// error, matching the original behavior.
	if docs, ok := hits["hits"].([]interface{}); ok {
		for _, doc := range docs {
			mappedDoc, ok := doc.(map[string]interface{})
			if !ok {
				return nil, fmt.Errorf("unexpected document structure")
			}
			source, ok := mappedDoc["_source"].(map[string]interface{})
			if !ok {
				return nil, fmt.Errorf("_source field has unexpected structure")
			}
			resultDocs = append(resultDocs, source)
		}
	}

	return resultDocs, nil
}

// waitUntilIndexReady polls the index _count endpoint until it reports at
// least minDocs documents, retrying up to retries times with delay between
// attempts. 404/503 responses are treated as "index not ready yet".
func waitUntilIndexReady(elasticURL, indexName, username, password string, minDocs, retries int, delay time.Duration) error {
	client := &http.Client{
		Timeout: time.Second,
	}
	url := fmt.Sprintf("%s/%s/_count", elasticURL, indexName)

	for i := 0; i < retries; i++ {
		count, ready, err := fetchIndexDocCount(client, url, username, password)
		if err != nil {
			return err
		}
		if ready && count >= minDocs {
			return nil
		}
		time.Sleep(delay)
	}

	return fmt.Errorf("index '%s' did not meet conditions after %d retries", indexName, retries)
}

// fetchIndexDocCount performs one _count request. ready=false means the index
// is not available yet (404/503) and the caller should retry. Extracted so a
// defer can close the response body without deferring inside the retry loop.
func fetchIndexDocCount(client *http.Client, url, username, password string) (count int, ready bool, err error) {
	req, err := http.NewRequest(http.MethodGet, url, http.NoBody)
	if err != nil {
		return 0, false, fmt.Errorf("failed to create request: %w", err)
	}

	req.Header.Set("Content-Type", "application/json")
	if username != "" && password != "" {
		req.SetBasicAuth(username, password)
	}

	resp, err := client.Do(req)
	if err != nil {
		return 0, false, fmt.Errorf("failed to make request: %w", err)
	}
	// Close on every path; previously the body leaked if ReadAll failed.
	defer func() { _ = resp.Body.Close() }()

	switch resp.StatusCode {
	case http.StatusNotFound, http.StatusServiceUnavailable:
		return 0, false, nil
	case http.StatusOK:
		// fall through to read the count
	default:
		return 0, false, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return 0, false, fmt.Errorf("failed to read response: %w", err)
	}

	var result map[string]interface{}
	if err := json.Unmarshal(body, &result); err != nil {
		return 0, false, fmt.Errorf("failed to decode response: %w", err)
	}

	// encoding/json decodes JSON numbers into float64 for interface{} maps.
	c, ok := result["count"].(float64)
	if !ok {
		return 0, false, fmt.Errorf("unexpected response structure")
	}

	return int(c), true, nil
}
24 changes: 18 additions & 6 deletions e2e/start_work_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@ package e2e_test

import (
"context"
"log"
"strconv"
"testing"
"time"

"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/e2e/file_clickhouse"
"github.com/ozontech/file.d/e2e/file_elasticsearch"
"github.com/ozontech/file.d/e2e/file_file"
"github.com/ozontech/file.d/e2e/http_file"
"github.com/ozontech/file.d/e2e/join_throttle"
"github.com/ozontech/file.d/e2e/kafka_auth"
"github.com/ozontech/file.d/e2e/kafka_file"
"github.com/ozontech/file.d/e2e/split_join"
"log"
"strconv"
"testing"
"time"

"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/fd"
_ "github.com/ozontech/file.d/plugin/action/add_file_name"
_ "github.com/ozontech/file.d/plugin/action/add_host"
Expand Down Expand Up @@ -143,6 +144,17 @@ func TestE2EStabilityWorkCase(t *testing.T) {
e2eTest: &file_clickhouse.Config{},
cfgPath: "./file_clickhouse/config.yml",
},
{
name: "file_elasticsearch",
e2eTest: &file_elasticsearch.Config{
Count: 10,
Pipeline: "test-ingest-pipeline",
Endpoint: "http://localhost:19200",
Username: "elastic",
Password: "elastic",
},
cfgPath: "./file_elasticsearch/config.yml",
},
}

for num, test := range testsList {
Expand Down
6 changes: 6 additions & 0 deletions plugin/output/elasticsearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,5 +151,11 @@ After a non-retryable write error, fall with a non-zero exit code or not

<br>

**`pipeline`** *`string`*

The name of the ingest pipeline that preprocesses events before they are indexed.

<br>


<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
Loading

0 comments on commit 0593d95

Please sign in to comment.