
Commit

Moved kafka config in flowcollector spec
OlivierCazade committed May 2, 2022
1 parent 2caa1c4 commit 38edcb6
Showing 8 changed files with 88 additions and 90 deletions.
10 changes: 5 additions & 5 deletions api/v1alpha1/flowcollector_types.go
@@ -69,6 +69,10 @@ type FlowCollectorSpec struct {
// Loki contains settings related to the loki client
Loki FlowCollectorLoki `json:"loki,omitempty"`

// Kafka configurations, if empty the operator will deploy a all-in-one FLP
// +optional
Kafka *FlowCollectorKafka `json:"kafka,omitempty"`

// ConsolePlugin contains settings related to the console dynamic plugin
ConsolePlugin FlowCollectorConsolePlugin `json:"consolePlugin,omitempty"`

@@ -165,7 +169,7 @@ type FlowCollectorKafka struct {
Address string `json:"address"`

//+kubebuilder:default:=""
// Address of the kafka topic to use
// Kafka topic to use
Topic string `json:"topic"`
}

@@ -231,10 +235,6 @@ type FlowCollectorFLP struct {
//+kubebuilder:default:=true
// EnableKubeProbes is a flag to enable or disable Kubernetes liveness/readiness probes
EnableKubeProbes bool `json:"enableKubeProbes,omitempty"`

// Kafka configurations, if empty the operator will deploy a all-in-one FLP
// +optional
Kafka *FlowCollectorKafka `json:"kafka,omitempty"`
}

type FlowCollectorHPA struct {
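
For API consumers, the practical effect of the type change above is that the Kafka block is now read from Spec.Kafka instead of Spec.FlowlogsPipeline.Kafka. Below is a minimal sketch of populating the relocated field; the module import path and the connection values are assumptions for illustration, not part of this commit:

package main

import (
	"fmt"

	flowsv1alpha1 "github.com/netobserv/network-observability-operator/api/v1alpha1"
)

func main() {
	spec := flowsv1alpha1.FlowCollectorSpec{
		// Before this commit, the same struct lived under Spec.FlowlogsPipeline.Kafka.
		Kafka: &flowsv1alpha1.FlowCollectorKafka{
			Address: "kafka-bootstrap.netobserv:9092", // placeholder broker address
			Topic:   "network-flows",                  // placeholder topic name
		},
	}
	if spec.Kafka != nil {
		fmt.Println("Kafka export enabled, topic:", spec.Kafka.Topic)
	}
}
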
10 changes: 5 additions & 5 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

32 changes: 16 additions & 16 deletions config/crd/bases/flows.netobserv.io_flowcollectors.yaml
@@ -1338,22 +1338,6 @@ spec:
- Always
- Never
type: string
kafka:
description: Kafka configurations, if empty the operator will
deploy a all-in-one FLP
properties:
address:
default: ""
description: Address of the kafka server
type: string
topic:
default: ""
description: Address of the kafka topic to use
type: string
required:
- address
- topic
type: object
kind:
default: DaemonSet
description: Kind is the workload kind, either DaemonSet or Deployment
@@ -1461,6 +1445,22 @@ spec:
minimum: 0
type: integer
type: object
kafka:
description: Kafka configurations, if empty the operator will deploy
a all-in-one FLP
properties:
address:
default: ""
description: Address of the kafka server
type: string
topic:
default: ""
description: Kafka topic to use
type: string
required:
- address
- topic
type: object
loki:
description: Loki contains settings related to the loki client
properties:
2 changes: 1 addition & 1 deletion controllers/flowcollector_controller.go
@@ -142,7 +142,7 @@ func (r *FlowCollectorReconciler) Reconcile(ctx context.Context, req ctrl.Reques
desired.Spec.ClusterNetworkOperator.Namespace,
ovsFlowsConfigMapName,
r.lookupIP)
if err := ovsConfigController.Reconcile(ctx, desired, gfReconciler.GetServiceName(&desired.Spec.FlowlogsPipeline)); err != nil {
if err := ovsConfigController.Reconcile(ctx, desired, gfReconciler.GetServiceName(desired.Spec.Kafka)); err != nil {
return ctrl.Result{},
fmt.Errorf("failed to reconcile ovs-flows-config ConfigMap: %w", err)
}
4 changes: 2 additions & 2 deletions controllers/flowcollector_controller_test.go
@@ -389,7 +389,7 @@ func flowCollectorControllerSpecs() {
if err := k8sClient.Get(ctx, crKey, &fc); err != nil {
return err
}
fc.Spec.FlowlogsPipeline.Kafka = &flowsv1alpha1.FlowCollectorKafka{Address: "loaclhost:9092", Topic: "FLP"}
fc.Spec.Kafka = &flowsv1alpha1.FlowCollectorKafka{Address: "loaclhost:9092", Topic: "FLP"}
return k8sClient.Update(ctx, &fc)
}).Should(Succeed())
})
@@ -429,7 +429,7 @@ func flowCollectorControllerSpecs() {
if err := k8sClient.Get(ctx, crKey, &fc); err != nil {
return err
}
fc.Spec.FlowlogsPipeline.Kafka = nil
fc.Spec.Kafka = nil
return k8sClient.Update(ctx, &fc)
}).Should(Succeed())
})
14 changes: 7 additions & 7 deletions controllers/flowlogspipeline/flp_reconciler.go
@@ -103,8 +103,8 @@ func validateDesired(desired *flpSpec) error {
return nil
}

func (r *FLPReconciler) GetServiceName(desiredFLP *flpSpec) string {
if single, _ := checkDeployNeeded(desiredFLP, ConfKafkaIngestor); single {
func (r *FLPReconciler) GetServiceName(kafka *flowsv1alpha1.FlowCollectorKafka) string {
if single, _ := checkDeployNeeded(kafka, ConfKafkaIngestor); single {
return constants.FLPName + FlpConfSuffix[ConfKafkaIngestor]
}
return constants.FLPName + FlpConfSuffix[ConfSingle]
@@ -121,15 +121,15 @@ func (r *FLPReconciler) Reconcile(ctx context.Context, desired *flowsv1alpha1.Fl
}

// Check if a configKind should be deployed
func checkDeployNeeded(desiredFLP *flpSpec, confKind string) (bool, error) {
func checkDeployNeeded(kafka *flowsv1alpha1.FlowCollectorKafka, confKind string) (bool, error) {
switch confKind {
case ConfSingle:
return desiredFLP.Kafka == nil, nil
return kafka == nil, nil
case ConfKafkaTransformer:
return desiredFLP.Kafka != nil, nil
return kafka != nil, nil
case ConfKafkaIngestor:
//TODO should be disabled if ebpf-agent is enabled with kafka
return desiredFLP.Kafka != nil, nil
return kafka != nil, nil
default:
return false, fmt.Errorf("unknown flowlogs-pipelines config kind")
}
@@ -143,7 +143,7 @@ func (r *singleFLPReconciler) Reconcile(ctx context.Context, desired *flowsv1alp
if err != nil {
return err
}
shouldDeploy, err := checkDeployNeeded(desiredFLP, r.confKind)
shouldDeploy, err := checkDeployNeeded(desired.Spec.Kafka, r.confKind)
if err != nil {
return err
}
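
With the new signatures above, the deployment topology is decided purely from whether spec.kafka is set: no Kafka yields a single all-in-one FLP, while a configured Kafka yields a Kafka ingestor plus a Kafka transformer (the ingestor may later be skipped when the eBPF agent writes to Kafka directly, per the TODO). A standalone sketch of that decision follows; the workload names are illustrative assumptions, not the operator's actual suffixes:

package main

import "fmt"

// flpDeployments mirrors the checkDeployNeeded logic, with a plain bool standing in
// for *flowsv1alpha1.FlowCollectorKafka: unset -> all-in-one, set -> ingestor + transformer.
func flpDeployments(kafkaConfigured bool) []string {
	if !kafkaConfigured {
		return []string{"flowlogs-pipeline"} // ConfSingle
	}
	return []string{
		"flowlogs-pipeline-ingestor",    // ConfKafkaIngestor
		"flowlogs-pipeline-transformer", // ConfKafkaTransformer
	}
}

func main() {
	fmt.Println(flpDeployments(false)) // [flowlogs-pipeline]
	fmt.Println(flpDeployments(true))  // [flowlogs-pipeline-ingestor flowlogs-pipeline-transformer]
}
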
16 changes: 7 additions & 9 deletions controllers/flowlogspipeline/flp_test.go
@@ -442,28 +442,26 @@ func TestLabels(t *testing.T) {
func TestDeployNeeded(t *testing.T) {
assert := assert.New(t)

flp := getFLPConfig()

// Kafka not configured
res, err := checkDeployNeeded(&flp, ConfSingle)
res, err := checkDeployNeeded(nil, ConfSingle)
assert.True(res)
assert.NoError(err)
res, err = checkDeployNeeded(&flp, ConfKafkaIngestor)
res, err = checkDeployNeeded(nil, ConfKafkaIngestor)
assert.False(res)
assert.NoError(err)
res, err = checkDeployNeeded(&flp, ConfKafkaTransformer)
res, err = checkDeployNeeded(nil, ConfKafkaTransformer)
assert.False(res)
assert.NoError(err)

// Kafka configured
flp.Kafka = &flowsv1alpha1.FlowCollectorKafka{Address: "loaclhost:9092", Topic: "FLP"}
res, err = checkDeployNeeded(&flp, ConfSingle)
kafka := &flowsv1alpha1.FlowCollectorKafka{Address: "loaclhost:9092", Topic: "FLP"}
res, err = checkDeployNeeded(kafka, ConfSingle)
assert.False(res)
assert.NoError(err)
res, err = checkDeployNeeded(&flp, ConfKafkaIngestor)
res, err = checkDeployNeeded(kafka, ConfKafkaIngestor)
assert.True(res)
assert.NoError(err)
res, err = checkDeployNeeded(&flp, ConfKafkaTransformer)
res, err = checkDeployNeeded(kafka, ConfKafkaTransformer)
assert.True(res)
assert.NoError(err)

90 changes: 45 additions & 45 deletions docs/FlowCollector.md
@@ -132,6 +132,13 @@ FlowCollectorSpec defines the desired state of FlowCollector
<i>Default</i>: map[sampling:400]<br/>
</td>
<td>false</td>
</tr><tr>
<td><b><a href="#flowcollectorspeckafka">kafka</a></b></td>
<td>object</td>
<td>
Kafka configurations, if empty the operator will deploy a all-in-one FLP<br/>
</td>
<td>false</td>
</tr><tr>
<td><b><a href="#flowcollectorspecloki">loki</a></b></td>
<td>object</td>
@@ -1471,13 +1478,6 @@ FlowlogsPipeline contains settings related to the flowlogs-pipeline component
<i>Default</i>: IfNotPresent<br/>
</td>
<td>false</td>
</tr><tr>
<td><b><a href="#flowcollectorspecflowlogspipelinekafka">kafka</a></b></td>
<td>object</td>
<td>
Kafka configurations, if empty the operator will deploy a all-in-one FLP<br/>
</td>
<td>false</td>
</tr><tr>
<td><b>kind</b></td>
<td>enum</td>
@@ -2455,44 +2455,6 @@ target specifies the target value for the given metric
</table>


### FlowCollector.spec.flowlogsPipeline.kafka
<sup><sup>[↩ Parent](#flowcollectorspecflowlogspipeline)</sup></sup>



Kafka configurations, if empty the operator will deploy a all-in-one FLP

<table>
<thead>
<tr>
<th>Name</th>
<th>Type</th>
<th>Description</th>
<th>Required</th>
</tr>
</thead>
<tbody><tr>
<td><b>address</b></td>
<td>string</td>
<td>
Address of the kafka server<br/>
<br/>
<i>Default</i>: <br/>
</td>
<td>true</td>
</tr><tr>
<td><b>topic</b></td>
<td>string</td>
<td>
Address of the kafka topic to use<br/>
<br/>
<i>Default</i>: <br/>
</td>
<td>true</td>
</tr></tbody>
</table>


### FlowCollector.spec.flowlogsPipeline.resources
<sup><sup>[↩ Parent](#flowcollectorspecflowlogspipeline)</sup></sup>

@@ -2578,6 +2540,44 @@ IPFIX contains the settings of an IPFIX-based flow reporter when the "agent" pro
</table>


### FlowCollector.spec.kafka
<sup><sup>[↩ Parent](#flowcollectorspec)</sup></sup>



Kafka configurations, if empty the operator will deploy a all-in-one FLP

<table>
<thead>
<tr>
<th>Name</th>
<th>Type</th>
<th>Description</th>
<th>Required</th>
</tr>
</thead>
<tbody><tr>
<td><b>address</b></td>
<td>string</td>
<td>
Address of the kafka server<br/>
<br/>
<i>Default</i>: <br/>
</td>
<td>true</td>
</tr><tr>
<td><b>topic</b></td>
<td>string</td>
<td>
Kafka topic to use<br/>
<br/>
<i>Default</i>: <br/>
</td>
<td>true</td>
</tr></tbody>
</table>


### FlowCollector.spec.loki
<sup><sup>[↩ Parent](#flowcollectorspec)</sup></sup>

Expand Down
