Skip to content

Commit

Permalink
Refactor FLP builder (#494)
Browse files Browse the repository at this point in the history
Make it easier to build a FLP pipeline outside of the FLP reconciler.
This will be useful for NETOBSERV-627

- Create a new file flp_pipeline_builder.go that contains the
  pipeline building code, extracted from flp_common_objects.go
- Make some of these functions exported, for external usage
- Some API enhancements such as builder.NewGRPCPipeline for consumer
  friendlyness
  • Loading branch information
jotak authored Nov 20, 2023
1 parent 58e2e2a commit 21591e4
Show file tree
Hide file tree
Showing 8 changed files with 586 additions and 550 deletions.
482 changes: 74 additions & 408 deletions controllers/flowlogspipeline/flp_common_objects.go

Large diffs are not rendered by default.

28 changes: 6 additions & 22 deletions controllers/flowlogspipeline/flp_ingest_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
flowslatest "github.com/netobserv/network-observability-operator/api/v1beta2"
"github.com/netobserv/network-observability-operator/controllers/reconcilers"
"github.com/netobserv/network-observability-operator/pkg/helper"
Expand All @@ -18,7 +16,7 @@ type ingestBuilder struct {
}

func newIngestBuilder(info *reconcilers.Instance, desired *flowslatest.FlowCollectorSpec) (ingestBuilder, error) {
gen, err := newBuilder(info, desired, ConfKafkaIngester)
gen, err := NewBuilder(info, desired, ConfKafkaIngester)
return ingestBuilder{
generic: gen,
}, err
Expand All @@ -42,31 +40,17 @@ func (b *ingestBuilder) daemonSet(annotations map[string]string) *appsv1.DaemonS
}

func (b *ingestBuilder) configMap() (*corev1.ConfigMap, string, error) {
stages, params, err := b.buildPipelineConfig()
if err != nil {
return nil, "", err
}
return b.generic.configMap(stages, params)
}

func (b *ingestBuilder) buildPipelineConfig() ([]config.Stage, []config.StageParam, error) {
var pipeline config.PipelineBuilderStage
var pipeline PipelineBuilder
if helper.UseIPFIX(b.generic.desired) {
// IPFIX collector
pipeline = config.NewCollectorPipeline("ipfix", api.IngestCollector{
Port: int(b.generic.desired.Processor.Port),
HostName: "0.0.0.0",
})
pipeline = b.generic.NewIPFIXPipeline()
} else {
// GRPC collector (eBPF agent)
pipeline = config.NewGRPCPipeline("grpc", api.IngestGRPCProto{
Port: int(b.generic.desired.Processor.Port),
})
pipeline = b.generic.NewGRPCPipeline()
}

pipeline = b.generic.createKafkaWriteStage("kafka-write", &b.generic.desired.Kafka, &pipeline)

return pipeline.GetStages(), pipeline.GetStageParams(), nil
pipeline.AddKafkaWriteStage("kafka-write", &b.generic.desired.Kafka)
return b.generic.ConfigMap()
}

func (b *ingestBuilder) promService() *corev1.Service {
Expand Down
30 changes: 7 additions & 23 deletions controllers/flowlogspipeline/flp_monolith_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
flowslatest "github.com/netobserv/network-observability-operator/api/v1beta2"
"github.com/netobserv/network-observability-operator/controllers/reconcilers"
"github.com/netobserv/network-observability-operator/pkg/helper"
Expand All @@ -18,7 +16,7 @@ type monolithBuilder struct {
}

func newMonolithBuilder(info *reconcilers.Instance, desired *flowslatest.FlowCollectorSpec) (monolithBuilder, error) {
gen, err := newBuilder(info, desired, ConfMonolith)
gen, err := NewBuilder(info, desired, ConfMonolith)
return monolithBuilder{
generic: gen,
}, err
Expand All @@ -42,34 +40,20 @@ func (b *monolithBuilder) daemonSet(annotations map[string]string) *appsv1.Daemo
}

func (b *monolithBuilder) configMap() (*corev1.ConfigMap, string, error) {
stages, params, err := b.buildPipelineConfig()
if err != nil {
return nil, "", err
}
pipelineConfigMap, digest, err := b.generic.configMap(stages, params)
return pipelineConfigMap, digest, err
}

func (b *monolithBuilder) buildPipelineConfig() ([]config.Stage, []config.StageParam, error) {
var pipeline config.PipelineBuilderStage
var pipeline PipelineBuilder
if helper.UseIPFIX(b.generic.desired) {
// IPFIX collector
pipeline = config.NewCollectorPipeline("ipfix", api.IngestCollector{
Port: int(b.generic.desired.Processor.Port),
HostName: "0.0.0.0",
})
pipeline = b.generic.NewIPFIXPipeline()
} else {
// GRPC collector (eBPF agent)
pipeline = config.NewGRPCPipeline("grpc", api.IngestGRPCProto{
Port: int(b.generic.desired.Processor.Port),
})
pipeline = b.generic.NewGRPCPipeline()
}

err := b.generic.addTransformStages(&pipeline)
err := pipeline.AddProcessorStages()
if err != nil {
return nil, nil, err
return nil, "", err
}
return pipeline.GetStages(), pipeline.GetStageParams(), nil
return b.generic.ConfigMap()
}

func (b *monolithBuilder) promService() *corev1.Service {
Expand Down
2 changes: 1 addition & 1 deletion controllers/flowlogspipeline/flp_monolith_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (r *flpMonolithReconciler) reconcilePermissions(ctx context.Context, builde
if err := r.ReconcileClusterRole(ctx, cr); err != nil {
return err
}
cr = buildClusterRoleTransformer()
cr = BuildClusterRoleTransformer()
if err := r.ReconcileClusterRole(ctx, cr); err != nil {
return err
}
Expand Down
Loading

0 comments on commit 21591e4

Please sign in to comment.