Allow transform_generic to keep existing fields #156

Merged 7 commits on Apr 5, 2022
25 changes: 23 additions & 2 deletions README.md
@@ -152,6 +152,7 @@ parameters:
transform:
type: generic
generic:
policy: replace_keys
rules:
- input: Bytes
output: v1_bytes
@@ -168,6 +169,7 @@ parameters:
transform:
type: generic
generic:
policy: replace_keys
rules:
- input: Bytes
output: v2_bytes
@@ -249,6 +251,9 @@ The generic transform module maps the input json keys into another set of keys.
This allows to perform subsequent operations using a uniform set of keys.
In some use cases, only a subset of the provided fields are required.
Using the generic transform, we may specify those particular fields that interest us.
Specify `policy: replace_keys` to use only the newly specified keys.
To include the original keys and values in addition to those specified in the `rules`,
specify `policy: preserve_original_keys`.
Comment on lines +255 to +256
Collaborator

What happens if an original key conflicts with a key specified in `rules` (i.e., they have the same name)?
Is it an error? Does the original key prevail, or the new key?

Collaborator Author

The new key overwrites the old value.

Collaborator

Do you think it's worth noting in the docs?
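
A minimal sketch of the behavior discussed in this thread, assuming the copy-then-apply order of the `Transform` loop in `pkg/pipeline/transform/transform_generic.go` from this PR. The `rule` type and the `applyRules` helper are hypothetical stand-ins written for illustration, not part of the project's API, and a plain map is used in place of `config.GenericMap`:

```go
package main

import "fmt"

// rule is a hypothetical stand-in for the input/output pair of a generic transform rule.
type rule struct {
	Input  string
	Output string
}

// applyRules is a hypothetical helper that follows the same order of steps as the
// Transform loop in this PR: optionally copy the original entry, then apply the rules.
func applyRules(entry map[string]interface{}, policy string, rules []rule) map[string]interface{} {
	out := make(map[string]interface{})
	if policy != "replace_keys" {
		// Any policy other than replace_keys (including the empty default)
		// starts from a copy of the original keys and values.
		for k, v := range entry {
			out[k] = v
		}
	}
	for _, r := range rules {
		// A rule output that matches an existing key overwrites the copied value.
		out[r.Output] = entry[r.Input]
	}
	return out
}

func main() {
	// The entry already has a key named srcIP, which collides with the rule output.
	entry := map[string]interface{}{"SrcAddr": "10.0.0.1", "srcIP": "stale"}
	rules := []rule{{Input: "SrcAddr", Output: "srcIP"}}

	fmt.Println(applyRules(entry, "preserve_original_keys", rules))
	// map[SrcAddr:10.0.0.1 srcIP:10.0.0.1]
	fmt.Println(applyRules(entry, "replace_keys", rules))
	// map[srcIP:10.0.0.1]
}
```

Because the original entry is copied first and the rules run afterwards, a rule whose `output` matches an existing key replaces that key's value, and no error is raised.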


For example, suppose we have a flow log with the following syntax:
```
@@ -264,6 +269,7 @@ parameters:
transform:
type: generic
generic:
policy: replace_keys
rules:
- input: Bytes
output: bytes
@@ -295,18 +301,20 @@ pipeline:
follows: transform1
parameters:
- name: transform1
transform
transform:
type: generic
generic:
policy: replace_keys
rules:
- input: DstAddr
output: dstAddr
- input: SrcAddr
output: srcAddr
- name: transform2
transform
transform:
type: generic
generic:
policy: replace_keys
rules:
- input: dstAddr
output: dstIP
@@ -321,6 +329,19 @@ Before the first transform suppose we have the keys `DstAddr` and `SrcAddr`.
After the first transform, we have the keys `dstAddr` and `srcAddr`.
After the second transform, we have the keys `dstAddr`, `dstIP`, `srcAddr`, and `srcIP`.

To maintain all the old keys and values and simply add the key `dstAddr` (derived from `DstAddr`), use the following:
```
parameters:
- name: transform1
transform:
type: generic
generic:
policy: preserve_original_keys
rules:
- input: DstAddr
output: dstAddr
```

### Transform Filter

The filter transform module allows setting rules to remove complete entries from
1 change: 1 addition & 0 deletions contrib/kubernetes/flowlogs-pipeline.conf.yaml
@@ -13,6 +13,7 @@ parameters:
- name: transform_generic
transform:
generic:
policy: replace_keys
rules:
- input: SrcAddr
output: srcIP
3 changes: 3 additions & 0 deletions docs/api.md
@@ -79,6 +79,9 @@ Following is the supported API format for generic transformations:

<pre>
generic:
policy: (enum) key replacement policy; may be one of the following:
preserve_original_keys: adds new keys in addition to existing keys (default)
replace_keys: removes all old keys and uses only the new keys
rules: list of transform rules, each includes:
input: entry input field
output: entry output field
1 change: 1 addition & 0 deletions pkg/api/enum.go
@@ -26,6 +26,7 @@ type enums struct {
PromEncodeOperationEnum PromEncodeOperationEnum
TransformNetworkOperationEnum TransformNetworkOperationEnum
TransformFilterOperationEnum TransformFilterOperationEnum
TransformGenericOperationEnum TransformGenericOperationEnum
KafkaEncodeBalancerEnum KafkaEncodeBalancerEnum
}

12 changes: 11 additions & 1 deletion pkg/api/transform_generic.go
@@ -18,7 +18,17 @@
package api

type TransformGeneric struct {
Rules []GenericTransformRule `yaml:"rules" doc:"list of transform rules, each includes:"`
Policy string `yaml:"policy" enum:"TransformGenericOperationEnum" doc:"key replacement policy; may be one of the following:"`
Rules []GenericTransformRule `yaml:"rules" doc:"list of transform rules, each includes:"`
}

type TransformGenericOperationEnum struct {
PreserveOriginalKeys string `yaml:"preserve_original_keys" doc:"adds new keys in addition to existing keys (default)"`
ReplaceKeys string `yaml:"replace_keys" doc:"removes all old keys and uses only the new keys"`
}

func TransformGenericOperationName(operation string) string {
return GetEnumName(TransformGenericOperationEnum{}, operation)
}

type GenericTransformRule struct {
3 changes: 2 additions & 1 deletion pkg/confgen/flowlogs2metrics_config.go
@@ -70,7 +70,8 @@ func (cg *ConfGen) generateFlowlogs2PipelineConfig(fileName string) error {
"transform": map[string]interface{}{
"type": "generic",
"generic": map[string]interface{}{
"rules": cg.config.Transform.Generic.Rules,
"policy": "replace_keys",
"rules": cg.config.Transform.Generic.Rules,
},
},
},
1 change: 1 addition & 0 deletions pkg/pipeline/pipeline_test.go
@@ -90,6 +90,7 @@ parameters:
transform:
type: generic
generic:
policy: replace_keys
rules:
- input: Bytes
output: flp_bytes
28 changes: 24 additions & 4 deletions pkg/pipeline/transform/transform_generic.go
@@ -24,16 +24,24 @@ import (
)

type Generic struct {
Rules []api.GenericTransformRule
policy string
rules []api.GenericTransformRule
}

// Transform transforms a flow to a new set of keys
func (g *Generic) Transform(input []config.GenericMap) []config.GenericMap {
log.Debugf("f = %v", g)
log.Debugf("entering Generic Transform g = %v", g)
output := make([]config.GenericMap, 0)
for _, entry := range input {
outputEntry := make(config.GenericMap)
for _, transformRule := range g.Rules {
if g.policy != "replace_keys" {
// copy old map to new map
for key, value := range entry {
outputEntry[key] = value
}
}
log.Debugf("outputEntry = %v", outputEntry)
for _, transformRule := range g.rules {
log.Debugf("transformRule = %v", transformRule)
outputEntry[transformRule.Output] = entry[transformRule.Input]
}
@@ -46,8 +54,20 @@ func (g *Generic) Transform(input []config.GenericMap) []config.GenericMap {
// NewTransformGeneric create a new transform
func NewTransformGeneric(params config.StageParam) (Transformer, error) {
log.Debugf("entering NewTransformGeneric")
log.Debugf("params.Transform.Generic = %v", params.Transform.Generic)
rules := params.Transform.Generic.Rules
policy := params.Transform.Generic.Policy
switch policy {
case "replace_keys", "preserve_original_keys", "":
// valid; nothing to do
log.Infof("NewTransformGeneric, policy = %s", policy)
default:
log.Panicf("unknown policy %s for transform.generic", policy)
}
transformGeneric := &Generic{
Rules: params.Transform.Generic.Rules,
policy: policy,
rules: rules,
}
log.Debugf("transformGeneric = %v", transformGeneric)
return transformGeneric, nil
}
70 changes: 64 additions & 6 deletions pkg/pipeline/transform/transform_generic_test.go
@@ -25,7 +25,7 @@ import (
"github.com/stretchr/testify/require"
)

const testConfigTransformGeneric = `---
const testConfigTransformGenericMaintainFalse = `---
log-level: debug
pipeline:
- name: transform1
@@ -34,6 +34,7 @@ parameters:
transform:
type: generic
generic:
policy: replace_keys
rules:
- input: srcIP
output: SrcAddr
@@ -49,7 +50,32 @@ parameters:
output: srcIP
`

func getGenericExpectedOutput() config.GenericMap {
const testConfigTransformGenericMaintainTrue = `---
log-level: debug
pipeline:
- name: transform1
parameters:
- name: transform1
transform:
type: generic
generic:
policy: preserve_original_keys
rules:
- input: srcIP
output: SrcAddr
- input: dstIP
output: DstAddr
- input: dstPort
output: DstPort
- input: srcPort
output: SrcPort
- input: protocol
output: Protocol
- input: srcIP
output: srcIP
`

func getGenericExpectedOutputShort() config.GenericMap {
return config.GenericMap{
"SrcAddr": "10.0.0.1",
"srcIP": "10.0.0.1",
@@ -60,14 +86,46 @@ func getGenericExpectedOutput() config.GenericMap {
}
}

func TestNewTransformGeneric(t *testing.T) {
newTransform := InitNewTransformGeneric(t, testConfigTransformGeneric)
func getGenericExpectedOutputLong() config.GenericMap {
return config.GenericMap{
"SrcAddr": "10.0.0.1",
"SrcPort": 11777,
"Protocol": "tcp",
"DstAddr": "20.0.0.2",
"DstPort": 22,
"srcIP": "10.0.0.1",
"8888IP": "8.8.8.8",
"emptyIP": "",
"level": "error",
"srcPort": 11777,
"protocol": "tcp",
"protocol_num": 6,
"value": 7.0,
"message": "test message",
"dstIP": "20.0.0.2",
"dstPort": 22,
}
}

func TestNewTransformGenericMaintainFalse(t *testing.T) {
newTransform := InitNewTransformGeneric(t, testConfigTransformGenericMaintainFalse)
transformGeneric := newTransform.(*Generic)
require.Len(t, transformGeneric.rules, 6)

input := test.GetIngestMockEntry(false)
output := transformGeneric.Transform([]config.GenericMap{input})
expectedOutput := getGenericExpectedOutputShort()
require.Equal(t, expectedOutput, output[0])
}

func TestNewTransformGenericMaintainTrue(t *testing.T) {
newTransform := InitNewTransformGeneric(t, testConfigTransformGenericMaintainTrue)
transformGeneric := newTransform.(*Generic)
require.Len(t, transformGeneric.Rules, 6)
require.Len(t, transformGeneric.rules, 6)

input := test.GetIngestMockEntry(false)
output := transformGeneric.Transform([]config.GenericMap{input})
expectedOutput := getGenericExpectedOutput()
expectedOutput := getGenericExpectedOutputLong()
require.Equal(t, expectedOutput, output[0])
}

2 changes: 2 additions & 0 deletions pkg/pipeline/transform_multiple_test.go
@@ -51,6 +51,7 @@ parameters:
transform:
type: generic
generic:
policy: replace_keys
rules:
- input: srcIP
output: SrcAddr
@@ -69,6 +70,7 @@ parameters:
transform:
type: generic
generic:
policy: replace_keys
rules:
- input: SrcAddr
output: SrcAddr2
1 change: 1 addition & 0 deletions playground/goflow3.yml
@@ -20,6 +20,7 @@ parameters:
transform:
type: generic
generic:
policy: replace_keys
rules:
- input: Bytes
output: flp_bytes