Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(events): configurable buffers and predicates #7735

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/generated/kuma-cp.md
Original file line number Diff line number Diff line change
Expand Up @@ -730,4 +730,10 @@ proxy:
tracing:
openTelemetry:
endpoint: "" # e.g. otel-collector:4317

# Configuration of the event bus which is local to one instance of CP
eventBus:
# BufferSize controls the buffer for every single event listener.
# If the buffer is exceeded, additional delay may occur in various operations such as insight recomputation or KDS.
bufferSize: 100 # ENV: KUMA_EVENT_BUS_BUFFER_SIZE
```
6 changes: 6 additions & 0 deletions docs/generated/raw/kuma-cp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -727,3 +727,9 @@ proxy:
tracing:
openTelemetry:
endpoint: "" # e.g. otel-collector:4317

# Configuration of the event bus which is local to one instance of CP
eventBus:
# BufferSize controls the buffer for every single event listener.
# If the buffer is exceeded, additional delay may occur in various operations such as insight recomputation or KDS.
bufferSize: 100 # ENV: KUMA_EVENT_BUS_BUFFER_SIZE
8 changes: 6 additions & 2 deletions pkg/config/app/kuma-cp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/kumahq/kuma/pkg/config/diagnostics"
dns_server "github.com/kumahq/kuma/pkg/config/dns-server"
dp_server "github.com/kumahq/kuma/pkg/config/dp-server"
"github.com/kumahq/kuma/pkg/config/eventbus"
"github.com/kumahq/kuma/pkg/config/intercp"
"github.com/kumahq/kuma/pkg/config/mads"
"github.com/kumahq/kuma/pkg/config/multizone"
Expand Down Expand Up @@ -169,6 +170,8 @@ type Config struct {
InterCp intercp.InterCpConfig `json:"interCp"`
// Tracing
Tracing tracing.Config `json:"tracing"`
// EventBus is a configuration of the event bus which is local to one instance of CP.
EventBus eventbus.Config `json:"eventBus"`
}

func (c *Config) Sanitize() {
Expand Down Expand Up @@ -238,8 +241,9 @@ var DefaultConfig = func() Config {
FullResyncInterval: config_types.Duration{Duration: 1 * time.Minute},
},
},
Proxy: xds.DefaultProxyConfig(),
InterCp: intercp.DefaultInterCpConfig(),
Proxy: xds.DefaultProxyConfig(),
InterCp: intercp.DefaultInterCpConfig(),
EventBus: eventbus.Default(),
}
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/config/app/kuma-cp/kuma-cp.defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -727,3 +727,9 @@ proxy:
tracing:
openTelemetry:
endpoint: "" # e.g. otel-collector:4317

# Configuration of the event bus which is local to one instance of CP
eventBus:
# BufferSize controls the buffer for every single event listener.
# If the buffer is exceeded, additional delay may occur in various operations such as insight recomputation or KDS.
bufferSize: 100 # ENV: KUMA_EVENT_BUS_BUFFER_SIZE
17 changes: 17 additions & 0 deletions pkg/config/eventbus/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package eventbus

// Config holds the settings of the event bus that is local to a single
// instance of the control plane.
type Config struct {
	// BufferSize is the size of the buffer given to every single event listener.
	// When a listener's buffer is exceeded, operations that rely on events,
	// such as insight recomputation or KDS, may be delayed.
	BufferSize uint `json:"bufferSize" envconfig:"kuma_event_bus_buffer_size"`
}

// Validate checks the configuration. No constraints are enforced at the
// moment, so it always returns nil.
func (c Config) Validate() error {
	return nil
}

// Default returns the event bus configuration with BufferSize set to 100.
func Default() Config {
	var cfg Config
	cfg.BufferSize = 100
	return cfg
}
4 changes: 4 additions & 0 deletions pkg/config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ var _ = Describe("Config loader", func() {
Expect(cfg.Experimental.KDSEventBasedWatchdog.FullResyncInterval.Duration).To(Equal(15 * time.Second))

Expect(cfg.Proxy.Gateway.GlobalDownstreamMaxConnections).To(BeNumerically("==", 1))
Expect(cfg.EventBus.BufferSize).To(Equal(uint(30)))
},
Entry("from config file", testCase{
envVars: map[string]string{},
Expand Down Expand Up @@ -676,6 +677,8 @@ experimental:
proxy:
gateway:
globalDownstreamMaxConnections: 1
eventBus:
bufferSize: 30
`,
}),
Entry("from env variables", testCase{
Expand Down Expand Up @@ -921,6 +924,7 @@ proxy:
"KUMA_EXPERIMENTAL_KDS_EVENT_BASED_WATCHDOG_FULL_RESYNC_INTERVAL": "15s",
"KUMA_PROXY_GATEWAY_GLOBAL_DOWNSTREAM_MAX_CONNECTIONS": "1",
"KUMA_TRACING_OPENTELEMETRY_ENDPOINT": "otel-collector:4317",
"KUMA_EVENT_BUS_BUFFER_SIZE": "30",
},
yamlFileConfig: "",
}),
Expand Down
5 changes: 4 additions & 1 deletion pkg/core/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,10 @@ func initializeResourceStore(cfg kuma_cp.Config, builder *core_runtime.Builder)
return err
}
builder.WithResourceStore(rs)
eventBus := events.NewEventBus()
eventBus, err := events.NewEventBus(cfg.EventBus.BufferSize, builder.Metrics())
if err != nil {
return err
}
if err := plugin.EventListener(builder, eventBus); err != nil {
return err
}
Expand Down
62 changes: 51 additions & 11 deletions pkg/events/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,54 @@ package events
import (
"sync"

"github.com/prometheus/client_golang/prometheus"

"github.com/kumahq/kuma/pkg/core"
core_metrics "github.com/kumahq/kuma/pkg/metrics"
)

func NewEventBus() EventBus {
return &eventBus{
subscribers: map[string]chan Event{},
// log is the package-level logger, named "eventbus".
var log = core.Log.WithName("eventbus")

// subscriber pairs the event channel of a single listener with the predicates
// that were supplied when the listener subscribed.
type subscriber struct {
// ch is the channel on which events are delivered to the listener.
ch chan Event
// predicates are evaluated against every event sent on the bus; the event is
// only written to ch when none of them returns false.
predicates []Predicate
}

// NewEventBus creates an event bus that keeps bufferSize as the channel buffer
// size used for its listeners.
//
// It creates the "events_dropped" counter and registers it in metrics. When the
// registration fails, that error is returned together with a nil EventBus.
func NewEventBus(bufferSize uint, metrics core_metrics.Metrics) (EventBus, error) {
	counterOpts := prometheus.CounterOpts{
		Name: "events_dropped",
		Help: "Number of dropped events in event bus due to full channels",
	}
	droppedEvents := prometheus.NewCounter(counterOpts)

	err := metrics.Register(droppedEvents)
	if err != nil {
		return nil, err
	}

	bus := &eventBus{
		subscribers: map[string]subscriber{},
		bufferSize:  bufferSize,
		metric:      droppedEvents,
	}
	return bus, nil
}

type eventBus struct {
mtx sync.RWMutex
subscribers map[string]chan Event
subscribers map[string]subscriber
bufferSize uint
metric prometheus.Counter
}

func (b *eventBus) Subscribe() Listener {
// Subscribe subscribes to a stream of events filtered by the given predicates.
// Predicates should not block on I/O, otherwise the whole event bus can block.
jakubdyszkiewicz marked this conversation as resolved.
Show resolved Hide resolved
// All predicates must pass for the event to be enqueued.
func (b *eventBus) Subscribe(predicates ...Predicate) Listener {
id := core.NewUUID()
b.mtx.Lock()
defer b.mtx.Unlock()

events := make(chan Event, 10)
b.subscribers[id] = events
events := make(chan Event, b.bufferSize)
b.subscribers[id] = subscriber{
ch: events,
predicates: predicates,
}
return &reader{
events: events,
close: func() {
Expand All @@ -37,10 +64,23 @@ func (b *eventBus) Subscribe() Listener {
func (b *eventBus) Send(event Event) {
b.mtx.RLock()
defer b.mtx.RUnlock()
switch e := event.(type) {
case ResourceChangedEvent:
for _, channel := range b.subscribers {
channel <- e
for _, sub := range b.subscribers {
matched := true
for _, predicate := range sub.predicates {
if !predicate(event) {
matched = false
}
}
if matched {
select {
case sub.ch <- event:
default:
b.metric.Inc()
log.Info("[WARNING] event is not sent because the channel is full. Ignoring event. Consider increasing buffer size using KUMA_EVENT_BUS_BUFFER_SIZE",
"bufferSize", b.bufferSize,
"event", event,
)
}
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/events/eventbus_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package events_test

import (
"testing"

"github.com/kumahq/kuma/pkg/test"
)

// TestEvents is the `go test` entry point of this package; it passes t to the
// project's test.RunSpecs helper under the "Events Suite" description.
func TestEvents(t *testing.T) {
	const suiteDescription = "Events Suite"
	test.RunSpecs(t, suiteDescription)
}
69 changes: 69 additions & 0 deletions pkg/events/eventbus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package events_test

import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/kumahq/kuma/pkg/events"
core_metrics "github.com/kumahq/kuma/pkg/metrics"
test_metrics "github.com/kumahq/kuma/pkg/test/metrics"
)

var _ = Describe("EventBus", func() {
	// hasPendingEvent reports, without blocking, whether an event could be read
	// from ch. A successfully read event is consumed.
	hasPendingEvent := func(ch <-chan events.Event) (received bool) {
		select {
		case <-ch:
			received = true
		default:
		}
		return received
	}

	It("should not block on Send", func() {
		// given an event bus created with a buffer size of 1
		metrics, err := core_metrics.NewMetrics("")
		Expect(err).ToNot(HaveOccurred())
		bus, err := events.NewEventBus(1, metrics)
		Expect(err).ToNot(HaveOccurred())
		sub := bus.Subscribe()
		first := events.ResourceChangedEvent{TenantID: "1"}
		second := events.ResourceChangedEvent{TenantID: "2"}

		// when two events are sent before anything is received
		bus.Send(first)
		bus.Send(second)

		// then the first event is delivered
		Expect(<-sub.Recv()).To(Equal(first))

		// and the second one was dropped because the buffer was full
		Expect(hasPendingEvent(sub.Recv())).To(BeFalse())
		dropped := test_metrics.FindMetric(metrics, "events_dropped").Counter.GetValue()
		Expect(dropped).To(Equal(1.0))
	})

	It("should only send events matched predicate", func() {
		// given a listener that is only interested in tenant "1"
		metrics, err := core_metrics.NewMetrics("")
		Expect(err).ToNot(HaveOccurred())
		bus, err := events.NewEventBus(10, metrics)
		Expect(err).ToNot(HaveOccurred())
		onlyTenantOne := func(event events.Event) bool {
			return event.(events.ResourceChangedEvent).TenantID == "1"
		}
		sub := bus.Subscribe(onlyTenantOne)
		first := events.ResourceChangedEvent{TenantID: "1"}
		second := events.ResourceChangedEvent{TenantID: "2"}

		// when one matching and one non-matching event are sent
		bus.Send(first)
		bus.Send(second)

		// then the matching event is delivered
		Expect(<-sub.Recv()).To(Equal(first))

		// and the second event was filtered out by the predicate, not dropped
		Expect(hasPendingEvent(sub.Recv())).To(BeFalse())
		dropped := test_metrics.FindMetric(metrics, "events_dropped").Counter.GetValue()
		Expect(dropped).To(Equal(0.0))
	})
})
4 changes: 3 additions & 1 deletion pkg/events/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ type Listener interface {
Close()
}

// Predicate reports whether a listener is interested in the given event.
// The event bus only enqueues an event for a listener when every predicate the
// listener subscribed with returns true.
type Predicate = func(event Event) bool

// Emitter is the sending side of the event bus.
type Emitter interface {
// Send publishes the given event.
Send(Event)
}

type ListenerFactory interface {
Subscribe() Listener
Subscribe(...Predicate) Listener
}

type EventBus interface {
Expand Down
27 changes: 17 additions & 10 deletions pkg/insights/resyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,22 @@ func (r *resyncer) Start(stop <-chan struct{}) error {
}
}
}()
eventReader := r.eventFactory.Subscribe()
eventReader := r.eventFactory.Subscribe(func(event events.Event) bool {
if _, ok := event.(events.TriggerInsightsComputationEvent); ok {
return true
}
if resourceChanged, ok := event.(events.ResourceChangedEvent); ok {
desc, err := r.registry.DescriptorFor(resourceChanged.Type)
if err != nil {
log.Error(err, "Resource is not registered in the registry, ignoring it", "resource", resourceChanged.Type)
return false
}
if desc.Scope == model.ScopeGlobal && desc.Name != core_mesh.MeshType {
return false
}
}
return true
})
defer eventReader.Close()
batch := &eventBatch{events: map[string]*resyncEvent{}}
ticker := r.tick(r.minResyncInterval)
Expand Down Expand Up @@ -257,14 +272,6 @@ func (r *resyncer) Start(stop <-chan struct{}) error {
}
}
if resourceChanged, ok := event.(events.ResourceChangedEvent); ok {
desc, err := r.registry.DescriptorFor(resourceChanged.Type)
if err != nil {
log.Error(err, "Resource is not registered in the registry, ignoring it", "resource", resourceChanged.Type)
continue
}
if desc.Scope == model.ScopeGlobal && desc.Name != core_mesh.MeshType {
continue
}
supported, err := r.tenantFn.IDSupported(ctx, resourceChanged.TenantID)
if err != nil {
log.Error(err, "could not determine if tenant ID is supported", "tenantID", resourceChanged.TenantID)
Expand All @@ -274,7 +281,7 @@ func (r *resyncer) Start(stop <-chan struct{}) error {
continue
}
meshName := resourceChanged.Key.Mesh
if desc.Name == core_mesh.MeshType {
if resourceChanged.Type == core_mesh.MeshType {
meshName = resourceChanged.Key.Name
}
var f actionFlag
Expand Down
2 changes: 1 addition & 1 deletion pkg/insights/test/test_event_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ type TestEventReaderFactory struct {
Reader *TestEventReader
}

func (t *TestEventReaderFactory) Subscribe() events.Listener {
func (t *TestEventReaderFactory) Subscribe(...events.Predicate) events.Listener {
return t.Reader
}
2 changes: 1 addition & 1 deletion pkg/kds/v2/server/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func newSyncTracker(
return &EventBasedWatchdog{
Ctx: ctx,
Node: node,
Listener: eventBus.Subscribe(),
EventBus: eventBus,
Reconciler: reconciler,
ProvidedTypes: changedTypes,
Metrics: kdsMetrics,
Expand Down
Loading