Skip to content

Commit

Permalink
Merge pull request #229 from OlivierCazade/api-omitempty
Browse files Browse the repository at this point in the history
Add omitempty flag to stage config
  • Loading branch information
OlivierCazade authored Jun 15, 2022
2 parents 217223d + 18be06a commit 4549b72
Show file tree
Hide file tree
Showing 13 changed files with 58 additions and 58 deletions.
28 changes: 14 additions & 14 deletions pkg/api/conn_track.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import "time"

type ConnTrack struct {
// TODO: should be a pointer instead?
KeyDefinition KeyDefinition `yaml:"keyDefinition" doc:"fields that are used to identify the connection"`
OutputRecordTypes []string `yaml:"outputRecordTypes" enum:"ConnTrackOutputRecordTypeEnum" doc:"output record types to emit"`
OutputFields []OutputField `yaml:"outputFields" doc:"list of output fields"`
EndConnectionTimeout time.Duration `yaml:"endConnectionTimeout" doc:"duration of time to wait from the last flow log to end a connection"`
KeyDefinition KeyDefinition `yaml:"keyDefinition,omitempty" doc:"fields that are used to identify the connection"`
OutputRecordTypes []string `yaml:"outputRecordTypes,omitempty" enum:"ConnTrackOutputRecordTypeEnum" doc:"output record types to emit"`
OutputFields []OutputField `yaml:"outputFields,omitempty" doc:"list of output fields"`
EndConnectionTimeout time.Duration `yaml:"endConnectionTimeout,omitempty" doc:"duration of time to wait from the last flow log to end a connection"`
}

type ConnTrackOutputRecordTypeEnum struct {
Expand All @@ -38,12 +38,12 @@ func ConnTrackOutputRecordTypeName(operation string) string {
}

type KeyDefinition struct {
FieldGroups []FieldGroup `yaml:"fieldGroups" doc:"list of field group definitions"`
Hash ConnTrackHash `yaml:"hash" doc:"how to build the connection hash"`
FieldGroups []FieldGroup `yaml:"fieldGroups,omitempty" doc:"list of field group definitions"`
Hash ConnTrackHash `yaml:"hash,omitempty" doc:"how to build the connection hash"`
}

type FieldGroup struct {
Name string `yaml:"name" doc:"field group name"`
Name string `yaml:"name,omitempty" doc:"field group name"`
Fields []string `yaml:"fields" doc:"list of fields in the group"`
}

Expand All @@ -54,16 +54,16 @@ type FieldGroup struct {
// When they are not set, a different hash will be computed for A->B and B->A,
// and they are tracked as different connections.
type ConnTrackHash struct {
FieldGroupRefs []string `yaml:"fieldGroupRefs" doc:"list of field group names to build the hash"`
FieldGroupARef string `yaml:"fieldGroupARef" doc:"field group name of endpoint A"`
FieldGroupBRef string `yaml:"fieldGroupBRef" doc:"field group name of endpoint B"`
FieldGroupRefs []string `yaml:"fieldGroupRefs,omitempty" doc:"list of field group names to build the hash"`
FieldGroupARef string `yaml:"fieldGroupARef,omitempty" doc:"field group name of endpoint A"`
FieldGroupBRef string `yaml:"fieldGroupBRef,omitempty" doc:"field group name of endpoint B"`
}

type OutputField struct {
Name string `yaml:"name" doc:"output field name"`
Operation string `yaml:"operation" enum:"ConnTrackOperationEnum" doc:"aggregate operation on the field value"`
SplitAB bool `yaml:"splitAB" doc:"When true, 2 output fields will be created. One for A->B and one for B->A flows."`
Input string `yaml:"input" doc:"The input field to base the operation on. When omitted, 'name' is used"`
Name string `yaml:"name,omitempty" doc:"output field name"`
Operation string `yaml:"operation,omitempty" enum:"ConnTrackOperationEnum" doc:"aggregate operation on the field value"`
SplitAB bool `yaml:"splitAB,omitempty" doc:"When true, 2 output fields will be created. One for A->B and one for B->A flows."`
Input string `yaml:"input,omitempty" doc:"The input field to base the operation on. When omitted, 'name' is used"`
}

type ConnTrackOperationEnum struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/decode_aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@
package api

type DecodeAws struct {
Fields []string `yaml:"fields" json:"fields" doc:"list of aws flow log fields"`
Fields []string `yaml:"fields,omitempty" json:"fields" doc:"list of aws flow log fields"`
}
10 changes: 5 additions & 5 deletions pkg/api/encode_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ package api
type EncodeKafka struct {
Address string `yaml:"address" json:"address" doc:"address of kafka server"`
Topic string `yaml:"topic" json:"topic" doc:"kafka topic to write to"`
Balancer string `yaml:"balancer" json:"balancer" enum:"KafkaEncodeBalancerEnum" doc:"one of the following:"`
WriteTimeout int64 `yaml:"writeTimeout" json:"writeTimeout" doc:"timeout (in seconds) for write operation performed by the Writer"`
ReadTimeout int64 `yaml:"readTimeout" json:"readTimeout" doc:"timeout (in seconds) for read operation performed by the Writer"`
BatchBytes int64 `yaml:"batchBytes" json:"batchBytes" doc:"limit the maximum size of a request in bytes before being sent to a partition"`
BatchSize int `yaml:"batchSize" json:"batchSize" doc:"limit on how many messages will be buffered before being sent to a partition"`
Balancer string `yaml:"balancer,omitempty" json:"balancer,omitempty" enum:"KafkaEncodeBalancerEnum" doc:"one of the following:"`
WriteTimeout int64 `yaml:"writeTimeout,omitempty" json:"writeTimeout,omitempty" doc:"timeout (in seconds) for write operation performed by the Writer"`
ReadTimeout int64 `yaml:"readTimeout,omitempty" json:"readTimeout,omitempty" doc:"timeout (in seconds) for read operation performed by the Writer"`
BatchBytes int64 `yaml:"batchBytes,omitempty" json:"batchBytes,omitempty" doc:"limit the maximum size of a request in bytes before being sent to a partition"`
BatchSize int `yaml:"batchSize,omitempty" json:"batchSize,omitempty" doc:"limit on how many messages will be buffered before being sent to a partition"`
}

type KafkaEncodeBalancerEnum struct {
Expand Down
8 changes: 4 additions & 4 deletions pkg/api/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
package api

type PromEncode struct {
Metrics PromMetricsItems `yaml:"metrics" json:"metrics" doc:"list of prometheus metric definitions, each includes:"`
Port int `yaml:"port" json:"port" doc:"port number to expose \"/metrics\" endpoint"`
Prefix string `yaml:"prefix" json:"prefix" doc:"prefix added to each metric name"`
ExpiryTime int `yaml:"expiryTime" json:"expiryTime" doc:"seconds of no-flow to wait before deleting prometheus data item"`
Metrics PromMetricsItems `yaml:"metrics" json:"metrics,omitempty" doc:"list of prometheus metric definitions, each includes:"`
Port int `yaml:"port" json:"port,omitempty" doc:"port number to expose \"/metrics\" endpoint"`
Prefix string `yaml:"prefix" json:"prefix,omitempty" doc:"prefix added to each metric name"`
ExpiryTime int `yaml:"expiryTime" json:"expiryTime,omitempty" doc:"seconds of no-flow to wait before deleting prometheus data item"`
}

type PromEncodeOperationEnum struct {
Expand Down
10 changes: 5 additions & 5 deletions pkg/api/extract_aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ type AggregateBy []string
type AggregateOperation string

type AggregateDefinition struct {
Name string `yaml:"name" json:"name" doc:"description of aggregation result"`
By AggregateBy `yaml:"by" json:"by" doc:"list of fields on which to aggregate"`
Operation AggregateOperation `yaml:"operation" json:"operation" doc:"sum, min, max, avg or raw_values"`
RecordKey string `yaml:"recordKey" json:"recordKey" doc:"internal field on which to perform the operation"`
TopK int `yaml:"topK" json:"topK" doc:"number of highest incidence to report (default - report all)"`
Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"description of aggregation result"`
By AggregateBy `yaml:"by,omitempty" json:"by,omitempty" doc:"list of fields on which to aggregate"`
Operation AggregateOperation `yaml:"operation,omitempty" json:"operation,omitempty" doc:"sum, min, max, avg or raw_values"`
RecordKey string `yaml:"recordKey,omitempty" json:"recordKey,omitempty" doc:"internal field on which to perform the operation"`
TopK int `yaml:"topK,omitempty" json:"topK,omitempty" doc:"number of highest incidence to report (default - report all)"`
}
8 changes: 4 additions & 4 deletions pkg/api/ingest_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package api

type IngestCollector struct {
HostName string `yaml:"hostName" json:"hostName" doc:"the hostname to listen on"`
Port int `yaml:"port" json:"port" doc:"the port number to listen on, for IPFIX/NetFlow v9. Omit or set to 0 to disable IPFIX/NetFlow v9 ingestion"`
PortLegacy int `yaml:"portLegacy" json:"portLegacy" doc:"the port number to listen on, for legacy NetFlow v5. Omit or set to 0 to disable NetFlow v5 ingestion"`
BatchMaxLen int `yaml:"batchMaxLen" json:"batchMaxLen" doc:"the number of accumulated flows before being forwarded for processing"`
HostName string `yaml:"hostName,omitempty" json:"hostName,omitempty" doc:"the hostname to listen on"`
Port int `yaml:"port,omitempty" json:"port,omitempty" doc:"the port number to listen on, for IPFIX/NetFlow v9. Omit or set to 0 to disable IPFIX/NetFlow v9 ingestion"`
PortLegacy int `yaml:"portLegacy,omitempty" json:"portLegacy,omitempty" doc:"the port number to listen on, for legacy NetFlow v5. Omit or set to 0 to disable NetFlow v5 ingestion"`
BatchMaxLen int `yaml:"batchMaxLen,omitempty" json:"batchMaxLen,omitempty" doc:"the number of accumulated flows before being forwarded for processing"`
}
4 changes: 2 additions & 2 deletions pkg/api/ingest_grpc.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package api

type IngestGRPCProto struct {
Port int `yaml:"port" json:"port" doc:"the port number to listen on"`
BufferLen int `yaml:"bufferLength" json:"bufferLength" doc:"the length of the ingest channel buffer, in groups of flows, containing each group hundreds of flows (default: 100)"`
Port int `yaml:"port,omitempty" json:"port,omitempty" doc:"the port number to listen on"`
BufferLen int `yaml:"bufferLength,omitempty" json:"bufferLength,omitempty" doc:"the length of the ingest channel buffer, in groups of flows, containing each group hundreds of flows (default: 100)"`
}
12 changes: 6 additions & 6 deletions pkg/api/ingest_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
package api

type IngestKafka struct {
Brokers []string `yaml:"brokers" json:"brokers" doc:"list of kafka broker addresses"`
Topic string `yaml:"topic" json:"topic" doc:"kafka topic to listen on"`
GroupId string `yaml:"groupid" json:"groupid" doc:"separate groupid for each consumer on specified topic"`
GroupBalancers []string `yaml:"groupBalancers" json:"groupBalancers" doc:"list of balancing strategies (range, roundRobin, rackAffinity)"`
StartOffset string `yaml:"startOffset" json:"startOffset" doc:"FirstOffset (least recent - default) or LastOffset (most recent) offset available for a partition"`
BatchReadTimeout int64 `yaml:"batchReadTimeout" json:"batchReadTimeout" doc:"how often (in milliseconds) to process input"`
Brokers []string `yaml:"brokers,omitempty" json:"brokers,omitempty" doc:"list of kafka broker addresses"`
Topic string `yaml:"topic,omitempty" json:"topic,omitempty" doc:"kafka topic to listen on"`
GroupId string `yaml:"groupid,omitempty" json:"groupid,omitempty" doc:"separate groupid for each consumer on specified topic"`
GroupBalancers []string `yaml:"groupBalancers,omitempty" json:"groupBalancers,omitempty" doc:"list of balancing strategies (range, roundRobin, rackAffinity)"`
StartOffset string `yaml:"startOffset,omitempty" json:"startOffset,omitempty" doc:"FirstOffset (least recent - default) or LastOffset (most recent) offset available for a partition"`
BatchReadTimeout int64 `yaml:"batchReadTimeout,omitempty" json:"batchReadTimeout,omitempty" doc:"how often (in milliseconds) to process input"`
}
6 changes: 3 additions & 3 deletions pkg/api/transform_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package api

type TransformFilter struct {
Rules []TransformFilterRule `yaml:"rules" json:"rules" doc:"list of filter rules, each includes:"`
Rules []TransformFilterRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"list of filter rules, each includes:"`
}

type TransformFilterOperationEnum struct {
Expand All @@ -32,6 +32,6 @@ func TransformFilterOperationName(operation string) string {
}

type TransformFilterRule struct {
Input string `yaml:"input" json:"input" doc:"entry input field"`
Type string `yaml:"type" json:"type" enum:"TransformFilterOperationEnum" doc:"one of the following:"`
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"`
Type string `yaml:"type,omitempty" json:"type,omitempty" enum:"TransformFilterOperationEnum" doc:"one of the following:"`
}
8 changes: 4 additions & 4 deletions pkg/api/transform_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package api

type TransformGeneric struct {
Policy string `yaml:"policy" json:"policy" enum:"TransformGenericOperationEnum" doc:"key replacement policy; may be one of the following:"`
Rules []GenericTransformRule `yaml:"rules" json:"rules" doc:"list of transform rules, each includes:"`
Policy string `yaml:"policy,omitempty" json:"policy,omitempty" enum:"TransformGenericOperationEnum" doc:"key replacement policy; may be one of the following:"`
Rules []GenericTransformRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"list of transform rules, each includes:"`
}

type TransformGenericOperationEnum struct {
Expand All @@ -32,8 +32,8 @@ func TransformGenericOperationName(operation string) string {
}

type GenericTransformRule struct {
Input string `yaml:"input" json:"input" doc:"entry input field"`
Output string `yaml:"output" json:"output" doc:"entry output field"`
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"`
Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"`
}

type GenericTransform []GenericTransformRule
6 changes: 3 additions & 3 deletions pkg/api/transform_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ func TransformNetworkOperationName(operation string) string {
}

type NetworkTransformRule struct {
Input string `yaml:"input" json:"input" doc:"entry input field"`
Output string `yaml:"output" json:"output" doc:"entry output field"`
Type string `yaml:"type" json:"type" enum:"TransformNetworkOperationEnum" doc:"one of the following:"`
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"entry input field"`
Output string `yaml:"output,omitempty" json:"output,omitempty" doc:"entry output field"`
Type string `yaml:"type,omitempty" json:"type,omitempty" enum:"TransformNetworkOperationEnum" doc:"one of the following:"`
Parameters string `yaml:"parameters,omitempty" json:"parameters,omitempty" doc:"parameters specific to type"`
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/api/write_stdout.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package api

type WriteStdout struct {
Format string `yaml:"format" json:"format" doc:"the format of each line: printf (default) or json"`
Format string `yaml:"format,omitempty" json:"format,omitempty" doc:"the format of each line: printf (default) or json"`
}
12 changes: 6 additions & 6 deletions pkg/config/pipeline_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestLokiPipeline(t *testing.T) {

b, err = json.Marshal(params[0])
require.NoError(t, err)
require.Equal(t, `{"name":"ingest","ingest":{"type":"collector","collector":{"hostName":"127.0.0.1","port":9999,"portLegacy":0,"batchMaxLen":0}}}`, string(b))
require.Equal(t, `{"name":"ingest","ingest":{"type":"collector","collector":{"hostName":"127.0.0.1","port":9999}}}`, string(b))

b, err = json.Marshal(params[1])
require.NoError(t, err)
Expand Down Expand Up @@ -136,19 +136,19 @@ func TestKafkaPromPipeline(t *testing.T) {

b, err = json.Marshal(params[0])
require.NoError(t, err)
require.Equal(t, `{"name":"ingest","ingest":{"type":"kafka","kafka":{"brokers":["http://kafka"],"topic":"netflows","groupid":"my-group","groupBalancers":null,"startOffset":"","batchReadTimeout":0}}}`, string(b))
require.Equal(t, `{"name":"ingest","ingest":{"type":"kafka","kafka":{"brokers":["http://kafka"],"topic":"netflows","groupid":"my-group"}}}`, string(b))

b, err = json.Marshal(params[1])
require.NoError(t, err)
require.Equal(t, `{"name":"filter","transform":{"type":"filter","filter":{"rules":[{"input":"doesnt_exist","type":"remove_entry_if_doesnt_exist"}]}}}`, string(b))

b, err = json.Marshal(params[2])
require.NoError(t, err)
require.Equal(t, `{"name":"aggregate","extract":{"type":"aggregates","aggregates":[{"name":"src_as_connection_count","by":["srcAS"],"operation":"count","recordKey":"","topK":0}]}}`, string(b))
require.Equal(t, `{"name":"aggregate","extract":{"type":"aggregates","aggregates":[{"name":"src_as_connection_count","by":["srcAS"],"operation":"count"}]}}`, string(b))

b, err = json.Marshal(params[3])
require.NoError(t, err)
require.Equal(t, `{"name":"prom","encode":{"type":"prom","prom":{"metrics":[{"name":"connections_per_source_as","type":"counter","filter":{"key":"name","value":"src_as_connection_count"},"valueKey":"recent_count","labels":["by","aggregate"],"buckets":[]}],"port":9090,"prefix":"flp_","expiryTime":0}}}`, string(b))
require.Equal(t, `{"name":"prom","encode":{"type":"prom","prom":{"metrics":[{"name":"connections_per_source_as","type":"counter","filter":{"key":"name","value":"src_as_connection_count"},"valueKey":"recent_count","labels":["by","aggregate"],"buckets":[]}],"port":9090,"prefix":"flp_"}}}`, string(b))
}

func TestForkPipeline(t *testing.T) {
Expand All @@ -167,13 +167,13 @@ func TestForkPipeline(t *testing.T) {

b, err = json.Marshal(params[0])
require.NoError(t, err)
require.Equal(t, `{"name":"ingest","ingest":{"type":"collector","collector":{"hostName":"127.0.0.1","port":9999,"portLegacy":0,"batchMaxLen":0}}}`, string(b))
require.Equal(t, `{"name":"ingest","ingest":{"type":"collector","collector":{"hostName":"127.0.0.1","port":9999}}}`, string(b))

b, err = json.Marshal(params[1])
require.NoError(t, err)
require.Equal(t, `{"name":"loki","write":{"type":"loki","loki":{"url":"http://loki:3100/","batchWait":"1s","batchSize":102400,"timeout":"10s","minBackoff":"1s","maxBackoff":"5m","maxRetries":10,"timestampLabel":"TimeReceived","timestampScale":"1s"}}}`, string(b))

b, err = json.Marshal(params[2])
require.NoError(t, err)
require.Equal(t, `{"name":"stdout","write":{"type":"stdout","stdout":{"format":""}}}`, string(b))
require.Equal(t, `{"name":"stdout","write":{"type":"stdout","stdout":{}}}`, string(b))
}

0 comments on commit 4549b72

Please sign in to comment.