Add Pipeline registration and add default pipelines. (#1258)
* Disable pipeline registration and application by default.
* If enabled, register default pipelines on APM Server `run` or `setup`.
* Add user_agent pipeline as default pipeline.
* Add system tests and log entries.
* Add pipeline definition to packaging.

implements #1032
simitt authored Aug 13, 2018
1 parent a9b302c commit fc51041
Showing 17 changed files with 505 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
@@ -13,6 +13,7 @@ https://github.com/elastic/apm-server/compare/6.4\...master[View commits]
=== Added

- Provide basic server information at `/` {pull}1197[1197]
- Add pipeline registration and pipeline usage {pull}1258[1258]

[[release-notes-6.4]]
== APM Server version 6.4
Expand Down
3 changes: 3 additions & 0 deletions Makefile
@@ -108,3 +108,6 @@ are-kibana-objects-updated: python-env
@$(MAKE) clean update
@$(PYTHON_ENV)/bin/python ./script/are_kibana_saved_objects_updated.py ${BEATS_VERSION}

.PHONY: register-pipelines
register-pipelines: update ${BEAT_NAME}
${BEAT_GOPATH}/src/${BEAT_PATH}/${BEAT_NAME} setup --pipelines
29 changes: 27 additions & 2 deletions _meta/beat.yml
@@ -114,6 +114,26 @@ apm-server:
# Set to false to disable the metrics endpoint
#enabled: true

# A pipeline is a definition of processors applied to documents when writing them to Elasticsearch.
# Using pipelines involves two steps:
# (1) registering a pipeline
# (2) applying a pipeline during data ingestion (see `output.elasticsearch.pipelines`)
#
# You can register pipelines manually, or use this configuration option to ensure
# pipelines are loaded and registered with the configured Elasticsearch instances.
# Automatic pipeline registration requires:
# * `output.elasticsearch` to be enabled and configured
# * the required Elasticsearch processor plugins to be installed
# The APM Server default pipelines require the `Ingest User Agent Plugin` to be installed.
# Find the default pipeline configuration at `ingest/pipeline/definition.json`.
#
#register.ingest.pipeline:
# Registers pipeline definitions in Elasticsearch on APM Server startup. Defaults to false.
#enabled: false

# Overwrites existing pipeline definitions in Elasticsearch. Defaults to true.
#overwrite: true

#================================ General ======================================

# Internal queue configuration for buffering events to be published.
@@ -374,8 +394,13 @@ output.elasticsearch:
when.contains:
processor.event: "onboarding"

-  # Optional ingest node pipeline. By default no pipeline will be used.
-  #pipeline: ""
+  # A pipeline is a definition of processors applied to documents when writing them to Elasticsearch.
+  # APM Server comes with a default pipeline definition, located at `ingest/pipeline/definition.json`.
+  # Pipelines are disabled by default. To make use of them you must:
+  # (1) ensure pipelines are registered in Elasticsearch, see `apm-server.register.ingest.pipeline`
+  # (2) enable the following:
+  #pipelines:
+  #- pipeline: "apm_user_agent"

# Optional HTTP Path
#path: "/elasticsearch"
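
For orientation: "registering a pipeline" amounts to a PUT against Elasticsearch's ingest pipeline API. The following minimal Go sketch shows that call, assuming a local Elasticsearch on localhost:9200; the pipeline ID `apm_user_agent` matches the configuration above, but the processor's field name and description are illustrative assumptions, not the definition shipped in `ingest/pipeline/definition.json`.

package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// A user_agent processor similar to the default apm_user_agent
	// pipeline; the field name is an illustrative assumption.
	body := []byte(`{
	  "description": "parse the User-Agent header of APM events",
	  "processors": [
	    {"user_agent": {"field": "context.request.headers.user-agent"}}
	  ]
	}`)
	req, err := http.NewRequest(http.MethodPut,
		"http://localhost:9200/_ingest/pipeline/apm_user_agent",
		bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	// Elasticsearch rejects the PUT with an "unknown processor" error if
	// the user_agent processor is unavailable, which is why the comments
	// above call out the plugin requirement.
	fmt.Println("registration status:", resp.Status)
}
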
29 changes: 27 additions & 2 deletions apm-server.yml
@@ -114,6 +114,26 @@ apm-server:
# Set to false to disable the metrics endpoint
#enabled: true

# A pipeline is a definition of processors applied to documents when writing them to Elasticsearch.
# Using pipelines involves two steps:
# (1) registering a pipeline
# (2) applying a pipeline during data ingestion (see `output.elasticsearch.pipelines`)
#
# You can register pipelines manually, or use this configuration option to ensure
# pipelines are loaded and registered with the configured Elasticsearch instances.
# Automatic pipeline registration requires:
# * `output.elasticsearch` to be enabled and configured
# * the required Elasticsearch processor plugins to be installed
# The APM Server default pipelines require the `Ingest User Agent Plugin` to be installed.
# Find the default pipeline configuration at `ingest/pipeline/definition.json`.
#
#register.ingest.pipeline:
# Registers pipeline definitions in Elasticsearch on APM Server startup. Defaults to false.
#enabled: false

# Overwrites existing pipeline definitions in Elasticsearch. Defaults to true.
#overwrite: true

#================================ General ======================================

# Internal queue configuration for buffering events to be published.
@@ -374,8 +394,13 @@ output.elasticsearch:
when.contains:
processor.event: "onboarding"

-  # Optional ingest node pipeline. By default no pipeline will be used.
-  #pipeline: ""
+  # A pipeline is a definition of processors applied to documents when writing them to Elasticsearch.
+  # APM Server comes with a default pipeline definition, located at `ingest/pipeline/definition.json`.
+  # Pipelines are disabled by default. To make use of them you must:
+  # (1) ensure pipelines are registered in Elasticsearch, see `apm-server.register.ingest.pipeline`
+  # (2) enable the following:
+  #pipelines:
+  #- pipeline: "apm_user_agent"

# Optional HTTP Path
#path: "/elasticsearch"
38 changes: 37 additions & 1 deletion beater/beater.go
@@ -32,10 +32,12 @@ import (

"github.com/elastic/apm-agent-go"
"github.com/elastic/apm-agent-go/transport"
"github.com/elastic/apm-server/ingest/pipeline"
"github.com/elastic/apm-server/pipelistener"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs/elasticsearch"
)

type beater struct {
@@ -63,7 +65,7 @@ func New(b *beat.Beat, ucfg *common.Config) (beat.Beater, error) {
}
if b.Config != nil && beaterConfig.RumConfig.SourceMapping.EsConfig == nil {
// fall back to elasticsearch output configuration for sourcemap storage if possible
-		if b.Config.Output.Name() == "elasticsearch" {
+		if isElasticsearchOutput(b) {
logger.Info("Falling back to elasticsearch output for sourcemap storage")
beaterConfig.setSmapElasticsearch(b.Config.Output.Config())
} else {
@@ -77,6 +79,17 @@ func New(b *beat.Beat, ucfg *common.Config) (beat.Beater, error) {
stopped: false,
logger: logger,
}

if isElasticsearchOutput(b) && beaterConfig.Register.Ingest.Pipeline.isEnabled() {
logger.Info("Registering pipeline callback.")
err := bt.registerPipelineCallback(b)
if err != nil {
return nil, err
}
} else {
logger.Info("No pipeline callback registered")
}

return bt, nil
}

@@ -207,6 +220,29 @@ func initTracer(info beat.Info, config *Config, logger *logp.Logger) (*elasticap
return tracer, lis, nil
}

func isElasticsearchOutput(b *beat.Beat) bool {
return b.Config != nil && b.Config.Output.Name() == "elasticsearch"
}

func (bt *beater) registerPipelineCallback(b *beat.Beat) error {
overwrite := bt.config.Register.Ingest.Pipeline.shouldOverwrite()
path := bt.config.Register.Ingest.Pipeline.Path

// ensure the `setup --pipelines` command registers pipelines
b.OverwritePipelinesCallback = func(esConfig *common.Config) error {
esClient, err := elasticsearch.NewConnectedClient(esConfig)
if err != nil {
return err
}
return pipeline.RegisterPipelines(esClient, overwrite, path)
}
// ensure pipelines are registered whenever a new Elasticsearch connection is established.
elasticsearch.RegisterConnectCallback(func(esClient *elasticsearch.Client) error {
return pipeline.RegisterPipelines(esClient, overwrite, path)
})
return nil
}

// Graceful shutdown
func (bt *beater) Stop() {
bt.logger.Infof("stopping apm-server... waiting maximum of %v seconds for queues to drain",
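
pipeline.RegisterPipelines itself is not part of the hunks shown here. As a rough sketch of what such a helper could look like (an assumption for illustration, not the actual ingest/pipeline implementation): load the pipeline definitions from the configured path and PUT each one, honoring the overwrite flag. The esClient interface and the {id, body} schema of definition.json are stand-ins; libbeat's real elasticsearch.Client API differs.

package pipeline

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
)

// definition mirrors one entry of a definition.json file; the exact
// schema is an assumption for this sketch.
type definition struct {
	ID   string          `json:"id"`
	Body json.RawMessage `json:"body"`
}

// esClient abstracts the two Elasticsearch calls the sketch needs; the
// real code uses libbeat's elasticsearch.Client instead.
type esClient interface {
	PipelineExists(id string) (bool, error)
	PutPipeline(id string, body []byte) error
}

// registerPipelines loads all definitions from path and registers them,
// skipping pipelines that already exist unless overwrite is set.
func registerPipelines(client esClient, overwrite bool, path string) error {
	raw, err := ioutil.ReadFile(path)
	if err != nil {
		return err
	}
	var defs []definition
	if err := json.Unmarshal(raw, &defs); err != nil {
		return err
	}
	for _, d := range defs {
		if !overwrite {
			exists, err := client.PipelineExists(d.ID)
			if err != nil {
				return err
			}
			if exists {
				continue // keep the existing definition
			}
		}
		if err := client.PutPipeline(d.ID, d.Body); err != nil {
			return fmt.Errorf("registering pipeline %s: %v", d.ID, err)
		}
	}
	return nil
}

Wiring registration through both b.OverwritePipelinesCallback and elasticsearch.RegisterConnectCallback, as registerPipelineCallback does above, covers the two paths on which it must happen: an explicit `setup --pipelines` run and every (re)connection during normal operation.
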
38 changes: 36 additions & 2 deletions beater/beater_test.go
@@ -28,6 +28,7 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"path/filepath"
"testing"
"time"

@@ -47,8 +48,7 @@ import (
)

func TestBeatConfig(t *testing.T) {
-	falsy := false
-	truthy := true
+	falsy, truthy := false, true

tests := []struct {
conf map[string]interface{}
@@ -98,6 +98,15 @@
"metrics": map[string]interface{}{
"enabled": false,
},
"register": map[string]interface{}{
"ingest": map[string]interface{}{
"pipeline": map[string]interface{}{
"enabled": true,
"overwrite": false,
"path": filepath.Join("tmp", "definition.json"),
},
},
},
},
beaterConf: &Config{
Host: "localhost:3000",
@@ -142,6 +151,15 @@
Enabled: &falsy,
},
ConcurrentRequests: 15,
Register: &registerConfig{
Ingest: &ingestConfig{
Pipeline: &pipelineConfig{
Enabled: &truthy,
Overwrite: &falsy,
Path: filepath.Join("tmp", "definition.json"),
},
},
},
},
msg: "Given config overwrites default",
},
@@ -175,6 +193,13 @@
},
"library_pattern": "rum",
},
"register": map[string]interface{}{
"ingest": map[string]interface{}{
"pipeline": map[string]interface{}{
"enabled": false,
},
},
},
},
beaterConf: &Config{
Host: "localhost:3000",
@@ -223,6 +248,15 @@
Enabled: &truthy,
},
ConcurrentRequests: 5,
Register: &registerConfig{
Ingest: &ingestConfig{
Pipeline: &pipelineConfig{
Enabled: &falsy,
Overwrite: &truthy,
Path: filepath.Join("ingest", "pipeline", "definition.json"),
},
},
},
},
msg: "Given config merged with default",
},
33 changes: 33 additions & 0 deletions beater/config.go
@@ -19,6 +19,7 @@ package beater

import (
"net"
"path/filepath"
"regexp"
"time"

@@ -47,6 +48,7 @@ type Config struct {
SelfInstrumentation *InstrumentationConfig `config:"instrumentation"`
RumConfig *rumConfig `config:"rum"`
FrontendConfig *rumConfig `config:"frontend"`
Register *registerConfig `config:"register"`
}

type ExpvarConfig struct {
@@ -69,6 +71,20 @@ type metricsConfig struct {
Enabled *bool `config:"enabled"`
}

type registerConfig struct {
Ingest *ingestConfig `config:"ingest"`
}

type ingestConfig struct {
Pipeline *pipelineConfig `config:"pipeline"`
}

type pipelineConfig struct {
Enabled *bool `config:"enabled"`
Overwrite *bool `config:"overwrite"`
Path string
}

type SourceMapping struct {
Cache *Cache `config:"cache"`
IndexPattern string `config:"index_pattern"`
@@ -117,6 +133,14 @@ func (s *SourceMapping) isSetup() bool {
return s != nil && (s.EsConfig != nil)
}

func (c *pipelineConfig) isEnabled() bool {
return c != nil && (c.Enabled != nil && *c.Enabled)
}

func (c *pipelineConfig) shouldOverwrite() bool {
return c != nil && (c.Overwrite != nil && *c.Overwrite)
}

func (c *Config) SetRumConfig() {
if c.RumConfig != nil && c.RumConfig.Enabled != nil {
return
@@ -173,6 +197,7 @@ func defaultRum(beatVersion string) *rumConfig {

func defaultConfig(beatVersion string) *Config {
metricsEnabled := true
pipelineEnabled, pipelineOverwrite := false, true
return &Config{
Host: net.JoinHostPort("localhost", defaultPort),
MaxUnzippedSize: 30 * 1024 * 1024, // 30mb
@@ -194,5 +219,13 @@ func defaultConfig(beatVersion string) *Config {
},
FrontendConfig: defaultRum(beatVersion),
RumConfig: defaultRum(beatVersion),
Register: &registerConfig{
Ingest: &ingestConfig{
Pipeline: &pipelineConfig{
Enabled: &pipelineEnabled,
Overwrite: &pipelineOverwrite,
Path: filepath.Join("ingest", "pipeline", "definition.json"),
}},
},
}
}
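
A design note on the config types above: Enabled and Overwrite are *bool rather than bool so that "unset" (nil) can be distinguished from an explicit false; that is what lets defaultConfig supply defaults without clobbering user-set values, while isEnabled and shouldOverwrite stay nil-safe. A self-contained sketch of the same pattern:

package main

import "fmt"

type pipelineConfig struct {
	Enabled   *bool
	Overwrite *bool
}

// isEnabled treats a nil config or nil field as false.
func (c *pipelineConfig) isEnabled() bool {
	return c != nil && c.Enabled != nil && *c.Enabled
}

// shouldOverwrite treats a nil config or nil field as false.
func (c *pipelineConfig) shouldOverwrite() bool {
	return c != nil && c.Overwrite != nil && *c.Overwrite
}

func main() {
	var unset *pipelineConfig // nil config: both report false
	fmt.Println(unset.isEnabled(), unset.shouldOverwrite()) // false false

	f, t := false, true
	c := &pipelineConfig{Enabled: &t, Overwrite: &f}
	fmt.Println(c.isEnabled(), c.shouldOverwrite()) // true false
}
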