diff --git a/README.md b/README.md index 01c976ce4..1bb190e17 100644 --- a/README.md +++ b/README.md @@ -152,6 +152,7 @@ parameters: transform: type: generic generic: + policy: replace_keys rules: - input: Bytes output: v1_bytes @@ -168,6 +169,7 @@ parameters: transform: type: generic generic: + policy: replace_keys rules: - input: Bytes output: v2_bytes @@ -249,6 +251,9 @@ The generic transform module maps the input json keys into another set of keys. This allows to perform subsequent operations using a uniform set of keys. In some use cases, only a subset of the provided fields are required. Using the generic transform, we may specify those particular fields that interest us. +Specify `policy: replace_keys` to use only the newly specified keys. +To include the original keys and values in addition to those specified in the `rules`, +specify `policy: preserve_original_keys`; this is also the behavior when `policy` is omitted. For example, suppose we have a flow log with the following syntax: ``` @@ -264,6 +269,7 @@ parameters: transform: type: generic generic: + policy: replace_keys rules: - input: Bytes output: bytes @@ -295,18 +301,20 @@ pipeline: follows: transform1 parameters: - name: transform1 - transform + transform: type: generic generic: + policy: replace_keys rules: - input: DstAddr output: dstAddr - input: SrcAddr output: srcAddr - name: transform2 - transform + transform: type: generic generic: + policy: replace_keys rules: - input: dstAddr output: dstIP @@ -321,6 +329,19 @@ Before the first transform suppose we have the keys `DstAddr` and `SrcAddr`. After the first transform, we have the keys `dstAddr` and `srcAddr`. After the second transform, we have the keys `dstAddr`, `dstIP`, `srcAddr`, and `srcIP`. 
+To maintain all the old keys and values and simply add the key `dstAddr` (derived from `DstAddr`), use the following: +``` +parameters: + - name: transform1 + transform: + type: generic + generic: + policy: preserve_original_keys + rules: + - input: DstAddr + output: dstAddr +``` + ### Transform Filter The filter transform module allows setting rules to remove complete entries from diff --git a/contrib/kubernetes/flowlogs-pipeline.conf.yaml b/contrib/kubernetes/flowlogs-pipeline.conf.yaml index 663bd9d41..f7d5025ef 100644 --- a/contrib/kubernetes/flowlogs-pipeline.conf.yaml +++ b/contrib/kubernetes/flowlogs-pipeline.conf.yaml @@ -13,6 +13,7 @@ parameters: - name: transform_generic transform: generic: + policy: replace_keys rules: - input: SrcAddr output: srcIP diff --git a/docs/api.md b/docs/api.md index 4b1154967..1a82b4b01 100644 --- a/docs/api.md +++ b/docs/api.md @@ -79,6 +79,9 @@ Following is the supported API format for generic transformations:
generic: + policy: (enum) key replacement policy; may be one of the following: + preserve_original_keys: adds new keys in addition to existing keys (default) + replace_keys: removes all old keys and uses only the new keys rules: list of transform rules, each includes: input: entry input field output: entry output field diff --git a/pkg/api/enum.go b/pkg/api/enum.go index fc1da03cc..cb519ffcb 100644 --- a/pkg/api/enum.go +++ b/pkg/api/enum.go @@ -26,6 +26,7 @@ type enums struct { PromEncodeOperationEnum PromEncodeOperationEnum TransformNetworkOperationEnum TransformNetworkOperationEnum TransformFilterOperationEnum TransformFilterOperationEnum + TransformGenericOperationEnum TransformGenericOperationEnum KafkaEncodeBalancerEnum KafkaEncodeBalancerEnum } diff --git a/pkg/api/transform_generic.go b/pkg/api/transform_generic.go index 2f4f17998..ab9bdf1cf 100644 --- a/pkg/api/transform_generic.go +++ b/pkg/api/transform_generic.go @@ -18,7 +18,17 @@ package api type TransformGeneric struct { - Rules []GenericTransformRule `yaml:"rules" doc:"list of transform rules, each includes:"` + Policy string `yaml:"policy" enum:"TransformGenericOperationEnum" doc:"key replacement policy; may be one of the following:"` + Rules []GenericTransformRule `yaml:"rules" doc:"list of transform rules, each includes:"` +} + +type TransformGenericOperationEnum struct { + PreserveOriginalKeys string `yaml:"preserve_original_keys" doc:"adds new keys in addition to existing keys (default)"` + ReplaceKeys string `yaml:"replace_keys" doc:"removes all old keys and uses only the new keys"` +} + +func TransformGenericOperationName(operation string) string { + return GetEnumName(TransformGenericOperationEnum{}, operation) } type GenericTransformRule struct { diff --git a/pkg/confgen/flowlogs2metrics_config.go b/pkg/confgen/flowlogs2metrics_config.go index 4985515e8..cc94f10a7 100644 --- a/pkg/confgen/flowlogs2metrics_config.go +++ b/pkg/confgen/flowlogs2metrics_config.go @@ -70,7 +70,8 @@ func (cg 
*ConfGen) generateFlowlogs2PipelineConfig(fileName string) error { "transform": map[string]interface{}{ "type": "generic", "generic": map[string]interface{}{ - "rules": cg.config.Transform.Generic.Rules, + "policy": "replace_keys", + "rules": cg.config.Transform.Generic.Rules, }, }, }, diff --git a/pkg/pipeline/pipeline_test.go b/pkg/pipeline/pipeline_test.go index 1751d4e21..e1174ef2e 100644 --- a/pkg/pipeline/pipeline_test.go +++ b/pkg/pipeline/pipeline_test.go @@ -90,6 +90,7 @@ parameters: transform: type: generic generic: + policy: replace_keys rules: - input: Bytes output: flp_bytes diff --git a/pkg/pipeline/transform/transform_generic.go b/pkg/pipeline/transform/transform_generic.go index 8eb7659ce..f3b994a80 100644 --- a/pkg/pipeline/transform/transform_generic.go +++ b/pkg/pipeline/transform/transform_generic.go @@ -24,16 +24,24 @@ import ( ) type Generic struct { - Rules []api.GenericTransformRule + policy string + rules []api.GenericTransformRule } // Transform transforms a flow to a new set of keys func (g *Generic) Transform(input []config.GenericMap) []config.GenericMap { - log.Debugf("f = %v", g) + log.Debugf("entering Generic Transform g = %v", g) output := make([]config.GenericMap, 0) for _, entry := range input { outputEntry := make(config.GenericMap) - for _, transformRule := range g.Rules { + if g.policy != "replace_keys" { + // copy old map to new map + for key, value := range entry { + outputEntry[key] = value + } + } + log.Debugf("outputEntry = %v", outputEntry) + for _, transformRule := range g.rules { log.Debugf("transformRule = %v", transformRule) outputEntry[transformRule.Output] = entry[transformRule.Input] } @@ -46,8 +54,20 @@ func (g *Generic) Transform(input []config.GenericMap) []config.GenericMap { // NewTransformGeneric create a new transform func NewTransformGeneric(params config.StageParam) (Transformer, error) { log.Debugf("entering NewTransformGeneric") + log.Debugf("params.Transform.Generic = %v", params.Transform.Generic) + 
rules := params.Transform.Generic.Rules + policy := params.Transform.Generic.Policy + switch policy { + case "replace_keys", "preserve_original_keys", "": + // valid; nothing to do + log.Infof("NewTransformGeneric, policy = %s", policy) + default: + log.Panicf("unknown policy %s for transform.generic", policy) + } transformGeneric := &Generic{ - Rules: params.Transform.Generic.Rules, + policy: policy, + rules: rules, } + log.Debugf("transformGeneric = %v", transformGeneric) return transformGeneric, nil } diff --git a/pkg/pipeline/transform/transform_generic_test.go b/pkg/pipeline/transform/transform_generic_test.go index 17a90a06c..dd0652421 100644 --- a/pkg/pipeline/transform/transform_generic_test.go +++ b/pkg/pipeline/transform/transform_generic_test.go @@ -25,7 +25,7 @@ import ( "github.com/stretchr/testify/require" ) -const testConfigTransformGeneric = `--- +const testConfigTransformGenericMaintainFalse = `--- log-level: debug pipeline: - name: transform1 @@ -34,6 +34,7 @@ parameters: transform: type: generic generic: + policy: replace_keys rules: - input: srcIP output: SrcAddr @@ -49,7 +50,32 @@ parameters: output: srcIP ` -func getGenericExpectedOutput() config.GenericMap { +const testConfigTransformGenericMaintainTrue = `--- +log-level: debug +pipeline: + - name: transform1 +parameters: + - name: transform1 + transform: + type: generic + generic: + policy: preserve_original_keys + rules: + - input: srcIP + output: SrcAddr + - input: dstIP + output: DstAddr + - input: dstPort + output: DstPort + - input: srcPort + output: SrcPort + - input: protocol + output: Protocol + - input: srcIP + output: srcIP +` + +func getGenericExpectedOutputShort() config.GenericMap { return config.GenericMap{ "SrcAddr": "10.0.0.1", "srcIP": "10.0.0.1", @@ -60,14 +86,46 @@ func getGenericExpectedOutput() config.GenericMap { } } -func TestNewTransformGeneric(t *testing.T) { - newTransform := InitNewTransformGeneric(t, testConfigTransformGeneric) +func getGenericExpectedOutputLong() 
config.GenericMap { + return config.GenericMap{ + "SrcAddr": "10.0.0.1", + "SrcPort": 11777, + "Protocol": "tcp", + "DstAddr": "20.0.0.2", + "DstPort": 22, + "srcIP": "10.0.0.1", + "8888IP": "8.8.8.8", + "emptyIP": "", + "level": "error", + "srcPort": 11777, + "protocol": "tcp", + "protocol_num": 6, + "value": 7.0, + "message": "test message", + "dstIP": "20.0.0.2", + "dstPort": 22, + } +} + +func TestNewTransformGenericMaintainFalse(t *testing.T) { + newTransform := InitNewTransformGeneric(t, testConfigTransformGenericMaintainFalse) + transformGeneric := newTransform.(*Generic) + require.Len(t, transformGeneric.rules, 6) + + input := test.GetIngestMockEntry(false) + output := transformGeneric.Transform([]config.GenericMap{input}) + expectedOutput := getGenericExpectedOutputShort() + require.Equal(t, expectedOutput, output[0]) +} + +func TestNewTransformGenericMaintainTrue(t *testing.T) { + newTransform := InitNewTransformGeneric(t, testConfigTransformGenericMaintainTrue) transformGeneric := newTransform.(*Generic) - require.Len(t, transformGeneric.Rules, 6) + require.Len(t, transformGeneric.rules, 6) input := test.GetIngestMockEntry(false) output := transformGeneric.Transform([]config.GenericMap{input}) - expectedOutput := getGenericExpectedOutput() + expectedOutput := getGenericExpectedOutputLong() require.Equal(t, expectedOutput, output[0]) } diff --git a/pkg/pipeline/transform_multiple_test.go b/pkg/pipeline/transform_multiple_test.go index 42d9631b4..0492fb91c 100644 --- a/pkg/pipeline/transform_multiple_test.go +++ b/pkg/pipeline/transform_multiple_test.go @@ -51,6 +51,7 @@ parameters: transform: type: generic generic: + policy: replace_keys rules: - input: srcIP output: SrcAddr @@ -69,6 +70,7 @@ parameters: transform: type: generic generic: + policy: replace_keys rules: - input: SrcAddr output: SrcAddr2 diff --git a/playground/goflow3.yml b/playground/goflow3.yml index 4b30f926d..4b1077537 100644 --- a/playground/goflow3.yml +++ b/playground/goflow3.yml @@ 
-20,6 +20,7 @@ parameters: transform: type: generic generic: + policy: replace_keys rules: - input: Bytes output: flp_bytes