Skip to content

Commit

Permalink
Split big batches in ES output plugin (#708)
Browse files Browse the repository at this point in the history
* Retry 413

* Add e2e test

---------

Co-authored-by: george pogosyan <[email protected]>
  • Loading branch information
goshansmails and george pogosyan authored Dec 26, 2024
1 parent 6a1c63c commit ac86bc2
Show file tree
Hide file tree
Showing 8 changed files with 284 additions and 5 deletions.
22 changes: 22 additions & 0 deletions e2e/file_es/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Password for the 'elastic' user (at least 6 characters)
ELASTIC_PASSWORD=password

# Version of Elastic products
STACK_VERSION=8.16.1

# Set the cluster name
CLUSTER_NAME=docker-cluster

# Set to 'basic' or 'trial' to automatically start the 30-day trial
LICENSE=basic
#LICENSE=trial

# Port to expose Elasticsearch HTTP API to the host
ES_PORT=9200
#ES_PORT=127.0.0.1:9200

# Increase or decrease based on the available host memory (in bytes)
MEM_LIMIT=1073741824

# Project namespace (defaults to the current folder name if not set)
#COMPOSE_PROJECT_NAME=myproject
19 changes: 19 additions & 0 deletions e2e/file_es/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
pipelines:
file_es:
input:
type: file
output:
type: elasticsearch
batch_flush_timeout: 200ms
batch_size: 500 * 1
connection_timeout: 30s
endpoints:
- http://localhost:9200
fatal_on_failed_insert: true
strict: false
index_format: index_name
retry: 1
retention: 1s
workers_count: 1
username: elastic
password: password
37 changes: 37 additions & 0 deletions e2e/file_es/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
version: "3"

services:
es01:
image: elasticsearch:${STACK_VERSION}
volumes:
- esdata01:/usr/share/elasticsearch/data
ports:
- ${ES_PORT}:9200
environment:
- node.name=es01
- cluster.name=${CLUSTER_NAME}
- cluster.initial_master_nodes=es01
- ELASTIC_PASSWORD=${ELASTIC_PASSWORD}
- bootstrap.memory_lock=true
- xpack.security.enabled=false
- xpack.license.self_generated.type=${LICENSE}
- xpack.ml.use_auto_machine_memory_percent=true
- http.max_content_length=128b
mem_limit: ${MEM_LIMIT}
ulimits:
memlock:
soft: -1
hard: -1
healthcheck:
test:
[
"CMD-SHELL",
"curl -s http://localhost:9200 | grep -q 'missing authentication credentials'",
]
interval: 10s
timeout: 10s
retries: 120

volumes:
esdata01:
driver: local
154 changes: 154 additions & 0 deletions e2e/file_es/file_es.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package file_es

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path"
"path/filepath"
"testing"
"time"

"github.com/ozontech/file.d/cfg"
"github.com/stretchr/testify/require"
)

// Config is the file -> elasticsearch e2e test case: it feeds events into a
// watched file and validates what ends up indexed in Elasticsearch.
type Config struct {
	// ctx/cancel bound the lifetime of the test run (2-minute timeout set in Configure).
	ctx    context.Context
	cancel func()

	// inputDir is the temporary directory watched by the file input plugin.
	inputDir string
}

// Configure prepares the pipeline config before the test starts: it creates
// temporary input/offsets directories and points the file input plugin at them.
func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string) {
	c.ctx, c.cancel = context.WithTimeout(context.Background(), 2*time.Minute)

	c.inputDir = t.TempDir()
	offsetsPath := filepath.Join(t.TempDir(), "offsets.yaml")

	in := conf.Pipelines[pipelineName].Raw.Get("input")
	in.Set("watching_dir", c.inputDir)
	in.Set("filename_pattern", "input.log")
	in.Set("offsets_file", offsetsPath)
}

const (
	// n is the unit of event counting for this test: n good events are written
	// before the oversized one, and 2*n-1 after it.
	n = 10
	// successEvent is a small JSON line that fits into a bulk request.
	successEvent = `{"field_a":"AAAA","field_b":"BBBB"}`
	// failEvent is a JSON line whose size exceeds the 128-byte
	// http.max_content_length configured in docker-compose.yml, so Elasticsearch
	// answers 413 even for a single-event bulk request.
	failEvent = `{"field_a":"AAAA","field_b":"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"}`
)

// Send writes the test traffic into the watched file: n small events, one
// oversized event, then 2*n-1 more small events, followed by a best-effort sync.
func (c *Config) Send(t *testing.T) {
	logFile, err := os.Create(path.Join(c.inputDir, "input.log"))
	require.NoError(t, err)
	defer func() {
		_ = logFile.Close()
	}()

	writeLine := func(s string) {
		require.NoError(t, addEvent(logFile, s))
	}

	for i := 0; i < n; i++ {
		writeLine(successEvent)
	}

	writeLine(failEvent)

	for i := 0; i < 2*n-1; i++ {
		writeLine(successEvent)
	}

	_ = logFile.Sync()
}

func addEvent(f *os.File, s string) error {
_, err := f.WriteString(s + "\n")
return err
}

// Validate waits for the pipeline to flush, then asserts that exactly n
// documents reached the index and cleans the index up afterwards.
func (c *Config) Validate(t *testing.T) {
	// Give the pipeline time to deliver (and retry) all batches.
	time.Sleep(5 * time.Second)

	got, err := c.getEventsCount()
	require.NoError(t, err)
	require.Equal(t, n, got)

	require.NoError(t, c.deleteAll())
}

// deleteAll removes the test index so repeated runs start from a clean state.
// Returns an error when the request cannot be built/sent or Elasticsearch
// answers with a non-200 status.
func (c *Config) deleteAll() error {
	client := &http.Client{Timeout: 3 * time.Second}

	req, err := http.NewRequest(http.MethodDelete, "http://127.0.0.1:9200/index_name", http.NoBody)
	if err != nil {
		return fmt.Errorf("create request: %w", err)
	}
	// Send a well-formed Basic credential ("Basic base64(user:pass)").
	// The previous raw "elastic:password" header value is not a valid
	// Authorization header and would be rejected if security were enabled.
	req.SetBasicAuth("elastic", "password")

	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("do request: %w", err)
	}
	defer func() {
		_ = resp.Body.Close()
	}()

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("read all: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("wrong status code; status = %d; body = %s", resp.StatusCode, respBody)
	}

	return nil
}

// searchResp is the subset of the Elasticsearch _search response body needed
// to read the total hit count.
type searchResp struct {
	Hits struct {
		Total struct {
			// Value is the number of matching documents.
			Value int `json:"value"`
			// Relation tells whether Value is exact ("eq") or a lower bound ("gte").
			Relation string `json:"relation"`
		}
	} `json:"hits"`
}

// getEventsCount queries Elasticsearch for the number of documents currently
// stored in the test index. Returns 0 with an error when the request fails,
// the status is non-200, or the response body cannot be decoded.
func (c *Config) getEventsCount() (int, error) {
	client := &http.Client{Timeout: 3 * time.Second}

	req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:9200/index_name/_search", http.NoBody)
	if err != nil {
		return 0, fmt.Errorf("create request: %w", err)
	}
	// Send a well-formed Basic credential ("Basic base64(user:pass)").
	// The previous raw "elastic:password" header value is not a valid
	// Authorization header and would be rejected if security were enabled.
	req.SetBasicAuth("elastic", "password")

	resp, err := client.Do(req)
	if err != nil {
		return 0, fmt.Errorf("do request: %w", err)
	}
	defer func() {
		_ = resp.Body.Close()
	}()

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return 0, fmt.Errorf("read all: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		return 0, fmt.Errorf("wrong status code; status = %d; body = %s", resp.StatusCode, respBody)
	}

	var respData searchResp
	if err := json.Unmarshal(respBody, &respData); err != nil {
		return 0, fmt.Errorf("unmarshal search response: %w", err)
	}

	return respData.Hits.Total.Value, nil
}
1 change: 1 addition & 0 deletions e2e/http_file/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pipelines:
http_file:
input:
type: http
address: ":9201"
meta:
remote_addr: "{{ .remote_addr }}"
user_agent: '{{ index (index .request.Header "User-Agent") 0}}'
Expand Down
2 changes: 1 addition & 1 deletion e2e/http_file/http_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (c *Config) Send(t *testing.T) {
cl := http.Client{}
for j := 0; j < c.Lines; j++ {
rd := bytes.NewReader(samples[j%len(samples)])
req, err := http.NewRequest(http.MethodPost, "http://localhost:9200/?login=e2e-test", rd)
req, err := http.NewRequest(http.MethodPost, "http://localhost:9201/?login=e2e-test", rd)
require.NoError(t, err)
_, err = cl.Do(req)
require.NoError(t, err)
Expand Down
6 changes: 6 additions & 0 deletions e2e/start_work_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/e2e/file_clickhouse"
"github.com/ozontech/file.d/e2e/file_es"
"github.com/ozontech/file.d/e2e/file_file"
"github.com/ozontech/file.d/e2e/http_file"
"github.com/ozontech/file.d/e2e/join_throttle"
Expand Down Expand Up @@ -143,6 +144,11 @@ func TestE2EStabilityWorkCase(t *testing.T) {
e2eTest: &file_clickhouse.Config{},
cfgPath: "./file_clickhouse/config.yml",
},
{
name: "file_es",
e2eTest: &file_es.Config{},
cfgPath: "./file_es/config.yml",
},
}

for num, test := range testsList {
Expand Down
48 changes: 44 additions & 4 deletions plugin/output/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type Plugin struct {
batcher *pipeline.RetriableBatcher
avgEventSize int

begin []int

time string
headerPrefix string
cancel context.CancelFunc
Expand Down Expand Up @@ -221,6 +223,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
p.registerMetrics(params.MetricCtl)
p.mu = &sync.Mutex{}
p.headerPrefix = `{"` + p.config.BatchOpType + `":{"_index":"`
p.begin = make([]int, 0, p.config.BatchSize_+1)

if len(p.config.IndexValues) == 0 {
p.config.IndexValues = append(p.config.IndexValues, "@time")
Expand Down Expand Up @@ -347,16 +350,18 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err
data.outBuf = make([]byte, 0, p.config.BatchSize_*p.avgEventSize)
}

eventsCount := 0
p.begin = p.begin[:0]
data.outBuf = data.outBuf[:0]
batch.ForEach(func(event *pipeline.Event) {
eventsCount++
p.begin = append(p.begin, len(data.outBuf))
data.outBuf = p.appendEvent(data.outBuf, event)
})
p.begin = append(p.begin, len(data.outBuf))

statusCode, err := p.client.DoTimeout(http.MethodPost, NDJSONContentType, data.outBuf,
p.config.ConnectionTimeout_, p.reportESErrors)

statusCode, err := p.saveOrSplit(0, eventsCount, p.begin, data.outBuf)
if err != nil {
p.sendErrorMetric.WithLabelValues(strconv.Itoa(statusCode)).Inc()
switch statusCode {
case http.StatusBadRequest, http.StatusRequestEntityTooLarge:
const errMsg = "can't send to the elastic, non-retryable error occurred"
Expand All @@ -375,6 +380,41 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err
return nil
}

// saveOrSplit sends the events in the half-open range [left, right) as one bulk
// request. begin[i] is the byte offset in data where event i starts (with a
// sentinel at begin[len] == len(data)). When Elasticsearch rejects the request
// with 413 Request Entity Too Large, the range is halved and each half is
// retried recursively; a single event that is still too large is a hard error.
func (p *Plugin) saveOrSplit(left int, right int, begin []int, data []byte) (int, error) {
	if left == right {
		// Empty range — nothing to send.
		return http.StatusOK, nil
	}

	statusCode, err := p.client.DoTimeout(
		http.MethodPost,
		NDJSONContentType,
		data[begin[left]:begin[right]],
		p.config.ConnectionTimeout_, p.reportESErrors)
	if err == nil {
		return http.StatusOK, nil
	}

	p.sendErrorMetric.WithLabelValues(strconv.Itoa(statusCode)).Inc()

	if statusCode != http.StatusRequestEntityTooLarge {
		return statusCode, err
	}

	if right-left == 1 {
		// Even a single event does not fit — splitting further is pointless.
		return statusCode, err
	}

	mid := (left + right) / 2
	if code, lerr := p.saveOrSplit(left, mid, begin, data); lerr != nil {
		return code, lerr
	}

	return p.saveOrSplit(mid, right, begin, data)
}

func (p *Plugin) appendEvent(outBuf []byte, event *pipeline.Event) []byte {
// index command
outBuf = p.appendIndexName(outBuf, event)
Expand Down

0 comments on commit ac86bc2

Please sign in to comment.