Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding CloudEventSource metrics in Prometheus & OpenTelemetry #5259

Merged
merged 12 commits into from
Dec 22, 2023
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ Here is an overview of all new **experimental** features:

### Improvements

- **General**: Add CloudEventSource metrics in Prometheus & OpenTelemetry ([#3531](https://github.com/kedacore/keda/issues/3531))
- **General**: Add parameter queryParameters to prometheus-scaler ([#4962](https://github.com/kedacore/keda/issues/4962))
- **General**: Support TriggerAuthentication properties from ConfigMap ([#4830](https://github.com/kedacore/keda/issues/4830))
- **General**: Use client-side round-robin load balancing for grpc calls ([#5224](https://github.com/kedacore/keda/issues/5224))
Expand Down
24 changes: 24 additions & 0 deletions pkg/eventemitter/eventemitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (

eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1"
"github.com/kedacore/keda/v2/pkg/eventemitter/eventdata"
"github.com/kedacore/keda/v2/pkg/metricscollector"
kedastatus "github.com/kedacore/keda/v2/pkg/status"
)

Expand Down Expand Up @@ -228,8 +229,10 @@ func (e *EventEmitter) startEventLoop(ctx context.Context, cloudEventSource *eve
e.log.V(1).Info("Consuming events from CloudEventSource.")
e.emitEventByHandler(eventData)
e.checkEventHandlers(ctx, cloudEventSource, cloudEventSourceMutex)
metricscollector.RecordCloudEventQueueStatus(cloudEventSource.Namespace, len(e.cloudEventProcessingChan))
case <-ctx.Done():
e.log.V(1).Info("CloudEventSource loop has stopped.")
metricscollector.RecordCloudEventQueueStatus(cloudEventSource.Namespace, len(e.cloudEventProcessingChan))
return
}
}
Expand Down Expand Up @@ -293,6 +296,7 @@ func (e *EventEmitter) Emit(object runtime.Object, namesapce types.NamespacedNam
}

func (e *EventEmitter) enqueueEventData(eventData eventdata.EventData) {
metricscollector.RecordCloudEventQueueStatus(eventData.Namespace, len(e.cloudEventProcessingChan))
select {
case e.cloudEventProcessingChan <- eventData:
e.log.V(1).Info("Event enqueued successfully.")
Expand Down Expand Up @@ -321,6 +325,8 @@ func (e *EventEmitter) emitEventByHandler(eventData eventdata.EventData) {
eventData.HandlerKey = key
if handler.GetActiveStatus() == metav1.ConditionTrue {
go handler.EmitEvent(eventData, e.emitErrorHandle)

metricscollector.RecordCloudEventEmitted(eventData.Namespace, getSourceNameFromKey(eventData.HandlerKey), getHandlerTypeFromKey(key))
} else {
e.log.V(1).Info("EventHandler's status is not active. Please check if event endpoint works well", "CloudEventSource", eventData.ObjectName)
}
Expand All @@ -335,6 +341,8 @@ func (e *EventEmitter) emitEventByHandler(eventData eventdata.EventData) {
}

func (e *EventEmitter) emitErrorHandle(eventData eventdata.EventData, err error) {
metricscollector.RecordCloudEventEmittedError(eventData.Namespace, getSourceNameFromKey(eventData.HandlerKey), getHandlerTypeFromKey(eventData.HandlerKey))

if eventData.RetryTimes >= maxRetryTimes {
e.log.V(1).Info("Failed to emit Event multiple times. Will set handler failure status.", "handler", eventData.HandlerKey, "retry times", eventData.RetryTimes)
handler, found := e.eventHandlersCache[eventData.HandlerKey]
Expand Down Expand Up @@ -389,3 +397,19 @@ func (e *EventEmitter) updateCloudEventSourceStatus(ctx context.Context, cloudEv
// newEventHandlerKey builds the cache key for an event handler by joining
// kindNamespaceName and handlerType with a single dot.
func newEventHandlerKey(kindNamespaceName string, handlerType string) string { //nolint:unparam
	const keyLayout = "%s.%s"
	return fmt.Sprintf(keyLayout, kindNamespaceName, handlerType)
}

// getHandlerTypeFromKey extracts the handler type from a handler key.
//
// The key is assumed to have the layout
// "<kind>.<namespace>.<name>.<handlerType>" (TODO confirm against the code
// that builds the kindNamespaceName part). The handler type is taken as the
// final dot-separated segment rather than the segment at a fixed index, so a
// source name that itself contains dots does not shift the result.
// It returns "" when the key has fewer than four segments.
func getHandlerTypeFromKey(handlerKey string) string {
	// Three dots are the minimum for kind, namespace, name and handler type.
	if strings.Count(handlerKey, ".") < 3 {
		return ""
	}
	return handlerKey[strings.LastIndex(handlerKey, ".")+1:]
}

// getSourceNameFromKey extracts the CloudEventSource name from a handler key.
//
// The key is assumed to have the layout
// "<kind>.<namespace>.<name>.<handlerType>" (TODO confirm against the code
// that builds the kindNamespaceName part). Everything between the second
// segment and the final segment is returned, so a name containing dots is
// reported in full instead of being cut at its first dot.
// It returns "" when the key has fewer than four segments.
func getSourceNameFromKey(handlerKey string) string {
	segments := strings.Split(handlerKey, ".")
	if len(segments) < 4 {
		return ""
	}
	// Drop the leading kind and namespace and the trailing handler type.
	return strings.Join(segments[2:len(segments)-1], ".")
}
30 changes: 30 additions & 0 deletions pkg/metricscollector/metricscollectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ type MetricsCollector interface {
IncrementCRDTotal(crdType, namespace string)

DecrementCRDTotal(crdType, namespace string)

// RecordCloudEventEmitted counts the number of cloudevents emitted to the user's sink
RecordCloudEventEmitted(namespace string, cloudeventsource string, eventsink string)

// RecordCloudEventEmittedError counts the number of errors that occurred while trying to emit a cloudevent
RecordCloudEventEmittedError(namespace string, cloudeventsource string, eventsink string)

// RecordCloudEventQueueStatus records the number of cloudevents that are waiting to be emitted
RecordCloudEventQueueStatus(namespace string, value int)
}

func NewMetricsCollectors(enablePrometheusMetrics bool, enableOpenTelemetryMetrics bool) {
Expand Down Expand Up @@ -144,3 +153,24 @@ func DecrementCRDTotal(crdType, namespace string) {
element.DecrementCRDTotal(crdType, namespace)
}
}

// RecordCloudEventEmitted forwards an emitted-cloudevent observation, with
// the given namespace, CloudEventSource name and event sink, to every entry
// in the package-level collectors collection.
func RecordCloudEventEmitted(namespace string, cloudeventsource string, eventsink string) {
	for _, collector := range collectors {
		collector.RecordCloudEventEmitted(namespace, cloudeventsource, eventsink)
	}
}

// RecordCloudEventEmittedError forwards a failed-emission observation, with
// the given namespace, CloudEventSource name and event sink, to every entry
// in the package-level collectors collection.
func RecordCloudEventEmittedError(namespace string, cloudeventsource string, eventsink string) {
	for _, collector := range collectors {
		collector.RecordCloudEventEmittedError(namespace, cloudeventsource, eventsink)
	}
}

// RecordCloudEventQueueStatus forwards the given queue length for namespace
// to every entry in the package-level collectors collection.
func RecordCloudEventQueueStatus(namespace string, value int) {
	for _, collector := range collectors {
		collector.RecordCloudEventQueueStatus(namespace, value)
	}
}
57 changes: 57 additions & 0 deletions pkg/metricscollector/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ var (
otelInternalLoopLatencyVal OtelMetricFloat64Val
otelBuildInfoVal OtelMetricInt64Val

otCloudEventEmittedCounter api.Int64Counter
otCloudEventQueueStatusVal OtelMetricFloat64Val

otelScalerActiveVal OtelMetricFloat64Val
)

Expand Down Expand Up @@ -140,6 +143,20 @@ func initMeters() {
if err != nil {
otLog.Error(err, msg)
}

otCloudEventEmittedCounter, err = meter.Int64Counter("keda.cloudeventsource.events.emitted.count", api.WithDescription("Measured the total number of emitted cloudevents. 'namespace': namespace of CloudEventSource 'cloudeventsource': name of CloudEventSource object. 'eventsink': destination of this emitted event 'state':indicated events emitted successfully or not"))
if err != nil {
otLog.Error(err, msg)
}

_, err = meter.Float64ObservableGauge(
"keda.cloudeventsource.events.queued",
api.WithDescription("Indicates how many events are still queue"),
api.WithFloat64Callback(CloudeventQueueStatusCallback),
)
if err != nil {
otLog.Error(err, msg)
}
}

func BuildInfoCallback(_ context.Context, obsrv api.Int64Observer) error {
Expand Down Expand Up @@ -324,3 +341,43 @@ func getScalerMeasurementOption(namespace string, scaledObject string, scaler st
attribute.Key("metric").String(metric),
)
}

// RecordCloudEventEmitted adds one to the emitted-cloudevents counter with
// the attributes namespace, cloudEventSource and eventsink set from the
// arguments and state set to "emitted".
func (o *OtelMetrics) RecordCloudEventEmitted(namespace string, cloudeventsource string, eventsink string) {
	attrs := []attribute.KeyValue{
		attribute.String("namespace", namespace),
		attribute.String("cloudEventSource", cloudeventsource),
		attribute.String("eventsink", eventsink),
		attribute.String("state", "emitted"),
	}
	otCloudEventEmittedCounter.Add(context.Background(), 1, api.WithAttributes(attrs...))
}

// RecordCloudEventEmittedError adds one to the emitted-cloudevents counter
// with the attributes namespace, cloudEventSource and eventsink set from the
// arguments and state set to "failed".
func (o *OtelMetrics) RecordCloudEventEmittedError(namespace string, cloudeventsource string, eventsink string) {
	attrs := []attribute.KeyValue{
		attribute.String("namespace", namespace),
		attribute.String("cloudEventSource", cloudeventsource),
		attribute.String("eventsink", eventsink),
		attribute.String("state", "failed"),
	}
	otCloudEventEmittedCounter.Add(context.Background(), 1, api.WithAttributes(attrs...))
}

// CloudeventQueueStatusCallback reports the most recently stored queue value
// to the observer, if one has been stored, and then clears the stored value
// so it is observed at most once. It always returns nil.
//
// NOTE(review): the package-level value is read and reset here and written by
// RecordCloudEventQueueStatus with no locking visible in this file — confirm
// how callers are serialized.
func CloudeventQueueStatusCallback(_ context.Context, obsrv api.Float64Observer) error {
	if opt := otCloudEventQueueStatusVal.measurementOption; opt != nil {
		obsrv.Observe(otCloudEventQueueStatusVal.val, opt)
	}
	// Reset to the zero value; a nil measurementOption marks "nothing to report".
	otCloudEventQueueStatusVal = OtelMetricFloat64Val{}
	return nil
}

// RecordCloudEventQueueStatus stores value, together with a namespace
// attribute, in the package-level queue-status holder that
// CloudeventQueueStatusCallback reads.
func (o *OtelMetrics) RecordCloudEventQueueStatus(namespace string, value int) {
	otCloudEventQueueStatusVal.val = float64(value)
	otCloudEventQueueStatusVal.measurementOption = api.WithAttributes(
		attribute.String("namespace", namespace),
	)
}
41 changes: 41 additions & 0 deletions pkg/metricscollector/prommetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,27 @@ var (
},
[]string{"namespace", "type", "resource"},
)

// Total emitted cloudevents.
cloudeventEmitted = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: DefaultPromMetricsNamespace,
Subsystem: "cloudeventsource",
Name: "events_emitted_total",
Help: "Measured the total number of emitted cloudevents. 'namespace': namespace of CloudEventSource 'cloudeventsource': name of CloudEventSource object. 'eventsink': destination of this emitted event 'state':indicated events emitted successfully or not",
},
[]string{"namespace", "cloudeventsource", "eventsink", "state"},
)

cloudeventQueueStatus = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: DefaultPromMetricsNamespace,
Subsystem: "cloudeventsource",
Name: "queue_queued",
Help: "Indicates how many events are still queue",
},
[]string{"namespace"},
)
)

type PromMetrics struct {
Expand All @@ -149,6 +170,9 @@ func NewPromMetrics() *PromMetrics {
metrics.Registry.MustRegister(crdTotalsGaugeVec)
metrics.Registry.MustRegister(buildInfo)

metrics.Registry.MustRegister(cloudeventEmitted)
metrics.Registry.MustRegister(cloudeventQueueStatus)

RecordBuildInfo()
return &PromMetrics{}
}
Expand Down Expand Up @@ -260,3 +284,20 @@ func (p *PromMetrics) DecrementCRDTotal(crdType, namespace string) {

crdTotalsGaugeVec.WithLabelValues(crdType, namespace).Dec()
}

// RecordCloudEventEmitted increments the cloudeventEmitted counter for the
// given namespace, CloudEventSource name and event sink with state "emitted".
func (p *PromMetrics) RecordCloudEventEmitted(namespace string, cloudeventsource string, eventsink string) {
	// Label values follow the declaration order: namespace, cloudeventsource, eventsink, state.
	cloudeventEmitted.WithLabelValues(namespace, cloudeventsource, eventsink, "emitted").Inc()
}

// RecordCloudEventEmittedError increments the cloudeventEmitted counter for
// the given namespace, CloudEventSource name and event sink with state "failed".
func (p *PromMetrics) RecordCloudEventEmittedError(namespace string, cloudeventsource string, eventsink string) {
	// Label values follow the declaration order: namespace, cloudeventsource, eventsink, state.
	cloudeventEmitted.WithLabelValues(namespace, cloudeventsource, eventsink, "failed").Inc()
}

// RecordCloudEventQueueStatus sets the cloudeventQueueStatus gauge for the
// given namespace to value.
func (p *PromMetrics) RecordCloudEventQueueStatus(namespace string, value int) {
	queued := float64(value)
	cloudeventQueueStatus.WithLabelValues(namespace).Set(queued)
}
Loading