refactor to reduce cyclomatic complexity and rebase
KalmanMeth committed Jul 24, 2023
1 parent 008b066 commit 1179cc9
Showing 1 changed file with 131 additions and 121 deletions.
252 changes: 131 additions & 121 deletions controllers/flowlogspipeline/flp_common_objects.go
@@ -311,39 +311,114 @@ func (b *builder) addTransformStages(stage *config.PipelineBuilderStage) (*corev1.ConfigMap, error) {
	lastStage := *stage
	indexFields := constants.LokiIndexFields

	lastStage = b.addTransformFilter(lastStage)

	indexFields, lastStage = b.addConnectionTracking(indexFields, lastStage)

	// enrich stage (transform) configuration
	enrichedStage := lastStage.TransformNetwork("enrich", api.TransformNetwork{
		Rules: api.NetworkTransformRules{{
			Input:  "SrcAddr",
			Output: "SrcK8S",
			Type:   api.AddKubernetesRuleType,
		}, {
			Input:  "DstAddr",
			Output: "DstK8S",
			Type:   api.AddKubernetesRuleType,
		}, {
			Type: api.ReinterpretDirectionRuleType,
		}},
		DirectionInfo: api.NetworkTransformDirectionInfo{
			ReporterIPField:    "AgentIP",
			SrcHostField:       "SrcK8S_HostIP",
			DstHostField:       "DstK8S_HostIP",
			FlowDirectionField: "FlowDirection",
			IfDirectionField:   "IfDirection",
		},
	})

	// loki stage (write) configuration
	if helper.UseLoki(b.desired) {
		lokiWrite := api.WriteLoki{
			Labels:         indexFields,
			BatchSize:      int(b.desired.Loki.BatchSize),
			BatchWait:      helper.UnstructuredDuration(b.desired.Loki.BatchWait),
			MaxBackoff:     helper.UnstructuredDuration(b.desired.Loki.MaxBackoff),
			MaxRetries:     int(helper.PtrInt32(b.desired.Loki.MaxRetries)),
			MinBackoff:     helper.UnstructuredDuration(b.desired.Loki.MinBackoff),
			StaticLabels:   model.LabelSet{},
			Timeout:        helper.UnstructuredDuration(b.desired.Loki.Timeout),
			URL:            b.desired.Loki.URL,
			TimestampLabel: "TimeFlowEndMs",
			TimestampScale: "1ms",
			TenantID:       b.desired.Loki.TenantID,
		}

		for k, v := range b.desired.Loki.StaticLabels {
			lokiWrite.StaticLabels[model.LabelName(k)] = model.LabelValue(v)
		}

		var authorization *promConfig.Authorization
		if helper.LokiUseHostToken(&b.desired.Loki) || helper.LokiForwardUserToken(&b.desired.Loki) {
			b.volumes.AddToken(constants.FLPName)
			authorization = &promConfig.Authorization{
				Type:            "Bearer",
				CredentialsFile: constants.TokensPath + constants.FLPName,
			}
		}

		if b.desired.Loki.TLS.Enable {
			if b.desired.Loki.TLS.InsecureSkipVerify {
				lokiWrite.ClientConfig = &promConfig.HTTPClientConfig{
					Authorization: authorization,
					TLSConfig: promConfig.TLSConfig{
						InsecureSkipVerify: true,
					},
				}
			} else {
				caPath := b.volumes.AddCACertificate(&b.desired.Loki.TLS, "loki-certs")
				lokiWrite.ClientConfig = &promConfig.HTTPClientConfig{
					Authorization: authorization,
					TLSConfig: promConfig.TLSConfig{
						CAFile: caPath,
					},
				}
			}
		} else {
			lokiWrite.ClientConfig = &promConfig.HTTPClientConfig{
				Authorization: authorization,
			}
		}
		enrichedStage.WriteLoki("loki", lokiWrite)
	}

	// write on Stdout if logging trace enabled
	if b.desired.Processor.LogLevel == "trace" {
		enrichedStage.WriteStdout("stdout", api.WriteStdout{Format: "json"})
	}

	// obtain encode_prometheus stage from metrics_definitions
	promMetrics, dashboard, err := b.obtainMetricsConfiguration()
	if err != nil {
		return nil, err
	}

	var dashboardConfigMap *corev1.ConfigMap
	if len(promMetrics) > 0 {
		// prometheus stage (encode) configuration
		promEncode := api.PromEncode{
			Prefix:  "netobserv_",
			Metrics: promMetrics,
		}
		enrichedStage.EncodePrometheus("prometheus", promEncode)
		dashboardConfigMap = b.makeMetricsDashboardConfigMap(dashboard)
	}

	b.addCustomExportStages(&enrichedStage)
	return dashboardConfigMap, nil
}

func (b *builder) addConnectionTracking(indexFields []string, lastStage config.PipelineBuilderStage) ([]string, config.PipelineBuilderStage) {
	outputFields := []api.OutputField{
		{
			Name: "Bytes",
@@ -484,108 +559,43 @@ func (b *builder) addTransformStages(stage *config.PipelineBuilderStage) (*corev1.ConfigMap, error) {
		})

	}
	return indexFields, lastStage
}

func (b *builder) addTransformFilter(lastStage config.PipelineBuilderStage) config.PipelineBuilderStage {
	var clusterName string
	transformFilterRules := []api.TransformFilterRule{}

	if b.desired.Processor.ClusterName != "" {
		clusterName = b.desired.Processor.ClusterName
	} else {
		//take clustername from openshift
		clusterName = string(golbals.DefaultClusterID)
	}

	if clusterName != "" {
		transformFilterRules = []api.TransformFilterRule{
			{
				Input: "K8S_ClusterName",
				Type:  "add_field_if_doesnt_exist",
				Value: clusterName,
			},
		}
	}

	// Filter-out unused fields?
	if helper.PtrBool(b.desired.Processor.DropUnusedFields) {
		if helper.UseIPFIX(b.desired) {
			rules := filters.GetOVSGoflowUnusedRules()
			transformFilterRules = append(transformFilterRules, rules...)
		}
		// Else: nothing for eBPF at the moment
	}

	if len(transformFilterRules) > 0 {
		lastStage = lastStage.TransformFilter("filter", api.TransformFilter{
			Rules: transformFilterRules,
		})
	}
	return lastStage
}

func (b *builder) makeMetricsDashboardConfigMap(dashboard string) *corev1.ConfigMap {
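The refactoring pattern behind this commit is simple: each block of conditional stage wiring is pulled out of addTransformStages into a helper that takes the current pipeline stage and returns the (possibly extended) stage, so the main function becomes a flat sequence of helper calls instead of nested branching. The sketch below illustrates only that shape; Stage, Builder, addFilter, addEnrich and buildPipeline are hypothetical stand-ins for this illustration, not the operator's actual types or functions.

package main

import "fmt"

// Stage is a hypothetical stand-in for a pipeline builder stage such as
// config.PipelineBuilderStage; it simply records the steps appended to it.
type Stage struct{ steps []string }

func (s Stage) with(step string) Stage {
	s.steps = append(append([]string{}, s.steps...), step)
	return s
}

// Builder is a hypothetical stand-in for the FLP builder, with two of the
// options that drive branching in the real code.
type Builder struct {
	clusterName      string
	dropUnusedFields bool
}

// addFilter plays the role of the extracted addTransformFilter: it owns the
// conditional logic and hands back the resulting stage.
func (b *Builder) addFilter(last Stage) Stage {
	if b.clusterName != "" {
		last = last.with("filter: add K8S_ClusterName")
	}
	if b.dropUnusedFields {
		last = last.with("filter: drop unused fields")
	}
	return last
}

// addEnrich plays the role of the enrich (transform) stage.
func (b *Builder) addEnrich(last Stage) Stage {
	return last.with("enrich: kubernetes")
}

// buildPipeline mirrors the slimmed-down addTransformStages: one call per
// stage and no nested conditionals, so its cyclomatic complexity stays low.
func (b *Builder) buildPipeline(ingest Stage) Stage {
	last := b.addFilter(ingest)
	last = b.addEnrich(last)
	return last.with("write: loki")
}

func main() {
	b := Builder{clusterName: "my-cluster", dropUnusedFields: true}
	fmt.Println(b.buildPipeline(Stage{steps: []string{"ingest"}}).steps)
}

The commit applies the same idea with addTransformFilter and addConnectionTracking, each returning a config.PipelineBuilderStage that the caller threads into the next stage.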