Skip to content

Commit

Permalink
Add ID processor (#14524) (#15041)
Browse files Browse the repository at this point in the history
* WIP: Flake ID processor

* Fleshing out implementation of generator

* Rename package

* Unexport const

* Use increment operator

* Adding processor scaffolding

* Fixing default field

* Adding CHANGELOG entry

* Fixing compile errors

* WIP: unit tests

* Fixing byte copy

* Fixing up tests

* Adding test TODOs

* Adding non-default target field unit test

* Adding one more test TODO

* Adding TODO for post-benchmarking

* Introduce type

* Adding unit test for factory

* Adding unit test for mac

* Adding unit test for mac

* Fleshing out remaining mac unit tests

* Adding tests for ES ID generator

* Remove TODO after experimenting with IIFE (perf was worse)

* Moving doc

* Adding UUID processor to list in docs

* Apply suggestions from docs code review

Co-Authored-By: DeDe Morton <[email protected]>

* Adding godoc

* Rename generator function type

* Exporting and adding godoc

* Adding godoc

* Updating godoc

* Adding Unwrap error methods

* Moving ES ID generator into generators package + singleton construction

* Addressing Hound feedback

* Renaming processor to `add_id`

* Updating processor name in CHANGELOG entry

* More refactoring updates

* Fixing more vet errors

* Unexport config struct as it's only used within this package

* Fixing doc anchor

* Moving generator construction to processor constructor; simplifying factory

* Fixing compile error

* Validate ID generator type in config

* Finer-grained locking to reduce mutex contention

* Initialize package global variables that depend on randomness, later

* Compute last timestamp while accounting for system time going backwards

* Simpler and testable timestamp() function

* Adding unit test for timestamp function

* Re-implementing ES timestamp algorithm

* Removing unused variable
  • Loading branch information
ycombinator authored Dec 11, 2019
1 parent ee1ce0c commit f8adf57
Show file tree
Hide file tree
Showing 14 changed files with 887 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add consumer_lag in Kafka consumergroup metricset {pull}14822[14822]
- Make use of consumer_lag in Kafka dashboard {pull}14863[14863]
- Refactor kubernetes autodiscover to enable different resource based discovery {pull}14738[14738]
- Add `add_id` processor. {pull}14524[14524]

*Auditbeat*

Expand Down
6 changes: 6 additions & 0 deletions libbeat/docs/processors-list.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ endif::[]
ifndef::no_add_host_metadata_processor[]
* <<add-host-metadata,`add_host_metadata`>>
endif::[]
ifndef::no_add_id_processor[]
* <<add-id,`add_id`>>
endif::[]
ifndef::no_add_kubernetes_metadata_processor[]
* <<add-kubernetes-metadata,`add_kubernetes_metadata`>>
endif::[]
Expand Down Expand Up @@ -101,6 +104,9 @@ endif::[]
ifndef::no_add_host_metadata_processor[]
include::{libbeat-processors-dir}/add_host_metadata/docs/add_host_metadata.asciidoc[]
endif::[]
ifndef::no_add_id_processor[]
include::{libbeat-processors-dir}/add_id/docs/add_id.asciidoc[]
endif::[]
ifndef::no_add_kubernetes_metadata_processor[]
include::{libbeat-processors-dir}/add_kubernetes_metadata/docs/add_kubernetes_metadata.asciidoc[]
endif::[]
Expand Down
76 changes: 76 additions & 0 deletions libbeat/processors/add_id/add_id.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package add_id

import (
"fmt"

"github.com/elastic/beats/libbeat/processors/add_id/generator"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
jsprocessor "github.com/elastic/beats/libbeat/processors/script/javascript/module/processor"
)

// init registers the Add ID processor constructor, New, under the name
// "add_id" with the processors package and under the name "AddID" with the
// JavaScript processor module.
func init() {
processors.RegisterPlugin("add_id", New)
jsprocessor.RegisterPlugin("AddID", New)
}

// processorName is the processor's name as used in its String() output and
// in configuration-unpacking error messages.
const processorName = "add_id"

// addID is the Add ID processor: it writes an ID obtained from gen into the
// field named by config.TargetField of every event it is run on.
type addID struct {
// config holds the unpacked processor settings.
config config
// gen produces the IDs; it is created by generator.Factory in New.
gen generator.IDGenerator
}

// New builds an Add ID processor from cfg.
//
// The settings start from defaultConfig() and are then overridden by whatever
// cfg provides. An errConfigUnpack is returned when cfg cannot be unpacked,
// and an errComputeID when generator.Factory cannot supply a generator for
// the configured type.
func New(cfg *common.Config) (processors.Processor, error) {
	settings := defaultConfig()
	err := cfg.Unpack(&settings)
	if err != nil {
		return nil, makeErrConfigUnpack(err)
	}

	idGen, err := generator.Factory(settings.Type)
	if err != nil {
		return nil, makeErrComputeID(err)
	}

	return &addID{config: settings, gen: idGen}, nil
}

// Run asks the generator for its next ID and stores it in the event under the
// configured target field. When the value cannot be stored, a nil event and an
// errComputeID wrapping the cause are returned; otherwise the same event is
// handed back.
func (p *addID) Run(event *beat.Event) (*beat.Event, error) {
	_, err := event.PutValue(p.config.TargetField, p.gen.NextID())
	if err != nil {
		return nil, makeErrComputeID(err)
	}

	return event, nil
}

// String describes the processor as its name followed by the configured
// target field.
func (p *addID) String() string {
	target := p.config.TargetField
	return fmt.Sprintf("%v=[target_field=[%v]]", processorName, target)
}
65 changes: 65 additions & 0 deletions libbeat/processors/add_id/add_id_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package add_id

import (
"testing"

"github.com/elastic/beats/libbeat/common"

"github.com/elastic/beats/libbeat/beat"

"github.com/stretchr/testify/assert"
)

// TestDefaultTargetField checks that a processor built from an empty
// configuration stores a non-empty ID under "@metadata.id".
func TestDefaultTargetField(t *testing.T) {
	p, err := New(common.MustNewConfigFrom(nil))
	if !assert.NoError(t, err) {
		// New returns a nil processor on error; stop here so the failure is
		// reported instead of a nil-dereference panic.
		return
	}

	testEvent := &beat.Event{}

	newEvent, err := p.Run(testEvent)
	if !assert.NoError(t, err) {
		// Run returns a nil event on error; do not dereference it.
		return
	}

	v, err := newEvent.GetValue("@metadata.id")
	assert.NoError(t, err)
	assert.NotEmpty(t, v)
}

// TestNonDefaultTargetField checks that, when target_field is set to "foo",
// the ID is stored under "foo" and "@metadata.id" stays empty.
func TestNonDefaultTargetField(t *testing.T) {
	cfg := common.MustNewConfigFrom(common.MapStr{
		"target_field": "foo",
	})
	p, err := New(cfg)
	if !assert.NoError(t, err) {
		// New returns a nil processor on error; stop here so the failure is
		// reported instead of a nil-dereference panic.
		return
	}

	testEvent := &beat.Event{
		Fields: common.MapStr{},
	}

	newEvent, err := p.Run(testEvent)
	if !assert.NoError(t, err) {
		// Run returns a nil event on error; do not dereference it.
		return
	}

	v, err := newEvent.GetValue("foo")
	assert.NoError(t, err)
	assert.NotEmpty(t, v)

	v, err = newEvent.GetValue("@metadata.id")
	assert.NoError(t, err)
	assert.Empty(t, v)
}
44 changes: 44 additions & 0 deletions libbeat/processors/add_id/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package add_id

import (
"github.com/elastic/beats/libbeat/processors/add_id/generator"
)

// config holds the settings of the Add ID processor.
type config struct {
	// TargetField names the event field that receives the ID.
	TargetField string `config:"target_field"`
	// Type selects the kind of ID to generate.
	Type string `config:"type"`
}

// defaultConfig returns the settings used when the user overrides nothing:
// the ID is written to "@metadata.id" and is of type "elasticsearch".
func defaultConfig() config {
	var c config
	c.TargetField = "@metadata.id"
	c.Type = "elasticsearch"
	return c
}

// Validate checks the configured ID type against the generator package and
// returns an errUnknownType when generator.Exists does not recognise it.
func (c *config) Validate() error {
	if generator.Exists(c.Type) {
		return nil
	}

	return makeErrUnknownType(c.Type)
}
18 changes: 18 additions & 0 deletions libbeat/processors/add_id/docs/add_id.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[[add-id]]
=== Generate an ID for an event

The `add_id` processor generates a unique ID for an event.

[source,yaml]
-----------------------------------------------------
processors:
- add_id: ~
-----------------------------------------------------

The following settings are supported:

`target_field`:: (Optional) Field where the generated ID will be stored. Default is `@metadata.id`.

`type`:: (Optional) Type of ID to generate. Currently, `elasticsearch` is the only supported type and is the default.
The `elasticsearch` type generates IDs using the same algorithm that Elasticsearch uses for auto-generating
document IDs.
55 changes: 55 additions & 0 deletions libbeat/processors/add_id/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package add_id

import (
"fmt"
)

// Error types produced by the Add ID processor.
type (
// errConfigUnpack wraps the error hit while unpacking the processor
// configuration.
errConfigUnpack struct{ cause error }
// errComputeID wraps an underlying error and reports it as a failure to
// compute an ID.
errComputeID struct{ cause error }
// errUnknownType carries an ID type name that is reported as invalid.
errUnknownType struct{ typ string }
)

// makeErrConfigUnpack wraps cause in an errConfigUnpack.
func makeErrConfigUnpack(cause error) errConfigUnpack {
	return errConfigUnpack{cause: cause}
}

// Error formats the message from the processor name and the wrapped cause.
func (e errConfigUnpack) Error() string {
	const format = "failed to unpack %v processor configuration: %v"
	return fmt.Sprintf(format, processorName, e.cause)
}

// Unwrap exposes the wrapped cause.
func (e errConfigUnpack) Unwrap() error {
	return e.cause
}

// makeErrComputeID wraps cause in an errComputeID.
func makeErrComputeID(cause error) errComputeID {
	return errComputeID{cause: cause}
}

// Error formats the message from the wrapped cause.
func (e errComputeID) Error() string {
	const format = "failed to compute ID: %v"
	return fmt.Sprintf(format, e.cause)
}

// Unwrap exposes the wrapped cause.
func (e errComputeID) Unwrap() error {
	return e.cause
}

// makeErrUnknownType builds an errUnknownType for the given type name.
func makeErrUnknownType(typ string) errUnknownType {
	return errUnknownType{typ: typ}
}

// Error reports the offending type name in square brackets.
func (e errUnknownType) Error() string {
	const format = "invalid type [%s]"
	return fmt.Sprintf(format, e.typ)
}
33 changes: 33 additions & 0 deletions libbeat/processors/add_id/generator/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package generator

import (
"fmt"
)

// errUnknownType carries an ID generator type name that is reported as
// invalid.
type errUnknownType struct {
	typ string
}

// makeErrUnknownType builds an errUnknownType for the given type name.
func makeErrUnknownType(typ string) errUnknownType {
	return errUnknownType{typ: typ}
}

// Error reports the offending type name in square brackets.
func (e errUnknownType) Error() string {
	const format = "invalid type [%s]"
	return fmt.Sprintf(format, e.typ)
}
Loading

0 comments on commit f8adf57

Please sign in to comment.