Removed goflow references #103

Merged: 1 commit, May 18, 2022
10 changes: 5 additions & 5 deletions README.md
@@ -117,17 +117,17 @@ If you use OpenShift 4.10, you don't have anything to do: the operator will conf
### With upstream ovn-kubernetes (e.g. using KIND)

```bash
GF_IP=`kubectl get svc flowlogs-pipeline -n network-observability -ojsonpath='{.spec.clusterIP}'` && echo $GF_IP
kubectl set env daemonset/ovnkube-node -c ovnkube-node -n ovn-kubernetes OVN_IPFIX_TARGETS="$GF_IP:2055"
FLP_IP=`kubectl get svc flowlogs-pipeline -n network-observability -ojsonpath='{.spec.clusterIP}'` && echo $FLP_IP
kubectl set env daemonset/ovnkube-node -c ovnkube-node -n ovn-kubernetes OVN_IPFIX_TARGETS="$FLP_IP:2055"
```
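If useful, here is a quick sanity check, as a sketch assuming the same `ovn-kubernetes` namespace and daemonset names as above, to confirm the target was applied:

```bash
# List the container's environment and look for the IPFIX target that was just set
kubectl set env daemonset/ovnkube-node -c ovnkube-node -n ovn-kubernetes --list | grep OVN_IPFIX_TARGETS
```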

### On older OpenShift with OVN-Kubernetes CNI

In OpenShift, unlike with upstream `ovn-kubernetes`, the flow export configuration is managed by the `ClusterNetworkOperator`.

```bash
GF_IP=`oc get svc flowlogs-pipeline -n network-observability -ojsonpath='{.spec.clusterIP}'` && echo $GF_IP
oc patch networks.operator.openshift.io cluster --type='json' -p "[{'op': 'add', 'path': '/spec', 'value': {'exportNetworkFlows': {'ipfix': { 'collectors': ['$GF_IP:2055']}}}}]"
FLP_IP=`oc get svc flowlogs-pipeline -n network-observability -ojsonpath='{.spec.clusterIP}'` && echo $FLP_IP
oc patch networks.operator.openshift.io cluster --type='json' -p "[{'op': 'add', 'path': '/spec', 'value': {'exportNetworkFlows': {'ipfix': { 'collectors': ['$FLP_IP:2055']}}}}]"
```
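As an optional check, assuming the patch above succeeded, the exported collectors can be read back from the cluster network spec:

```bash
# Should print the ipfix collectors block containing $FLP_IP:2055
oc get networks.operator.openshift.io cluster -ojsonpath='{.spec.exportNetworkFlows}'
```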

## Installing Loki
@@ -158,7 +158,7 @@ oc patch console.operator.openshift.io cluster --type='json' -p '[{"op": "add",
The plugin provides new views in the OpenShift Console: a new submenu _Network Traffic_ in _Observe_, and new tabs in several details views (Pods, Deployments, Services...).

![Main view](./docs/assets/network-traffic-main.png)
_Main view_

![Pod traffic](./docs/assets/network-traffic-pod.png)
_Pod traffic_
@@ -121,17 +121,17 @@ spec:
You need to explicitly turn on IPFIX export for the `ClusterNetworkOperator`:

```
GF_IP=`oc get svc flowlogs-pipeline -n network-observability -ojsonpath='{.spec.clusterIP}'` && echo $GF_IP
oc patch networks.operator.openshift.io cluster --type='json' -p "[{'op': 'add', 'path': '/spec', 'value': {'exportNetworkFlows': {'ipfix': { 'collectors': ['$GF_IP:2055']}}}}]"
FLP_IP=`oc get svc flowlogs-pipeline -n network-observability -ojsonpath='{.spec.clusterIP}'` && echo $FLP_IP
oc patch networks.operator.openshift.io cluster --type='json' -p "[{'op': 'add', 'path': '/spec', 'value': {'exportNetworkFlows': {'ipfix': { 'collectors': ['$FLP_IP:2055']}}}}]"
```
To turn it off, remove the `exportNetworkFlows` from `networks.operator.openshift.io/cluster`.
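For example, a minimal sketch of the reverse operation, assuming `exportNetworkFlows` sits directly under `/spec` as in the patch above:

```bash
# Remove the IPFIX export configuration added earlier
oc patch networks.operator.openshift.io cluster --type='json' -p '[{"op": "remove", "path": "/spec/exportNetworkFlows"}]'
```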

#### Generic Kubernetes with ovn-kubernetes
You need to explicitly turn on IPFIX export in ovn-kubernetes:

```
GF_IP=`kubectl get svc flowlogs-pipeline -n network-observability -ojsonpath='{.spec.clusterIP}'` && echo $GF_IP
kubectl set env daemonset/ovnkube-node -c ovnkube-node -n ovn-kubernetes OVN_IPFIX_TARGETS="$GF_IP:2055"
FLP_IP=`kubectl get svc flowlogs-pipeline -n network-observability -ojsonpath='{.spec.clusterIP}'` && echo $FLP_IP
kubectl set env daemonset/ovnkube-node -c ovnkube-node -n ovn-kubernetes OVN_IPFIX_TARGETS="$FLP_IP:2055"
```

To turn it off, remove the `OVN_IPFIX_TARGETS` env from `daemonset/ovnkube-node`.
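For example, as a sketch assuming the daemonset and container names used above, a trailing `-` unsets the variable:

```bash
# Unset OVN_IPFIX_TARGETS to stop IPFIX export
kubectl set env daemonset/ovnkube-node -c ovnkube-node -n ovn-kubernetes OVN_IPFIX_TARGETS-
```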
@@ -52,17 +52,17 @@ spec:
You need to explicitly turn on IPFIX export for the `ClusterNetworkOperator`:

```
GF_IP=`oc get svc flowlogs-pipeline -n network-observability -ojsonpath='{.spec.clusterIP}'` && echo $GF_IP
oc patch networks.operator.openshift.io cluster --type='json' -p "[{'op': 'add', 'path': '/spec', 'value': {'exportNetworkFlows': {'ipfix': { 'collectors': ['$GF_IP:2055']}}}}]"
FLP_IP=`oc get svc flowlogs-pipeline -n network-observability -ojsonpath='{.spec.clusterIP}'` && echo $FLP_IP
oc patch networks.operator.openshift.io cluster --type='json' -p "[{'op': 'add', 'path': '/spec', 'value': {'exportNetworkFlows': {'ipfix': { 'collectors': ['$FLP_IP:2055']}}}}]"
```
To turn it off, remove the `exportNetworkFlows` from `networks.operator.openshift.io/cluster`.

#### Generic Kubernetes with ovn-kubernetes
You need to explicitly turn on IPFIX export in ovn-kubernetes:

```
GF_IP=`kubectl get svc flowlogs-pipeline -n network-observability -ojsonpath='{.spec.clusterIP}'` && echo $GF_IP
kubectl set env daemonset/ovnkube-node -c ovnkube-node -n ovn-kubernetes OVN_IPFIX_TARGETS="$GF_IP:2055"
FLP_IP=`kubectl get svc flowlogs-pipeline -n network-observability -ojsonpath='{.spec.clusterIP}'` && echo $FLP_IP
kubectl set env daemonset/ovnkube-node -c ovnkube-node -n ovn-kubernetes OVN_IPFIX_TARGETS="$FLP_IP:2055"
```

To turn it off, remove the `OVN_IPFIX_TARGETS` env from `daemonset/ovnkube-node`.
14 changes: 7 additions & 7 deletions controllers/flowcollector_controller.go
@@ -116,22 +116,22 @@ func (r *FlowCollectorReconciler) Reconcile(ctx context.Context, req ctrl.Reques
previousNamespace := desired.Status.Namespace

// Create reconcilers
gfReconciler := flowlogspipeline.NewReconciler(clientHelper, ns, previousNamespace)
flpReconciler := flowlogspipeline.NewReconciler(clientHelper, ns, previousNamespace)
var cpReconciler consoleplugin.CPReconciler
if r.consoleAvailable {
cpReconciler = consoleplugin.NewReconciler(clientHelper, ns, previousNamespace)
}

// Check namespace changed
if ns != previousNamespace {
if err := r.handleNamespaceChanged(ctx, previousNamespace, ns, desired, &gfReconciler, &cpReconciler); err != nil {
if err := r.handleNamespaceChanged(ctx, previousNamespace, ns, desired, &flpReconciler, &cpReconciler); err != nil {
log.Error(err, "Failed to handle namespace change")
return ctrl.Result{}, err
}
}

// Flowlogs-pipeline
if err := gfReconciler.Reconcile(ctx, desired); err != nil {
if err := flpReconciler.Reconcile(ctx, desired); err != nil {
log.Error(err, "Failed to reconcile flowlogs-pipeline")
return ctrl.Result{}, err
}
@@ -142,7 +142,7 @@ func (r *FlowCollectorReconciler) Reconcile(ctx context.Context, req ctrl.Reques
desired.Spec.ClusterNetworkOperator.Namespace,
ovsFlowsConfigMapName,
r.lookupIP)
if err := ovsConfigController.Reconcile(ctx, desired, gfReconciler.GetServiceName(desired.Spec.Kafka)); err != nil {
if err := ovsConfigController.Reconcile(ctx, desired, flpReconciler.GetServiceName(desired.Spec.Kafka)); err != nil {
return ctrl.Result{},
fmt.Errorf("failed to reconcile ovs-flows-config ConfigMap: %w", err)
}
@@ -170,14 +170,14 @@ func (r *FlowCollectorReconciler) handleNamespaceChanged(
ctx context.Context,
oldNS, newNS string,
desired *flowsv1alpha1.FlowCollector,
gfReconciler *flowlogspipeline.FLPReconciler,
flpReconciler *flowlogspipeline.FLPReconciler,
cpReconciler *consoleplugin.CPReconciler,
) error {
log := log.FromContext(ctx)
if oldNS == "" {
// First install: create one-shot resources
log.Info("FlowCollector first install: creating initial resources")
err := gfReconciler.InitStaticResources(ctx)
err := flpReconciler.InitStaticResources(ctx)
if err != nil {
return err
}
@@ -190,7 +190,7 @@ func (r *FlowCollectorReconciler) handleNamespaceChanged(
} else {
// Namespace updated, clean up previous namespace
log.Info("FlowCollector namespace change detected: cleaning up previous namespace and preparing next one", "old namespace", oldNS, "new namepace", newNS)
err := gfReconciler.PrepareNamespaceChange(ctx)
err := flpReconciler.PrepareNamespaceChange(ctx)
if err != nil {
return err
}
60 changes: 30 additions & 30 deletions controllers/flowcollector_controller_test.go
@@ -39,19 +39,19 @@ func flowCollectorControllerSpecs() {
Name: "ovs-flows-config",
Namespace: "openshift-network-operator",
}
gfKey1 := types.NamespacedName{
flpKey1 := types.NamespacedName{
Name: constants.FLPName,
Namespace: operatorNamespace,
}
gfKey2 := types.NamespacedName{
flpKey2 := types.NamespacedName{
Name: constants.FLPName,
Namespace: otherNamespace,
}
gfKeyKafkaIngester := types.NamespacedName{
flpKeyKafkaIngester := types.NamespacedName{
Name: constants.FLPName + flowlogspipeline.FlpConfSuffix[flowlogspipeline.ConfKafkaIngester],
Namespace: operatorNamespace,
}
gfKeyKafkaTransformer := types.NamespacedName{
flpKeyKafkaTransformer := types.NamespacedName{
Name: constants.FLPName + flowlogspipeline.FlpConfSuffix[flowlogspipeline.ConfKafkaTransformer],
Namespace: operatorNamespace,
}
@@ -145,7 +145,7 @@ func flowCollectorControllerSpecs() {
By("Expecting to create the flowlogs-pipeline Deployment")
Eventually(func() interface{} {
dp := appsv1.Deployment{}
if err := k8sClient.Get(ctx, gfKey1, &dp); err != nil {
if err := k8sClient.Get(ctx, flpKey1, &dp); err != nil {
return err
}
oldDigest = dp.Spec.Template.Annotations[flowlogspipeline.PodConfigurationDigest]
@@ -159,7 +159,7 @@ func flowCollectorControllerSpecs() {
svc := v1.Service{}
By("Expecting to create the flowlogs-pipeline Service")
Eventually(func() interface{} {
if err := k8sClient.Get(ctx, gfKey1, &svc); err != nil {
if err := k8sClient.Get(ctx, flpKey1, &svc); err != nil {
return err
}
return svc
@@ -174,7 +174,7 @@ func flowCollectorControllerSpecs() {
By("Expecting to create the flowlogs-pipeline ServiceAccount")
Eventually(func() interface{} {
svcAcc := v1.ServiceAccount{}
if err := k8sClient.Get(ctx, gfKey1, &svcAcc); err != nil {
if err := k8sClient.Get(ctx, flpKey1, &svcAcc); err != nil {
return err
}
return svcAcc
@@ -212,7 +212,7 @@ func flowCollectorControllerSpecs() {
By("Expecting updated flowlogs-pipeline Service port")
Eventually(func() interface{} {
svc := v1.Service{}
if err := k8sClient.Get(ctx, gfKey1, &svc); err != nil {
if err := k8sClient.Get(ctx, flpKey1, &svc); err != nil {
return err
}
return svc.Spec.Ports[0].Port
@@ -246,7 +246,7 @@ func flowCollectorControllerSpecs() {
By("Expecting that the flowlogsPipeline.PodConfigurationDigest attribute has changed")
Eventually(func() error {
dp := appsv1.Deployment{}
if err := k8sClient.Get(ctx, gfKey1, &dp); err != nil {
if err := k8sClient.Get(ctx, flpKey1, &dp); err != nil {
return err
}
currentConfigDigest := dp.Spec.Template.Annotations[flowlogspipeline.PodConfigurationDigest]
@@ -260,7 +260,7 @@ func flowCollectorControllerSpecs() {

It("Should autoscale when the HPA options change", func() {
hpa := ascv2.HorizontalPodAutoscaler{}
Expect(k8sClient.Get(ctx, gfKey1, &hpa)).To(Succeed())
Expect(k8sClient.Get(ctx, flpKey1, &hpa)).To(Succeed())
Expect(*hpa.Spec.MinReplicas).To(Equal(int32(1)))
Expect(hpa.Spec.MaxReplicas).To(Equal(int32(1)))
Expect(*hpa.Spec.Metrics[0].Resource.Target.AverageUtilization).To(Equal(int32(90)))
@@ -273,7 +273,7 @@ func flowCollectorControllerSpecs() {

By("Changing the Horizontal Pod Autoscaler instance")
Eventually(func() error {
if err := k8sClient.Get(ctx, gfKey1, &hpa); err != nil {
if err := k8sClient.Get(ctx, flpKey1, &hpa); err != nil {
return err
}
if *hpa.Spec.MinReplicas != int32(2) || hpa.Spec.MaxReplicas != int32(2) ||
@@ -321,7 +321,7 @@ func flowCollectorControllerSpecs() {
}))

ds := appsv1.DaemonSet{}
Expect(k8sClient.Get(ctx, gfKey1, &ds)).To(Succeed())
Expect(k8sClient.Get(ctx, flpKey1, &ds)).To(Succeed())

oldConfigDigest = ds.Spec.Template.Annotations[flowlogspipeline.PodConfigurationDigest]
Expect(oldConfigDigest).ToNot(BeEmpty())
@@ -370,7 +370,7 @@ func flowCollectorControllerSpecs() {
By("Expecting that the flowlogsPipeline.PodConfigurationDigest attribute has changed")
Eventually(func() error {
dp := appsv1.DaemonSet{}
if err := k8sClient.Get(ctx, gfKey1, &dp); err != nil {
if err := k8sClient.Get(ctx, flpKey1, &dp); err != nil {
return err
}
currentConfigDigest := dp.Spec.Template.Annotations[flowlogspipeline.PodConfigurationDigest]
@@ -398,24 +398,24 @@ func flowCollectorControllerSpecs() {
It("Should deploy kafka ingester and transformer", func() {
By("Expecting ingester daemonset to be created")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKeyKafkaIngester, &appsv1.DaemonSet{})
return k8sClient.Get(ctx, flpKeyKafkaIngester, &appsv1.DaemonSet{})
}, timeout, interval).Should(Succeed())

By("Expecting transformer deployment to be created")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKeyKafkaTransformer, &appsv1.Deployment{})
return k8sClient.Get(ctx, flpKeyKafkaTransformer, &appsv1.Deployment{})
}, timeout, interval).Should(Succeed())

By("Not Expecting transformer service to be created")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKeyKafkaTransformer, &v1.Service{})
return k8sClient.Get(ctx, flpKeyKafkaTransformer, &v1.Service{})
}, timeout, interval).Should(MatchError(`services "flowlogs-pipeline-transformer" not found`))
})

It("Should delete previous flp deployment", func() {
By("Expecting deployment to be deleted")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKey1, &appsv1.DaemonSet{})
return k8sClient.Get(ctx, flpKey1, &appsv1.DaemonSet{})
}, timeout, interval).Should(MatchError(`daemonsets.apps "flowlogs-pipeline" not found`))
})

@@ -433,19 +433,19 @@ func flowCollectorControllerSpecs() {
It("Should deploy single flp again", func() {
By("Expecting daemonset to be created")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKey1, &appsv1.DaemonSet{})
return k8sClient.Get(ctx, flpKey1, &appsv1.DaemonSet{})
}, timeout, interval).Should(Succeed())
})

It("Should delete kafka ingester and transformer", func() {
By("Expecting ingester daemonset to be deleted")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKeyKafkaIngester, &appsv1.DaemonSet{})
return k8sClient.Get(ctx, flpKeyKafkaIngester, &appsv1.DaemonSet{})
}, timeout, interval).Should(MatchError(`daemonsets.apps "flowlogs-pipeline-ingester" not found`))

By("Expecting transformer deployment to be deleted")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKeyKafkaTransformer, &appsv1.Deployment{})
return k8sClient.Get(ctx, flpKeyKafkaTransformer, &appsv1.Deployment{})
}, timeout, interval).Should(MatchError(`deployments.apps "flowlogs-pipeline-transformer" not found`))
})

@@ -471,37 +471,37 @@ func flowCollectorControllerSpecs() {
It("Should redeploy goglow-kube in new namespace", func() {
By("Expecting daemonset in previous namespace to be deleted")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKey1, &appsv1.DaemonSet{})
return k8sClient.Get(ctx, flpKey1, &appsv1.DaemonSet{})
}, timeout, interval).Should(MatchError(`daemonsets.apps "flowlogs-pipeline" not found`))

By("Expecting deployment in previous namespace to be deleted")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKey1, &appsv1.Deployment{})
return k8sClient.Get(ctx, flpKey1, &appsv1.Deployment{})
}, timeout, interval).Should(MatchError(`deployments.apps "flowlogs-pipeline" not found`))

By("Expecting service in previous namespace to be deleted")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKey1, &v1.Service{})
return k8sClient.Get(ctx, flpKey1, &v1.Service{})
}, timeout, interval).Should(MatchError(`services "flowlogs-pipeline" not found`))

By("Expecting service account in previous namespace to be deleted")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKey1, &v1.ServiceAccount{})
return k8sClient.Get(ctx, flpKey1, &v1.ServiceAccount{})
}, timeout, interval).Should(MatchError(`serviceaccounts "flowlogs-pipeline" not found`))

By("Expecting deployment to be created in new namespace")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKey2, &appsv1.Deployment{})
return k8sClient.Get(ctx, flpKey2, &appsv1.Deployment{})
}, timeout, interval).Should(Succeed())

By("Expecting service to be created in new namespace")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKey2, &v1.Service{})
return k8sClient.Get(ctx, flpKey2, &v1.Service{})
}, timeout, interval).Should(Succeed())

By("Expecting service account to be created in new namespace")
Eventually(func() interface{} {
return k8sClient.Get(ctx, gfKey2, &v1.ServiceAccount{})
return k8sClient.Get(ctx, flpKey2, &v1.ServiceAccount{})
}, timeout, interval).Should(Succeed())
})

@@ -572,21 +572,21 @@ func flowCollectorControllerSpecs() {
By("Expecting flowlogs-pipeline deployment to be garbage collected")
Eventually(func() interface{} {
d := appsv1.Deployment{}
_ = k8sClient.Get(ctx, gfKey2, &d)
_ = k8sClient.Get(ctx, flpKey2, &d)
return &d
}, timeout, interval).Should(BeGarbageCollectedBy(&flowCR))

By("Expecting flowlogs-pipeline service to be garbage collected")
Eventually(func() interface{} {
svc := v1.Service{}
_ = k8sClient.Get(ctx, gfKey2, &svc)
_ = k8sClient.Get(ctx, flpKey2, &svc)
return &svc
}, timeout, interval).Should(BeGarbageCollectedBy(&flowCR))

By("Expecting flowlogs-pipeline service account to be garbage collected")
Eventually(func() interface{} {
svcAcc := v1.ServiceAccount{}
_ = k8sClient.Get(ctx, gfKey2, &svcAcc)
_ = k8sClient.Get(ctx, flpKey2, &svcAcc)
return &svcAcc
}, timeout, interval).Should(BeGarbageCollectedBy(&flowCR))

4 changes: 2 additions & 2 deletions controllers/flowlogspipeline/flp_test.go
@@ -413,8 +413,8 @@ func TestAutoScalerUpdateCheck(t *testing.T) {
func TestLabels(t *testing.T) {
assert := assert.New(t)

gfk := getFLPConfig()
builder := newBuilder("ns", corev1.ProtocolUDP, &gfk, nil, ConfSingle)
flpk := getFLPConfig()
builder := newBuilder("ns", corev1.ProtocolUDP, &flpk, nil, ConfSingle)

// Deployment
depl := builder.deployment("digest")