Skip to content

Commit

Permalink
Adding CloudEventSource metrics in Prometheus & OpenTelemetry (kedaco…
Browse files Browse the repository at this point in the history
…re#5259)

Signed-off-by: SpiritZhou <[email protected]>
Signed-off-by: Siva Guruvareddiar <[email protected]>
  • Loading branch information
SpiritZhou authored and Siva Guruvareddiar committed Dec 24, 2023
1 parent df16ac1 commit bc424b0
Show file tree
Hide file tree
Showing 7 changed files with 515 additions and 42 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ Here is an overview of all new **experimental** features:

### Improvements

- **General**: Add CloudEventSource metrics in Prometheus & OpenTelemetry ([#3531](https://github.com/kedacore/keda/issues/3531))
- **General**: Add parameter queryParameters to prometheus-scaler ([#4962](https://github.com/kedacore/keda/issues/4962))
- **General**: Add validations for replica counts when creating ScaledObjects ([#5288](https://github.com/kedacore/keda/issues/5288))
- **General**: Bubble up AuthRef TriggerAuthentication errors as ScaledObject events ([#5190](https://github.com/kedacore/keda/issues/5190))
Expand Down
24 changes: 24 additions & 0 deletions pkg/eventemitter/eventemitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (

eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1"
"github.com/kedacore/keda/v2/pkg/eventemitter/eventdata"
"github.com/kedacore/keda/v2/pkg/metricscollector"
kedastatus "github.com/kedacore/keda/v2/pkg/status"
)

Expand Down Expand Up @@ -228,8 +229,10 @@ func (e *EventEmitter) startEventLoop(ctx context.Context, cloudEventSource *eve
e.log.V(1).Info("Consuming events from CloudEventSource.")
e.emitEventByHandler(eventData)
e.checkEventHandlers(ctx, cloudEventSource, cloudEventSourceMutex)
metricscollector.RecordCloudEventQueueStatus(cloudEventSource.Namespace, len(e.cloudEventProcessingChan))
case <-ctx.Done():
e.log.V(1).Info("CloudEventSource loop has stopped.")
metricscollector.RecordCloudEventQueueStatus(cloudEventSource.Namespace, len(e.cloudEventProcessingChan))
return
}
}
Expand Down Expand Up @@ -295,6 +298,7 @@ func (e *EventEmitter) Emit(object runtime.Object, namesapce types.NamespacedNam
}

func (e *EventEmitter) enqueueEventData(eventData eventdata.EventData) {
metricscollector.RecordCloudEventQueueStatus(eventData.Namespace, len(e.cloudEventProcessingChan))
select {
case e.cloudEventProcessingChan <- eventData:
e.log.V(1).Info("Event enqueued successfully.")
Expand Down Expand Up @@ -323,6 +327,8 @@ func (e *EventEmitter) emitEventByHandler(eventData eventdata.EventData) {
eventData.HandlerKey = key
if handler.GetActiveStatus() == metav1.ConditionTrue {
go handler.EmitEvent(eventData, e.emitErrorHandle)

metricscollector.RecordCloudEventEmitted(eventData.Namespace, getSourceNameFromKey(eventData.HandlerKey), getHandlerTypeFromKey(key))
} else {
e.log.V(1).Info("EventHandler's status is not active. Please check if event endpoint works well", "CloudEventSource", eventData.ObjectName)
}
Expand All @@ -337,6 +343,8 @@ func (e *EventEmitter) emitEventByHandler(eventData eventdata.EventData) {
}

func (e *EventEmitter) emitErrorHandle(eventData eventdata.EventData, err error) {
metricscollector.RecordCloudEventEmittedError(eventData.Namespace, getSourceNameFromKey(eventData.HandlerKey), getHandlerTypeFromKey(eventData.HandlerKey))

if eventData.RetryTimes >= maxRetryTimes {
e.log.V(1).Info("Failed to emit Event multiple times. Will set handler failure status.", "handler", eventData.HandlerKey, "retry times", eventData.RetryTimes)
handler, found := e.eventHandlersCache[eventData.HandlerKey]
Expand Down Expand Up @@ -391,3 +399,19 @@ func (e *EventEmitter) updateCloudEventSourceStatus(ctx context.Context, cloudEv
// newEventHandlerKey builds the cache key of an event handler by appending the
// handler type to the given identifier, separated by a single dot.
func newEventHandlerKey(kindNamespaceName, handlerType string) string { //nolint:unparam
	const keyFormat = "%s.%s"
	return fmt.Sprintf(keyFormat, kindNamespaceName, handlerType)
}

// getHandlerTypeFromKey returns the handler type encoded in a handler key, or
// an empty string when the key has fewer than four dot-separated segments.
//
// newEventHandlerKey always appends the handler type as the final segment, so
// the last segment is returned rather than a fixed index; this keeps the
// result correct when an earlier segment (for example a resource name)
// itself contains dots.
// NOTE(review): assumes keys look like <kind>.<namespace>.<name>.<handlerType>
// and that handler types never contain a dot — confirm against the callers of
// newEventHandlerKey.
func getHandlerTypeFromKey(handlerKey string) string {
	segments := strings.Split(handlerKey, ".")
	if len(segments) < 4 {
		return ""
	}
	return segments[len(segments)-1]
}

// getSourceNameFromKey returns the source name encoded in a handler key, or an
// empty string when the key has fewer than four dot-separated segments.
//
// The name is taken as everything between the second segment and the trailing
// handler type, re-joined with dots, so a name that itself contains dots is
// returned whole instead of being cut at its first dot.
// NOTE(review): assumes keys look like <kind>.<namespace>.<name>.<handlerType>
// where kind, namespace and handler type contain no dots — confirm against
// the callers of newEventHandlerKey.
func getSourceNameFromKey(handlerKey string) string {
	segments := strings.Split(handlerKey, ".")
	if len(segments) < 4 {
		return ""
	}
	return strings.Join(segments[2:len(segments)-1], ".")
}
30 changes: 30 additions & 0 deletions pkg/metricscollector/metricscollectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ type MetricsCollector interface {
IncrementCRDTotal(crdType, namespace string)

DecrementCRDTotal(crdType, namespace string)

// RecordCloudEventEmitted counts the number of cloudevents emitted to the user's sink
RecordCloudEventEmitted(namespace string, cloudeventsource string, eventsink string)

// RecordCloudEventEmittedError counts the number of errors that occurred while trying to emit cloudevents
RecordCloudEventEmittedError(namespace string, cloudeventsource string, eventsink string)

// RecordCloudEventQueueStatus records the number of cloudevents that are waiting to be emitted
RecordCloudEventQueueStatus(namespace string, value int)
}

func NewMetricsCollectors(enablePrometheusMetrics bool, enableOpenTelemetryMetrics bool) {
Expand Down Expand Up @@ -144,3 +153,24 @@ func DecrementCRDTotal(crdType, namespace string) {
element.DecrementCRDTotal(crdType, namespace)
}
}

// RecordCloudEventEmitted reports one emitted cloudevent to every registered
// metrics collector, identified by the namespace, the CloudEventSource name
// and the event sink it was sent to.
func RecordCloudEventEmitted(namespace, cloudeventsource, eventsink string) {
	for _, collector := range collectors {
		collector.RecordCloudEventEmitted(namespace, cloudeventsource, eventsink)
	}
}

// RecordCloudEventEmittedError reports one failed cloudevent emission to every
// registered metrics collector, identified by the namespace, the
// CloudEventSource name and the event sink that was targeted.
func RecordCloudEventEmittedError(namespace, cloudeventsource, eventsink string) {
	for _, collector := range collectors {
		collector.RecordCloudEventEmittedError(namespace, cloudeventsource, eventsink)
	}
}

// RecordCloudEventQueueStatus passes the given queue length for a namespace to
// every registered metrics collector. Callers supply the number of cloudevents
// that are still waiting to be emitted.
func RecordCloudEventQueueStatus(namespace string, value int) {
	for _, collector := range collectors {
		collector.RecordCloudEventQueueStatus(namespace, value)
	}
}
57 changes: 57 additions & 0 deletions pkg/metricscollector/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ var (
otelInternalLoopLatencyVal OtelMetricFloat64Val
otelBuildInfoVal OtelMetricInt64Val

otCloudEventEmittedCounter api.Int64Counter
otCloudEventQueueStatusVal OtelMetricFloat64Val

otelScalerActiveVal OtelMetricFloat64Val
)

Expand Down Expand Up @@ -140,6 +143,20 @@ func initMeters() {
if err != nil {
otLog.Error(err, msg)
}

otCloudEventEmittedCounter, err = meter.Int64Counter("keda.cloudeventsource.events.emitted.count", api.WithDescription("Measured the total number of emitted cloudevents. 'namespace': namespace of CloudEventSource 'cloudeventsource': name of CloudEventSource object. 'eventsink': destination of this emitted event 'state':indicated events emitted successfully or not"))
if err != nil {
otLog.Error(err, msg)
}

_, err = meter.Float64ObservableGauge(
"keda.cloudeventsource.events.queued",
api.WithDescription("Indicates how many events are still queue"),
api.WithFloat64Callback(CloudeventQueueStatusCallback),
)
if err != nil {
otLog.Error(err, msg)
}
}

func BuildInfoCallback(_ context.Context, obsrv api.Int64Observer) error {
Expand Down Expand Up @@ -324,3 +341,43 @@ func getScalerMeasurementOption(namespace string, scaledObject string, scaler st
attribute.Key("metric").String(metric),
)
}

// RecordCloudEventEmitted increments the OpenTelemetry emitted-cloudevents
// counter by one, tagging the measurement with the namespace, CloudEventSource
// name, event sink and the state "emitted".
func (o *OtelMetrics) RecordCloudEventEmitted(namespace, cloudeventsource, eventsink string) {
	emittedAttrs := api.WithAttributes(
		attribute.String("namespace", namespace),
		attribute.String("cloudEventSource", cloudeventsource),
		attribute.String("eventsink", eventsink),
		attribute.String("state", "emitted"),
	)
	otCloudEventEmittedCounter.Add(context.Background(), 1, emittedAttrs)
}

// RecordCloudEventEmittedError increments the OpenTelemetry emitted-cloudevents
// counter by one, tagging the measurement with the namespace, CloudEventSource
// name, event sink and the state "failed". Failures share the counter used for
// successful emissions and are told apart only by the "state" attribute.
func (o *OtelMetrics) RecordCloudEventEmittedError(namespace, cloudeventsource, eventsink string) {
	failedAttrs := api.WithAttributes(
		attribute.String("namespace", namespace),
		attribute.String("cloudEventSource", cloudeventsource),
		attribute.String("eventsink", eventsink),
		attribute.String("state", "failed"),
	)
	otCloudEventEmittedCounter.Add(context.Background(), 1, failedAttrs)
}

// CloudeventQueueStatusCallback is the observable-gauge callback for the
// queued-cloudevents metric. It observes the value most recently stored by
// RecordCloudEventQueueStatus, if any, and then clears the stored value, so a
// recorded value is observed at most once. It always returns nil.
func CloudeventQueueStatusCallback(_ context.Context, obsrv api.Float64Observer) error {
	// A nil measurement option means nothing was recorded since the last reset.
	if opt := otCloudEventQueueStatusVal.measurementOption; opt != nil {
		obsrv.Observe(otCloudEventQueueStatusVal.val, opt)
	}

	otCloudEventQueueStatusVal = OtelMetricFloat64Val{}
	return nil
}

// RecordCloudEventQueueStatus stores the given queue length, together with a
// "namespace" attribute, in the package-level value read by
// CloudeventQueueStatusCallback. Each call overwrites the previously stored
// value, so only the most recent namespace and length are kept.
func (o *OtelMetrics) RecordCloudEventQueueStatus(namespace string, value int) {
	otCloudEventQueueStatusVal.val = float64(value)
	otCloudEventQueueStatusVal.measurementOption = api.WithAttributes(
		attribute.String("namespace", namespace),
	)
}
41 changes: 41 additions & 0 deletions pkg/metricscollector/prommetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,27 @@ var (
},
[]string{"namespace", "type", "resource"},
)

// Total emitted cloudevents.
cloudeventEmitted = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: DefaultPromMetricsNamespace,
Subsystem: "cloudeventsource",
Name: "events_emitted_total",
Help: "Measured the total number of emitted cloudevents. 'namespace': namespace of CloudEventSource 'cloudeventsource': name of CloudEventSource object. 'eventsink': destination of this emitted event 'state':indicated events emitted successfully or not",
},
[]string{"namespace", "cloudeventsource", "eventsink", "state"},
)

cloudeventQueueStatus = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: DefaultPromMetricsNamespace,
Subsystem: "cloudeventsource",
Name: "events_queued",
Help: "Indicates how many events are still queue",
},
[]string{"namespace"},
)
)

type PromMetrics struct {
Expand All @@ -149,6 +170,9 @@ func NewPromMetrics() *PromMetrics {
metrics.Registry.MustRegister(crdTotalsGaugeVec)
metrics.Registry.MustRegister(buildInfo)

metrics.Registry.MustRegister(cloudeventEmitted)
metrics.Registry.MustRegister(cloudeventQueueStatus)

RecordBuildInfo()
return &PromMetrics{}
}
Expand Down Expand Up @@ -260,3 +284,20 @@ func (p *PromMetrics) DecrementCRDTotal(crdType, namespace string) {

crdTotalsGaugeVec.WithLabelValues(crdType, namespace).Dec()
}

// RecordCloudEventEmitted increments the Prometheus emitted-cloudevents counter
// for the given namespace, CloudEventSource name and event sink, with the
// "state" label set to "emitted".
func (p *PromMetrics) RecordCloudEventEmitted(namespace, cloudeventsource, eventsink string) {
	// Values follow the label order the counter was declared with:
	// namespace, cloudeventsource, eventsink, state.
	cloudeventEmitted.WithLabelValues(namespace, cloudeventsource, eventsink, "emitted").Inc()
}

// RecordCloudEventEmittedError increments the Prometheus emitted-cloudevents
// counter for the given namespace, CloudEventSource name and event sink, with
// the "state" label set to "failed".
func (p *PromMetrics) RecordCloudEventEmittedError(namespace, cloudeventsource, eventsink string) {
	// Values follow the label order the counter was declared with:
	// namespace, cloudeventsource, eventsink, state.
	cloudeventEmitted.WithLabelValues(namespace, cloudeventsource, eventsink, "failed").Inc()
}

// RecordCloudEventQueueStatus sets the Prometheus queued-cloudevents gauge for
// the given namespace to value.
func (p *PromMetrics) RecordCloudEventQueueStatus(namespace string, value int) {
	queued := float64(value)
	cloudeventQueueStatus.WithLabelValues(namespace).Set(queued)
}
Loading

0 comments on commit bc424b0

Please sign in to comment.