Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding CloudEventSource metrics in Prometheus & OpenTelemetry #5259

Merged
merged 12 commits into from
Dec 22, 2023
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ Here is an overview of all new **experimental** features:

### Improvements

- **General**: Add CloudEventSource metrics in Prometheus & OpenTelemetry ([#3531](https://github.com/kedacore/keda/issues/3531))
- **General**: Add parameter queryParameters to prometheus-scaler ([#4962](https://github.com/kedacore/keda/issues/4962))
- **General**: Support TriggerAuthentication properties from ConfigMap ([#4830](https://github.com/kedacore/keda/issues/4830))
- **General**: Use client-side round-robin load balancing for grpc calls ([#5224](https://github.com/kedacore/keda/issues/5224))
Expand Down
27 changes: 27 additions & 0 deletions pkg/eventemitter/eventemitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (

eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1"
"github.com/kedacore/keda/v2/pkg/eventemitter/eventdata"
"github.com/kedacore/keda/v2/pkg/metricscollector"
kedastatus "github.com/kedacore/keda/v2/pkg/status"
)

Expand Down Expand Up @@ -181,6 +182,7 @@ func (e *EventEmitter) createEventHandlers(ctx context.Context, cloudEventSource
return
}

metricscollector.RecordCloudEventSink(cloudEventSource.Namespace, cloudEventSource.Name, cloudEventHandlerTypeHTTP, true)
eventHandlerKey := newEventHandlerKey(key, cloudEventHandlerTypeHTTP)
if h, ok := e.eventHandlersCache[eventHandlerKey]; ok {
h.CloseHandler()
Expand All @@ -202,6 +204,8 @@ func (e *EventEmitter) clearEventHandlersCache(cloudEventSource *eventingv1alpha
if eventHandler, found := e.eventHandlersCache[eventHandlerKey]; found {
eventHandler.CloseHandler()
delete(e.eventHandlersCache, key)

metricscollector.RecordCloudEventSink(cloudEventSource.Namespace, cloudEventSource.Name, cloudEventHandlerTypeHTTP, false)
}
}
}
Expand All @@ -228,8 +232,10 @@ func (e *EventEmitter) startEventLoop(ctx context.Context, cloudEventSource *eve
e.log.V(1).Info("Consuming events from CloudEventSource.")
e.emitEventByHandler(eventData)
e.checkEventHandlers(ctx, cloudEventSource, cloudEventSourceMutex)
metricscollector.RecordCloudEventQueueStatus(len(e.cloudEventProcessingChan), true)
case <-ctx.Done():
e.log.V(1).Info("CloudEventSource loop has stopped.")
metricscollector.RecordCloudEventQueueStatus(len(e.cloudEventProcessingChan), false)
return
}
}
Expand Down Expand Up @@ -293,6 +299,7 @@ func (e *EventEmitter) Emit(object runtime.Object, namesapce types.NamespacedNam
}

func (e *EventEmitter) enqueueEventData(eventData eventdata.EventData) {
metricscollector.RecordCloudEventQueueStatus(len(e.cloudEventProcessingChan), true)
select {
case e.cloudEventProcessingChan <- eventData:
e.log.V(1).Info("Event enqueued successfully.")
Expand Down Expand Up @@ -321,6 +328,8 @@ func (e *EventEmitter) emitEventByHandler(eventData eventdata.EventData) {
eventData.HandlerKey = key
if handler.GetActiveStatus() == metav1.ConditionTrue {
go handler.EmitEvent(eventData, e.emitErrorHandle)

metricscollector.RecordCloudEventEmitted(eventData.Namespace, getSourceNameFromKey(eventData.HandlerKey), getHandlerTypeFromKey(key))
} else {
e.log.V(1).Info("EventHandler's status is not active. Please check if event endpoint works well", "CloudEventSource", eventData.ObjectName)
}
Expand All @@ -335,6 +344,8 @@ func (e *EventEmitter) emitEventByHandler(eventData eventdata.EventData) {
}

func (e *EventEmitter) emitErrorHandle(eventData eventdata.EventData, err error) {
metricscollector.RecordCloudEventEmittedError(eventData.Namespace, getSourceNameFromKey(eventData.HandlerKey), getHandlerTypeFromKey(eventData.HandlerKey))

if eventData.RetryTimes >= maxRetryTimes {
e.log.V(1).Info("Failed to emit Event multiple times. Will set handler failure status.", "handler", eventData.HandlerKey, "retry times", eventData.RetryTimes)
handler, found := e.eventHandlersCache[eventData.HandlerKey]
Expand Down Expand Up @@ -389,3 +400,19 @@ func (e *EventEmitter) updateCloudEventSourceStatus(ctx context.Context, cloudEv
// newEventHandlerKey builds the key under which an event handler is cached:
// the given identifier and the handler type joined by a single dot.
func newEventHandlerKey(kindNamespaceName string, handlerType string) string { //nolint:unparam
	handlerKey := fmt.Sprintf("%s.%s", kindNamespaceName, handlerType)
	return handlerKey
}

// getHandlerTypeFromKey extracts the handler type from an event handler key.
//
// Keys are built by newEventHandlerKey as "<identifier>.<handlerType>", so the
// handler type is the last dot-separated segment. The identifier is presumably
// "<kind>.<namespace>.<name>" (confirm against the code that generates it), and
// the name part may itself contain dots; reading the last segment instead of a
// fixed index keeps the result correct in that case.
//
// It returns "" when the key has fewer than four dot-separated segments.
func getHandlerTypeFromKey(handlerKey string) string {
	segments := strings.Split(handlerKey, ".")
	if len(segments) < 4 {
		return ""
	}
	return segments[len(segments)-1]
}

// getSourceNameFromKey extracts the source name from an event handler key.
//
// Keys are built by newEventHandlerKey as "<identifier>.<handlerType>", where
// the identifier is presumably "<kind>.<namespace>.<name>" (confirm against the
// code that generates it). The name is therefore everything between the second
// dot and the last dot. Slicing this way, rather than taking a fixed segment
// index, preserves names that themselves contain dots.
//
// It returns "" when the key has fewer than four dot-separated segments.
func getSourceNameFromKey(handlerKey string) string {
	// Split off the first two segments; the remainder is "<name>.<handlerType>".
	parts := strings.SplitN(handlerKey, ".", 3)
	if len(parts) < 3 {
		return ""
	}
	nameAndType := parts[2]
	lastDot := strings.LastIndex(nameAndType, ".")
	if lastDot < 0 {
		return ""
	}
	return nameAndType[:lastDot]
}
40 changes: 40 additions & 0 deletions pkg/metricscollector/metricscollectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@ type MetricsCollector interface {
IncrementCRDTotal(crdType, namespace string)

DecrementCRDTotal(crdType, namespace string)

// RecordCloudEventEmitted counts the number of cloudevent that emitted to user's sink
RecordCloudEventEmitted(namespace string, cloudeventsource string, eventsink string)

// RecordCloudEventEmittedError counts the number of errors occurred in trying emit cloudevent
RecordCloudEventEmittedError(namespace string, cloudeventsource string, eventsink string)

// RecordCloudEventSink records user's eventsink
RecordCloudEventSink(namespace string, eventsink string, eventsinktype string, isactive bool)

// RecordCloudEventQueueStatus record the number of cloudevents that are waiting for emitting
RecordCloudEventQueueStatus(value int, isActive bool)
}

func NewMetricsCollectors(enablePrometheusMetrics bool, enableOpenTelemetryMetrics bool) {
Expand Down Expand Up @@ -144,3 +156,31 @@ func DecrementCRDTotal(crdType, namespace string) {
element.DecrementCRDTotal(crdType, namespace)
}
}

// RecordCloudEventEmitted forwards an emitted-cloudevent record to every
// collector in collectors, passing the arguments through unchanged.
func RecordCloudEventEmitted(namespace string, cloudeventsource string, eventsink string) {
	for _, collector := range collectors {
		collector.RecordCloudEventEmitted(namespace, cloudeventsource, eventsink)
	}
}

// RecordCloudEventEmittedError forwards a cloudevent emit-error record to every
// collector in collectors, passing the arguments through unchanged.
func RecordCloudEventEmittedError(namespace string, cloudeventsource string, eventsink string) {
	for _, collector := range collectors {
		collector.RecordCloudEventEmittedError(namespace, cloudeventsource, eventsink)
	}
}

// RecordCloudEventSink forwards the state of an event sink to every collector
// in collectors, passing the arguments through unchanged.
func RecordCloudEventSink(namespace string, eventsink string, eventsinktype string, isactive bool) {
	for _, collector := range collectors {
		collector.RecordCloudEventSink(namespace, eventsink, eventsinktype, isactive)
	}
}

// RecordCloudEventQueueStatus forwards the number of queued cloudevents and the
// queue's active flag to every collector in collectors.
func RecordCloudEventQueueStatus(value int, isActive bool) {
	for _, collector := range collectors {
		collector.RecordCloudEventQueueStatus(value, isActive)
	}
}
101 changes: 101 additions & 0 deletions pkg/metricscollector/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ var (
otelInternalLoopLatencyVal OtelMetricFloat64Val
otelBuildInfoVal OtelMetricInt64Val

otCloudEventEmittedCounter api.Int64Counter
otCloudEventEmittedErrorCounter api.Int64Counter
otCloudEventSinkVal OtelMetricFloat64Val
otCloudEventQueueStatusVal OtelMetricFloat64Val

otelScalerActiveVal OtelMetricFloat64Val
)

Expand Down Expand Up @@ -140,6 +145,34 @@ func initMeters() {
if err != nil {
otLog.Error(err, msg)
}

otCloudEventEmittedCounter, err = meter.Int64Counter("keda.cloudeventsource.emitted", api.WithDescription("Total emitted cloudevents"))
if err != nil {
otLog.Error(err, msg)
}

otCloudEventEmittedErrorCounter, err = meter.Int64Counter("keda.cloudeventsource.emitted.errors", api.WithDescription("Total cloudevent emitted errors"))
if err != nil {
otLog.Error(err, msg)
}

_, err = meter.Float64ObservableGauge(
"keda.cloudeventsource.sink",
api.WithDescription("Indicates the created event sinks"),
api.WithFloat64Callback(EventSinkCreatedCallback),
)
if err != nil {
otLog.Error(err, msg)
}

_, err = meter.Float64ObservableGauge(
"keda.cloudeventsource.queue.status",
api.WithDescription("Indicates how many events are still queue"),
api.WithFloat64Callback(CloudeventQueueStatusCallback),
)
if err != nil {
otLog.Error(err, msg)
}
}

func BuildInfoCallback(_ context.Context, obsrv api.Int64Observer) error {
Expand Down Expand Up @@ -324,3 +357,71 @@ func getScalerMeasurementOption(namespace string, scaledObject string, scaler st
attribute.Key("metric").String(metric),
)
}

// RecordCloudEventEmitted adds one to otCloudEventEmittedCounter, attributed
// with the namespace, the CloudEventSource name and the event sink.
func (o *OtelMetrics) RecordCloudEventEmitted(namespace string, cloudeventsource string, eventsink string) {
	attrs := []attribute.KeyValue{
		attribute.Key("namespace").String(namespace),
		attribute.Key("cloudEventSource").String(cloudeventsource),
		attribute.Key("eventsink").String(eventsink),
	}
	otCloudEventEmittedCounter.Add(context.Background(), 1, api.WithAttributes(attrs...))
}

// RecordCloudEventEmittedError adds one to otCloudEventEmittedErrorCounter,
// attributed with the namespace, the CloudEventSource name and the event sink.
func (o *OtelMetrics) RecordCloudEventEmittedError(namespace string, cloudeventsource string, eventsink string) {
	attrs := []attribute.KeyValue{
		attribute.Key("namespace").String(namespace),
		attribute.Key("cloudEventSource").String(cloudeventsource),
		attribute.Key("eventsink").String(eventsink),
	}
	otCloudEventEmittedErrorCounter.Add(context.Background(), 1, api.WithAttributes(attrs...))
}

// EventSinkCreatedCallback is the observable-gauge callback for the event sink
// metric. It reports the value currently held in otCloudEventSinkVal, if a
// measurement option has been set on it, and then resets that variable to its
// zero value.
//
// NOTE(review): otCloudEventSinkVal is a single package-level value that each
// RecordCloudEventSink call overwrites, so only the most recent record is
// observed here — confirm this is intended.
func EventSinkCreatedCallback(_ context.Context, obsrv api.Float64Observer) error {
	pending := otCloudEventSinkVal
	if pending.measurementOption != nil {
		obsrv.Observe(pending.val, pending.measurementOption)
	}
	// Clear the slot so the same record is not reported again.
	otCloudEventSinkVal = OtelMetricFloat64Val{}
	return nil
}

// RecordCloudEventSink stores the state of an event sink in otCloudEventSinkVal:
// the value is 1 when isactive is true and 0 otherwise, and the measurement
// option carries the namespace, event sink and event sink type attributes.
func (o *OtelMetrics) RecordCloudEventSink(namespace string, eventsink string, eventsinktype string, isactive bool) {
	var sinkState float64
	if isactive {
		sinkState = 1
	}

	sinkAttrs := api.WithAttributes(
		attribute.Key("namespace").String(namespace),
		attribute.Key("eventsink").String(eventsink),
		attribute.Key("eventsinktype").String(eventsinktype),
	)

	otCloudEventSinkVal.val = sinkState
	otCloudEventSinkVal.measurementOption = sinkAttrs
}

// CloudeventQueueStatusCallback is the observable-gauge callback for the
// cloudevent queue metric. It reports the value currently held in
// otCloudEventQueueStatusVal, if a measurement option has been set on it, and
// then resets that variable to its zero value.
func CloudeventQueueStatusCallback(_ context.Context, obsrv api.Float64Observer) error {
	pending := otCloudEventQueueStatusVal
	if pending.measurementOption != nil {
		obsrv.Observe(pending.val, pending.measurementOption)
	}
	// Clear the slot so the same record is not reported again.
	otCloudEventQueueStatusVal = OtelMetricFloat64Val{}
	return nil
}

// RecordCloudEventQueueStatus stores the number of queued cloudevents in
// otCloudEventQueueStatusVal. The isQueueActive attribute is "1" when isactive
// is true and "0" otherwise.
func (o *OtelMetrics) RecordCloudEventQueueStatus(value int, isactive bool) {
	var queueActive string
	if isactive {
		queueActive = "1"
	} else {
		queueActive = "0"
	}

	queueAttrs := api.WithAttributes(attribute.Key("isQueueActive").String(queueActive))

	otCloudEventQueueStatusVal.val = float64(value)
	otCloudEventQueueStatusVal.measurementOption = queueAttrs
}
79 changes: 79 additions & 0 deletions pkg/metricscollector/prommetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,46 @@ var (
},
[]string{"namespace", "type", "resource"},
)

cloudeventEmitted = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: DefaultPromMetricsNamespace,
Subsystem: "cloudeventsource",
Name: "emitted_total",
Help: "Total emitted cloudevents",
},
[]string{"namespace", "cloudeventsource", "eventsink"},
)

cloudeventEmittedErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: DefaultPromMetricsNamespace,
Subsystem: "cloudeventsource",
Name: "emitted_errors_total",
Help: "Total cloudevent emitted errors",
},
[]string{"namespace", "cloudeventsource", "eventsink"},
)

cloudeventSink = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: DefaultPromMetricsNamespace,
Subsystem: "cloudeventsource",
Name: "sink",
Help: "Indicates the created event sinks",
},
[]string{"namespace", "eventsink", "eventsinktype"},
)

cloudeventQueueStatus = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: DefaultPromMetricsNamespace,
Subsystem: "cloudeventsource",
Name: "queue",
Help: "Indicates how many events are still queue",
},
[]string{"namespace", "isQueueActive"},
)
)

type PromMetrics struct {
Expand All @@ -149,6 +189,11 @@ func NewPromMetrics() *PromMetrics {
metrics.Registry.MustRegister(crdTotalsGaugeVec)
metrics.Registry.MustRegister(buildInfo)

metrics.Registry.MustRegister(cloudeventEmitted)
metrics.Registry.MustRegister(cloudeventEmittedErrors)
metrics.Registry.MustRegister(cloudeventSink)
metrics.Registry.MustRegister(cloudeventQueueStatus)

RecordBuildInfo()
return &PromMetrics{}
}
Expand Down Expand Up @@ -260,3 +305,37 @@ func (p *PromMetrics) DecrementCRDTotal(crdType, namespace string) {

crdTotalsGaugeVec.WithLabelValues(crdType, namespace).Dec()
}

// RecordCloudEventEmitted increments the cloudeventEmitted counter for the
// given namespace, CloudEventSource and event sink.
func (p *PromMetrics) RecordCloudEventEmitted(namespace string, cloudeventsource string, eventsink string) {
	// Values are positional: namespace, cloudeventsource, eventsink.
	cloudeventEmitted.WithLabelValues(namespace, cloudeventsource, eventsink).Inc()
}

// RecordCloudEventEmittedError increments the cloudeventEmittedErrors counter
// for the given namespace, CloudEventSource and event sink.
func (p *PromMetrics) RecordCloudEventEmittedError(namespace string, cloudeventsource string, eventsink string) {
	// Values are positional: namespace, cloudeventsource, eventsink.
	cloudeventEmittedErrors.WithLabelValues(namespace, cloudeventsource, eventsink).Inc()
}

// RecordCloudEventSink sets the cloudeventSink gauge for the given namespace,
// event sink and event sink type to 1 when isactive is true and to 0 otherwise.
func (p *PromMetrics) RecordCloudEventSink(namespace string, eventsink string, eventsinktype string, isactive bool) {
	// Values are positional: namespace, eventsink, eventsinktype.
	sinkGauge := cloudeventSink.WithLabelValues(namespace, eventsink, eventsinktype)

	var sinkState float64
	if isactive {
		sinkState = 1
	}
	sinkGauge.Set(sinkState)
}

// RecordCloudEventQueueStatus sets the cloudeventQueueStatus gauge to the
// number of queued cloudevents. The isQueueActive label is "1" when isactive is
// true and "0" otherwise; the namespace label always carries
// DefaultPromMetricsNamespace.
func (p *PromMetrics) RecordCloudEventQueueStatus(value int, isactive bool) {
	var queueActive string
	if isactive {
		queueActive = "1"
	} else {
		queueActive = "0"
	}

	// Values are positional: namespace, isQueueActive.
	cloudeventQueueStatus.WithLabelValues(DefaultPromMetricsNamespace, queueActive).Set(float64(value))
}
Loading