Skip to content

Commit

Permalink
add cluster ID to flow logs (#386)
Browse files Browse the repository at this point in the history
  • Loading branch information
KalmanMeth authored Aug 4, 2023
1 parent 5cb41cb commit 9d208dd
Show file tree
Hide file tree
Showing 12 changed files with 206 additions and 124 deletions.
1 change: 1 addition & 0 deletions api/v1alpha1/zz_generated.conversion.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions api/v1beta1/flowcollector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,11 @@ type FlowCollectorFLP struct {
// `conversationTerminatingTimeout` is the time to wait from detected FIN flag to end a conversation. Only relevant for TCP flows.
ConversationTerminatingTimeout *metav1.Duration `json:"conversationTerminatingTimeout,omitempty"`

//+kubebuilder:default:=""
// +optional
// `clusterName` is the name of the cluster to appear in the flows data. This is useful in a multi-cluster context. When using OpenShift, leave empty to make it automatically determined.
ClusterName string `json:"clusterName,omitempty"`

// `debug` allows setting some aspects of the internal configuration of the flow processor.
// This section is aimed exclusively for debugging and fine-grained performance optimizations,
// such as GOGC and GOMAXPROCS env vars. Users setting its values do it at their own risk.
Expand Down
6 changes: 6 additions & 0 deletions bundle/manifests/flows.netobserv.io_flowcollectors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3793,6 +3793,12 @@ spec:
and forwards them to the Loki persistence layer and/or any available
exporter.'
properties:
clusterName:
default: ""
description: '`clusterName` is the name of the cluster to appear
in the flows data. This is useful in a multi-cluster context.
When using OpenShift, leave empty to make it automatically determined.'
type: string
conversationEndTimeout:
default: 10s
description: '`conversationEndTimeout` is the time to wait after
Expand Down
21 changes: 10 additions & 11 deletions bundle/manifests/netobserv-operator.clusterserviceversion.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,14 @@ spec:
- patch
- update
- watch
- apiGroups:
- config.openshift.io
resources:
- clusterversions
verbs:
- get
- list
- watch
- apiGroups:
- console.openshift.io
resources:
Expand Down Expand Up @@ -614,24 +622,15 @@ spec:
- rbac.authorization.k8s.io
resources:
- clusterrolebindings
- rolebindings
verbs:
- create
- delete
- get
- list
- update
- watch
- apiGroups:
- rbac.authorization.k8s.io
resources:
- clusterroles
- rolebindings
- roles
verbs:
- create
- delete
- get
- list
- update
- watch
- apiGroups:
- security.openshift.io
Expand Down
6 changes: 6 additions & 0 deletions config/crd/bases/flows.netobserv.io_flowcollectors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3780,6 +3780,12 @@ spec:
and forwards them to the Loki persistence layer and/or any available
exporter.'
properties:
clusterName:
default: ""
description: '`clusterName` is the name of the cluster to appear
in the flows data. This is useful in a multi-cluster context.
When using OpenShift, leave empty to make it automatically determined.'
type: string
conversationEndTimeout:
default: 10s
description: '`conversationEndTimeout` is the time to wait after
Expand Down
21 changes: 10 additions & 11 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ rules:
- patch
- update
- watch
- apiGroups:
- config.openshift.io
resources:
- clusterversions
verbs:
- get
- list
- watch
- apiGroups:
- console.openshift.io
resources:
Expand Down Expand Up @@ -155,24 +163,15 @@ rules:
- rbac.authorization.k8s.io
resources:
- clusterrolebindings
- rolebindings
verbs:
- create
- delete
- get
- list
- update
- watch
- apiGroups:
- rbac.authorization.k8s.io
resources:
- clusterroles
- rolebindings
- roles
verbs:
- create
- delete
- get
- list
- update
- watch
- apiGroups:
- security.openshift.io
Expand Down
2 changes: 2 additions & 0 deletions config/samples/flows_v1beta1_flowcollector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ spec:
conversationTerminatingTimeout: 5s
conversationHeartbeatInterval: 30s
conversationEndTimeout: 10s
# Append a unique cluster name to each record
# clusterName: <CLUSTER NAME>
kafka:
address: "kafka-cluster-kafka-bootstrap.netobserv"
topic: network-flows
Expand Down
18 changes: 16 additions & 2 deletions controllers/flowcollector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"net"

"github.com/netobserv/network-observability-operator/controllers/globals"
configv1 "github.com/openshift/api/config/v1"
osv1alpha1 "github.com/openshift/api/console/v1alpha1"
securityv1 "github.com/openshift/api/security/v1"
appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -64,8 +66,7 @@ func NewFlowCollectorReconciler(client client.Client, scheme *runtime.Scheme, co
//+kubebuilder:rbac:groups=apps,resources=deployments;daemonsets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=namespaces;services;serviceaccounts;configmaps;secrets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=endpoints,verbs=get;list;watch
//+kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=clusterroles;roles,verbs=get;create;delete;watch;list
//+kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=clusterrolebindings;rolebindings,verbs=get;list;create;delete;update;watch
//+kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=clusterrolebindings;clusterroles;rolebindings;roles,verbs=get;list;create;delete;update;watch
//+kubebuilder:rbac:groups=console.openshift.io,resources=consoleplugins,verbs=get;create;delete;update;patch;list;watch
//+kubebuilder:rbac:groups=operator.openshift.io,resources=consoles,verbs=get;update;list;update;watch
//+kubebuilder:rbac:groups=flows.netobserv.io,resources=flowcollectors,verbs=get;list;watch;create;update;patch;delete
Expand All @@ -75,6 +76,7 @@ func NewFlowCollectorReconciler(client client.Client, scheme *runtime.Scheme, co
//+kubebuilder:rbac:groups=security.openshift.io,resources=securitycontextconstraints,verbs=list;create;update;watch
//+kubebuilder:rbac:groups=apiregistration.k8s.io,resources=apiservices,verbs=list;get;watch
//+kubebuilder:rbac:groups=monitoring.coreos.com,resources=servicemonitors;prometheusrules,verbs=get;create;delete;update;patch;list;watch
//+kubebuilder:rbac:groups=config.openshift.io,resources=clusterversions,verbs=get;list;watch
//+kubebuilder:rbac:urls="/metrics",verbs=get

// Reconcile is part of the main kubernetes reconciliation loop which aims to
Expand All @@ -101,6 +103,18 @@ func (r *FlowCollectorReconciler) Reconcile(ctx context.Context, _ ctrl.Request)

var didChange, isInProgress bool
previousNamespace := desired.Status.Namespace

// obtain default cluster ID - api is specific to openshift
if r.permissions.Vendor(ctx) == discover.VendorOpenShift && globals.DefaultClusterID == "" {
cversion := &configv1.ClusterVersion{}
key := client.ObjectKey{Name: "version"}
if err := r.Client.Get(ctx, key, cversion); err != nil {
log.Error(err, "unable to obtain cluster ID")
} else {
globals.DefaultClusterID = cversion.Spec.ClusterID
}
}

reconcilersInfo := r.newCommonInfo(ctx, desired, ns, previousNamespace, func(b bool) { didChange = b }, func(b bool) { isInProgress = b })

err = r.reconcileOperator(ctx, &reconcilersInfo, desired)
Expand Down
1 change: 1 addition & 0 deletions controllers/flowcollector_controller_iso_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func flowCollectorIsoSpecs() {
ConversationHeartbeatInterval: &metav1.Duration{Duration: time.Second},
ConversationEndTimeout: &metav1.Duration{Duration: time.Second},
ConversationTerminatingTimeout: &metav1.Duration{Duration: time.Second},
ClusterName: "testCluster",
Debug: flowslatest.DebugConfig{},
LogTypes: &outputRecordTypes,
Metrics: flowslatest.FLPMetrics{
Expand Down
Loading

0 comments on commit 9d208dd

Please sign in to comment.