From 4bb45a88107335af8ccfbf7ab240d89c8400fc1d Mon Sep 17 00:00:00 2001 From: Mateusz Szostok Date: Mon, 28 Nov 2022 19:26:03 +0100 Subject: [PATCH 1/5] Add support for handling sources plugins, add plugin devel docs --- .goreleaser.plugin.yaml | 14 ++ CONTRIBUTING.md | 44 ++++- Makefile | 9 +- cmd/botkube/main.go | 13 +- cmd/executor/echo/main.go | 2 +- cmd/source/cm-watcher/README.md | 11 ++ cmd/source/cm-watcher/main.go | 58 +++++++ hack/goreleaser.sh | 7 + internal/plugin/collector.go | 35 +++- internal/plugin/manager.go | 56 +++++-- internal/plugin/store.go | 31 ++-- internal/plugin/store_test.go | 2 +- {pkg => internal}/source/registration.go | 0 {pkg => internal}/source/registration_test.go | 0 {pkg => internal}/source/router.go | 0 {pkg => internal}/source/router_test.go | 0 internal/source/source.go | 72 +++++++++ pkg/api/source/grpc_adapter.go | 144 +++++++++++++++++ pkg/api/source/source.pb.go | 153 ++++++++++++++++++ pkg/api/source/source_grpc.pb.go | 133 +++++++++++++++ pkg/config/config.go | 1 + pkg/config/config_test.go | 10 ++ .../TestLoadConfigSuccess/config-all.yaml | 6 + .../TestLoadConfigSuccess/config.golden.yaml | 7 +- .../TestLoadConfigWithPlugins/config-all.yaml | 7 + pkg/controller/controller.go | 2 +- proto/source.proto | 16 ++ test/e2e/bots_test.go | 5 +- .../plugin_server.go} | 78 +++++---- test/helpers/plugin_server.go | 34 ++++ 30 files changed, 881 insertions(+), 69 deletions(-) create mode 100644 cmd/source/cm-watcher/README.md create mode 100644 cmd/source/cm-watcher/main.go rename {pkg => internal}/source/registration.go (100%) rename {pkg => internal}/source/registration_test.go (100%) rename {pkg => internal}/source/router.go (100%) rename {pkg => internal}/source/router_test.go (100%) create mode 100644 internal/source/source.go create mode 100644 pkg/api/source/grpc_adapter.go create mode 100644 pkg/api/source/source.pb.go create mode 100644 pkg/api/source/source_grpc.pb.go create mode 100644 proto/source.proto rename test/e2e/{plugin_server_test.go => fake/plugin_server.go} (56%) create mode 100644 test/helpers/plugin_server.go diff --git a/.goreleaser.plugin.yaml b/.goreleaser.plugin.yaml index 1e1afcead..f1159c5db 100644 --- a/.goreleaser.plugin.yaml +++ b/.goreleaser.plugin.yaml @@ -17,3 +17,17 @@ builds: - arm64 goarm: - 7 + - id: cm-watcher + binary: source_cm-watcher_{{ .Os }}_{{ .Arch }} + no_unique_dist_dir: true + main: cmd/source/cm-watcher/main.go + env: + - CGO_ENABLED=0 + goos: + - linux + - darwin + goarch: + - amd64 + - arm64 + goarm: + - 7 diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 05ae4af98..c847d8782 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -36,7 +36,7 @@ This section describes how to build and run Botkube from source code. This is ideal for running Botkube on a local cluster, e.g. using [kind](https://kind.sigs.k8s.io) or [`minikube`](https://minikube.sigs.k8s.io/docs/). Remember to set the `IMAGE_PLATFORM` env var to your target architecture. By default, the build targets `linux/amd64`. - + For example, the command below builds the `linux/arm64` target: ```sh @@ -62,23 +62,23 @@ This section describes how to build and run Botkube from source code. docker tag ghcr.io/kubeshop/botkube:v9.99.9-dev-amd64 {your_account}/botkube:v9.99.9-dev docker push {your_account}/botkube:v9.99.9-dev ``` - + Where `{your_account}` is Docker hub or any other registry provider account to which you can push the image. 2. Install Botkube with any of communication platform configured, according to [the installation instructions](https://docs.botkube.io/installation/). During the Helm chart installation step, set the following flags: - + ```sh export IMAGE_REGISTRY="{imageRegistry}" # e.g. docker.io export IMAGE_PULL_POLICY="{pullPolicy}" # e.g. Always or IfNotPresent - + --set image.registry=${IMAGE_REGISTRY} \ --set image.repository={your_account}/botkube \ --set image.tag=v9.99.9-dev \ --set image.pullPolicy=${IMAGE_PULL_POLICY} ``` - + Check [values.yaml](./helm/botkube/values.yaml) for default options. - + ### Build and run locally For faster development, you can also build and run Botkube outside K8s cluster. @@ -130,6 +130,38 @@ For faster development, you can also build and run Botkube outside K8s cluster. ./botkube ``` +#### Develop Botkube plugins + +**Prerequisite** + +- Being able to start the Botkube binary locally. +- [GoReleaser](https://goreleaser.com/install/) + +**Steps** + +1. Start fake plugins server to serve binaries from [`dist`](dist) folder: + + ```bash + go run test/helpers/plugin_server.go + ``` + +2. Export Botkube plugins cache directory: + + ```bash + export BOTKUBE_PLUGINS_CACHE__DIR="/tmp/plugins" + ``` + +3. Each time you make a change to [source](cmd/source) or [executors](cmd/executor) plugins, run: + + ```bash + # rebuild plugins only for current GOOS and GOARCH + make build-plugins-single && + # remove cached plugins + rm -rf $BOTKUBE_PLUGINS_CACHE__DIR && + # start botkube to download fresh plugins + ./botkube + ``` + ## Making A Change - Before making any significant changes, please [open an issue](https://github.com/kubeshop/botkube/issues). Discussing your proposed changes ahead of time will make the contribution process smooth for everyone. diff --git a/Makefile b/Makefile index 9e5fa2244..000da31ed 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ .DEFAULT_GOAL := build -.PHONY: container-image test test-integration-slack test-integration-discord build pre-build publish lint lint-fix go-import-fmt system-check save-images load-and-push-images gen-grpc-resources build-plugins +.PHONY: container-image test test-integration-slack test-integration-discord build pre-build publish lint lint-fix go-import-fmt system-check save-images load-and-push-images gen-grpc-resources build-plugins build-plugins-single # Show this help. help: @@ -28,11 +28,18 @@ build: pre-build @cd cmd/botkube;GOOS_VAL=$(shell go env GOOS) CGO_ENABLED=0 GOARCH_VAL=$(shell go env GOARCH) go build -o $(shell go env GOPATH)/bin/botkube @echo "Build completed successfully" +# Build Botkube official plugins for all supported platforms. build-plugins: pre-build @echo "Building plugins binaries" @./hack/goreleaser.sh build_plugins @echo "Build completed successfully" +# Build Botkube official plugins only for current GOOS and GOARCH. +build-plugins-single: pre-build + @echo "Building single target plugins binaries" + @./hack/goreleaser.sh build_plugins_single + @echo "Build completed successfully" + # Build the image container-image: pre-build @echo "Building docker image" diff --git a/cmd/botkube/main.go b/cmd/botkube/main.go index 19facbd24..a14f031ff 100644 --- a/cmd/botkube/main.go +++ b/cmd/botkube/main.go @@ -30,6 +30,7 @@ import ( "github.com/kubeshop/botkube/internal/analytics" "github.com/kubeshop/botkube/internal/lifecycle" "github.com/kubeshop/botkube/internal/plugin" + "github.com/kubeshop/botkube/internal/source" "github.com/kubeshop/botkube/internal/storage" "github.com/kubeshop/botkube/pkg/action" "github.com/kubeshop/botkube/pkg/bot" @@ -43,7 +44,6 @@ import ( "github.com/kubeshop/botkube/pkg/notifier" "github.com/kubeshop/botkube/pkg/recommendation" "github.com/kubeshop/botkube/pkg/sink" - "github.com/kubeshop/botkube/pkg/source" ) const ( @@ -100,8 +100,8 @@ func run() error { errGroup, ctx := errgroup.WithContext(ctx) collector := plugin.NewCollector(logger) - enabledPluginExecutors := collector.GetAllEnabledAndUsedPlugins(conf) - pluginManager := plugin.NewManager(logger, conf.Plugins, enabledPluginExecutors) + enabledPluginExecutors, enabledPluginSources := collector.GetAllEnabledAndUsedPlugins(conf) + pluginManager := plugin.NewManager(logger, conf.Plugins, enabledPluginExecutors, enabledPluginSources) err = pluginManager.Start(ctx) if err != nil { @@ -322,6 +322,13 @@ func run() error { actionProvider := action.NewProvider(logger.WithField(componentLogFieldKey, "Action Provider"), conf.Actions, executorFactory) router.AddEnabledActionBindings(conf.Actions) + // TODO: temporary dispatcher for events + sourcePluginDispatcher := source.NewDispatcher(logger, notifiers, pluginManager, enabledPluginSources) + err = sourcePluginDispatcher.Start(ctx) + if err != nil { + return fmt.Errorf("while starting source plugin event dispatcher: %w", err) + } + // Create and start controller ctrl := controller.New( logger.WithField(componentLogFieldKey, "Controller"), diff --git a/cmd/executor/echo/main.go b/cmd/executor/echo/main.go index c948d7858..fb0cce976 100644 --- a/cmd/executor/echo/main.go +++ b/cmd/executor/echo/main.go @@ -30,7 +30,7 @@ func (EchoExecutor) Execute(_ context.Context, req *executor.ExecuteRequest) (*e } return &executor.ExecuteResponse{ - Data: data, + Data: data + "v2", }, nil } diff --git a/cmd/source/cm-watcher/README.md b/cmd/source/cm-watcher/README.md new file mode 100644 index 000000000..3021b81c8 --- /dev/null +++ b/cmd/source/cm-watcher/README.md @@ -0,0 +1,11 @@ +# Echo executor + +Echo is the example Botkube executor used during [e2e tests](../../../test/e2e). + +## Configuration parameters + +The Echo configuration should be specified in YAML format. It accepts such parameters: + +```yaml +configMapName: cm-map-watcher # config map name to react to. +``` diff --git a/cmd/source/cm-watcher/main.go b/cmd/source/cm-watcher/main.go new file mode 100644 index 000000000..0d414a58e --- /dev/null +++ b/cmd/source/cm-watcher/main.go @@ -0,0 +1,58 @@ +package main + +import ( + "context" + "encoding/json" + "time" + + "github.com/hashicorp/go-plugin" + + "github.com/kubeshop/botkube/pkg/api/source" +) + +const pluginName = "cm-watcher" + +// Config holds executor configuration. +type Config struct { + ConfigMapName string +} + +// CMWatcher implements Botkube source plugin. +type CMWatcher struct{} + +// Stream returns a given command as response. +func (CMWatcher) Stream(ctx context.Context) (source.StreamOutput, error) { + // TODO: in request we should receive the executor configuration. + cfg := Config{ + ConfigMapName: "cm-watcher-trigger", + } + + raw, err := json.Marshal(cfg) + if err != nil { + return source.StreamOutput{}, err + } + out := source.StreamOutput{ + Output: make(chan []byte), + } + + go func() { + for { + select { + case <-time.Tick(1 * time.Second): + out.Output <- raw + case <-ctx.Done(): + return + } + } + }() + + return out, nil +} + +func main() { + source.Serve(map[string]plugin.Plugin{ + pluginName: &source.Plugin{ + Source: &CMWatcher{}, + }, + }) +} diff --git a/hack/goreleaser.sh b/hack/goreleaser.sh index 2e0a5156e..0268afcc2 100755 --- a/hack/goreleaser.sh +++ b/hack/goreleaser.sh @@ -96,6 +96,10 @@ build_plugins() { goreleaser build -f .goreleaser.plugin.yaml --rm-dist --snapshot } +build_plugins_single() { + goreleaser build -f .goreleaser.plugin.yaml --rm-dist --snapshot --single-target +} + build_single() { export IMAGE_TAG=v9.99.9-dev docker run --rm --privileged \ @@ -127,6 +131,9 @@ case "${1}" in build_plugins) build_plugins ;; + build_plugins_single) + build_plugins_single + ;; build_single) build_single ;; diff --git a/internal/plugin/collector.go b/internal/plugin/collector.go index f4f388d6b..4ac63aeef 100644 --- a/internal/plugin/collector.go +++ b/internal/plugin/collector.go @@ -18,15 +18,21 @@ func NewCollector(log logrus.FieldLogger) *Collector { // GetAllEnabledAndUsedPlugins returns the list of all plugins that are both enabled and bind to at // least one communicator that is also enabled. -func (c *Collector) GetAllEnabledAndUsedPlugins(cfg *config.Config) []string { - // Collect all used executor - bindExecutors := map[string]struct{}{} +func (c *Collector) GetAllEnabledAndUsedPlugins(cfg *config.Config) ([]string, []string) { + // Collect all used executor/sources + var ( + bindExecutors = map[string]struct{}{} + bindSources = map[string]struct{}{} + ) collect := func(channels config.IdentifiableMap[config.ChannelBindingsByName]) { for _, bindings := range channels { for _, name := range bindings.Bindings.Executors { bindExecutors[name] = struct{}{} } + for _, name := range bindings.Bindings.Sources { + bindSources[name] = struct{}{} + } } } @@ -47,6 +53,9 @@ func (c *Collector) GetAllEnabledAndUsedPlugins(cfg *config.Config) []string { for _, name := range commGroupCfg.Teams.Bindings.Executors { bindExecutors[name] = struct{}{} } + for _, name := range commGroupCfg.Teams.Bindings.Sources { + bindSources[name] = struct{}{} + } } if commGroupCfg.Discord.Enabled { @@ -54,6 +63,9 @@ func (c *Collector) GetAllEnabledAndUsedPlugins(cfg *config.Config) []string { for _, name := range bindings.Bindings.Executors { bindExecutors[name] = struct{}{} } + for _, name := range bindings.Bindings.Sources { + bindSources[name] = struct{}{} + } } } } @@ -82,5 +94,20 @@ func (c *Collector) GetAllEnabledAndUsedPlugins(cfg *config.Config) []string { } } - return usedExecutorPlugins + // Collect all sources that are both enabled and bind to at least one communicator that is enabled. + var usedSourcePlugins []string + for groupName, groupItems := range cfg.Sources { + for name, source := range groupItems.Plugins { + if !source.Enabled { + continue + } + _, found := bindSources[groupName] + if !found { + continue + } + + usedSourcePlugins = append(usedSourcePlugins, name) + } + } + return usedExecutorPlugins, usedSourcePlugins } diff --git a/internal/plugin/manager.go b/internal/plugin/manager.go index e7d65bbeb..e2120ead2 100644 --- a/internal/plugin/manager.go +++ b/internal/plugin/manager.go @@ -18,12 +18,14 @@ import ( "github.com/kubeshop/botkube/pkg/api" "github.com/kubeshop/botkube/pkg/api/executor" + "github.com/kubeshop/botkube/pkg/api/source" "github.com/kubeshop/botkube/pkg/config" "github.com/kubeshop/botkube/pkg/multierror" ) const ( executorPluginName = "executor" + sourcePluginName = "source" dirPerms = 0o775 binPerms = 0o755 filePerms = 0o664 @@ -33,8 +35,7 @@ const ( // This map is used in order to identify a plugin called Dispense. // This map is globally available and must stay consistent in order for all the plugins to work. var pluginMap = map[string]plugin.Plugin{ - // TODO(plugin-sources): add me: - //sourcePluginName: &source.Plugin{}, + sourcePluginName: &source.Plugin{}, executorPluginName: &executor.Plugin{}, } @@ -47,28 +48,34 @@ type Manager struct { executorsToEnable []string executorsStore store[executor.Executor] + + sourcesStore store[source.Source] + sourcesToEnable []string } // NewManager returns a new Manager instance. -func NewManager(logger logrus.FieldLogger, cfg config.Plugins, executors []string) *Manager { +func NewManager(logger logrus.FieldLogger, cfg config.Plugins, executors, sources []string) *Manager { return &Manager{ cfg: cfg, httpClient: newHTTPClient(), executorsToEnable: executors, executorsStore: newStore[executor.Executor](), + sourcesToEnable: sources, + sourcesStore: newStore[source.Source](), log: logger.WithField("component", "Plugin Manager"), } } // Start downloads and starts all enabled plugins. func (m *Manager) Start(ctx context.Context) error { - if len(m.executorsToEnable) == 0 { + if len(m.executorsToEnable) == 0 && len(m.sourcesToEnable) == 0 { m.log.Info("No external plugins are enabled.") return nil } m.log.WithFields(logrus.Fields{ "enabledExecutors": strings.Join(m.executorsToEnable, ","), + "enabledSources": strings.Join(m.sourcesToEnable, ","), }).Info("Starting Plugin Manager for all enabled plugins") err := m.start(ctx, false) @@ -100,6 +107,17 @@ func (m *Manager) start(ctx context.Context, forceUpdate bool) error { return fmt.Errorf("while creating executor plugins: %w", err) } m.executorsStore.EnabledPlugins = executorClients + + sourcesPlugins, err := m.loadPlugins(ctx, sourcePluginName, m.sourcesToEnable, m.sourcesStore.Repository) + if err != nil { + return err + } + sourcesClients, err := createGRPCClients[source.Source](sourcesPlugins, sourcePluginName) + if err != nil { + return fmt.Errorf("while creating source plugins: %w", err) + } + m.sourcesStore.EnabledPlugins = sourcesClients + return nil } @@ -111,7 +129,21 @@ func (m *Manager) GetExecutor(name string) (executor.Executor, error) { client, found := m.executorsStore.EnabledPlugins[name] if !found || client.Client == nil { - return nil, fmt.Errorf("client for plugin %q not found", name) + return nil, fmt.Errorf("client for executor plugin %q not found", name) + } + + return client.Client, nil +} + +// GetSource returns the source client for a given plugin. +func (m *Manager) GetSource(name string) (source.Source, error) { + if !m.isStarted.Load() { + return nil, ErrNotStartedPluginManager + } + + client, found := m.sourcesStore.EnabledPlugins[name] + if !found || client.Client == nil { + return nil, fmt.Errorf("client for source plugin %q not found", name) } return client.Client, nil @@ -121,7 +153,13 @@ func (m *Manager) GetExecutor(name string) (executor.Executor, error) { // This method blocks until all cleanup is finished. func (m *Manager) Shutdown() { var wg sync.WaitGroup - for _, p := range m.executorsStore.EnabledPlugins { + releasePlugins(&wg, m.sourcesStore.EnabledPlugins) + releasePlugins(&wg, m.executorsStore.EnabledPlugins) + wg.Wait() +} + +func releasePlugins[T any](wg *sync.WaitGroup, enabledPlugins storePlugins[T]) { + for _, p := range enabledPlugins { wg.Add(1) go func(close func()) { @@ -131,7 +169,6 @@ func (m *Manager) Shutdown() { wg.Done() }(p.Cleanup) } - wg.Wait() } func (m *Manager) loadPlugins(ctx context.Context, pluginType string, pluginsToEnable []string, repo storeRepository) (map[string]string, error) { @@ -205,11 +242,12 @@ func (m *Manager) loadRepositoriesMetadata(ctx context.Context, forceUpdate bool rawIndexes[repo] = data } - executorsRepos, err := newStoreRepository(rawIndexes) + executorsRepos, sourcesRepos, err := newStoreRepositories(rawIndexes) if err != nil { - return fmt.Errorf("while building executors repository store: %w", err) + return fmt.Errorf("while building repositories store: %w", err) } m.executorsStore.Repository = executorsRepos + m.sourcesStore.Repository = sourcesRepos return nil } diff --git a/internal/plugin/store.go b/internal/plugin/store.go index fec05d470..679a8283b 100644 --- a/internal/plugin/store.go +++ b/internal/plugin/store.go @@ -41,42 +41,53 @@ func newStore[T any]() store[T] { } } -func newStoreRepository(indexes map[string][]byte) (storeRepository, error) { - executorsRepository := storeRepository{} +func newStoreRepositories(indexes map[string][]byte) (storeRepository, storeRepository, error) { + var ( + executorsRepositories = storeRepository{} + sourcesRepositories = storeRepository{} + ) for repo, data := range indexes { var index Index if err := yaml.Unmarshal(data, &index); err != nil { - return nil, fmt.Errorf("while unmarshaling index: %w", err) + fmt.Println(string(data)) + return nil, nil, fmt.Errorf("while unmarshaling index: %w", err) } for _, entry := range index.Entries { // omit version, as we want to collect plugins with different version together key, err := BuildPluginKey(repo, entry.Name, "") if err != nil { - return nil, fmt.Errorf("while building key for entry in %s repository: %w", repo, err) + return nil, nil, fmt.Errorf("while building key for entry in %s repository: %w", repo, err) } switch entry.Type { case TypeExecutor: - executorsRepository[key] = append(executorsRepository[key], storeEntry{ + executorsRepositories[key] = append(executorsRepositories[key], storeEntry{ Description: entry.Description, Version: entry.Version, URLs: mapBinaryURLs(entry.URLs), }) case TypeSource: - // TODO(plugin-sources): add me. + sourcesRepositories[key] = append(sourcesRepositories[key], storeEntry{ + Description: entry.Description, + Version: entry.Version, + URLs: mapBinaryURLs(entry.URLs), + }) } } } // sort loaded entries by version - for key := range executorsRepository { - sort.Sort(byIndexEntryVersion(executorsRepository[key])) + for key := range executorsRepositories { + sort.Sort(byIndexEntryVersion(executorsRepositories[key])) + } + + for key := range sourcesRepositories { + sort.Sort(byIndexEntryVersion(sourcesRepositories[key])) } - // TODO(plugin-sources): add sorting - return executorsRepository, nil + return executorsRepositories, sourcesRepositories, nil } func mapBinaryURLs(in []IndexURL) map[string]string { diff --git a/internal/plugin/store_test.go b/internal/plugin/store_test.go index 7f4c65052..63892d74b 100644 --- a/internal/plugin/store_test.go +++ b/internal/plugin/store_test.go @@ -54,7 +54,7 @@ func TestNewStoreRepository(t *testing.T) { } // when - executors, err := newStoreRepository(repositories) + executors, _, err := newStoreRepositories(repositories) // then require.NoError(t, err) diff --git a/pkg/source/registration.go b/internal/source/registration.go similarity index 100% rename from pkg/source/registration.go rename to internal/source/registration.go diff --git a/pkg/source/registration_test.go b/internal/source/registration_test.go similarity index 100% rename from pkg/source/registration_test.go rename to internal/source/registration_test.go diff --git a/pkg/source/router.go b/internal/source/router.go similarity index 100% rename from pkg/source/router.go rename to internal/source/router.go diff --git a/pkg/source/router_test.go b/internal/source/router_test.go similarity index 100% rename from pkg/source/router_test.go rename to internal/source/router_test.go diff --git a/internal/source/source.go b/internal/source/source.go new file mode 100644 index 000000000..e5df9b534 --- /dev/null +++ b/internal/source/source.go @@ -0,0 +1,72 @@ +package source + +import ( + "context" + "fmt" + + "github.com/sirupsen/logrus" + + "github.com/kubeshop/botkube/internal/plugin" + "github.com/kubeshop/botkube/pkg/bot/interactive" + "github.com/kubeshop/botkube/pkg/notifier" +) + +// Dispatcher watches for enabled sources events and send them to notifiers. +type Dispatcher struct { + log logrus.FieldLogger + notifiers []notifier.Notifier + manager *plugin.Manager + sources []string +} + +// NewDispatcher create a new Dispatcher instance. +func NewDispatcher(log logrus.FieldLogger, notifiers []notifier.Notifier, manager *plugin.Manager, sources []string) *Dispatcher { + return &Dispatcher{ + log: log, + notifiers: notifiers, + manager: manager, + sources: sources, + } +} + +// Start starts all sources and dispatch received events. +func (d *Dispatcher) Start(ctx context.Context) error { + for _, name := range d.sources { + sourceClient, err := d.manager.GetSource(name) + if err != nil { + return fmt.Errorf("while getting source client for %s: %w", name, err) + } + out, err := sourceClient.Stream(ctx) + if err != nil { + return fmt.Errorf("while opening stream for %s: %w", name, err) + } + + go func() { + for { + select { + case event := <-out.Output: + d.dispatch(ctx, event) + case <-ctx.Done(): + return + } + } + }() + } + + return nil +} + +func (d *Dispatcher) dispatch(ctx context.Context, event []byte) { + for _, n := range d.notifiers { + go func(n notifier.Notifier) { + err := n.SendMessageToAll(ctx, interactive.Message{ + Base: interactive.Base{ + Header: string(event), + }, + }) + if err != nil { + d.log.Errorf("while sending event: %s", err.Error()) + } + }(n) + } +} diff --git a/pkg/api/source/grpc_adapter.go b/pkg/api/source/grpc_adapter.go new file mode 100644 index 000000000..3ab32058e --- /dev/null +++ b/pkg/api/source/grpc_adapter.go @@ -0,0 +1,144 @@ +package source + +import ( + "context" + "io" + "log" + + "github.com/hashicorp/go-plugin" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/kubeshop/botkube/pkg/api" +) + +// Source defines the Botkube source plugin functionality. +type Source interface { + Stream(ctx context.Context) (StreamOutput, error) +} + +// StreamOutput contains the stream data. +type StreamOutput struct { + // Output represents the streamed events. It is from start of plugin execution. + Output chan []byte + // TODO: we should consider adding error feedback channel too. +} + +// ProtocolVersion is the version that must match between Botkube core +// and Botkube plugins. This should be bumped whenever a change happens in +// one or the other that makes it so that they can't safely communicate. +// This could be adding a new interface value, it could be how helper/schema computes diffs, etc. +// +// NOTE: In the future we can consider using VersionedPlugins. These can be used to negotiate +// a compatible version between client and server. If this is set, Handshake.ProtocolVersion is not required. +const ProtocolVersion = 1 + +var _ plugin.GRPCPlugin = &Plugin{} + +// Plugin This is the implementation of plugin.GRPCPlugin, so we can serve and consume different Botkube Sources. +type Plugin struct { + // The GRPC plugin must still implement the Plugin interface. + plugin.NetRPCUnsupportedPlugin + + // Source represent a concrete implementation that handles the business logic. + Source Source +} + +// GRPCServer registers plugin for serving with the given GRPCServer. +func (p *Plugin) GRPCServer(_ *plugin.GRPCBroker, s *grpc.Server) error { + RegisterSourceServer(s, &grpcServer{ + Source: p.Source, + }) + return nil +} + +// GRPCClient returns the interface implementation for the plugin that is serving via gRPC by GRPCServer. +func (p *Plugin) GRPCClient(_ context.Context, _ *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) { + return &grpcClient{ + client: NewSourceClient(c), + }, nil +} + +type grpcClient struct { + client SourceClient +} + +func (p *grpcClient) Stream(ctx context.Context) (StreamOutput, error) { + stream, err := p.client.Stream(ctx, &emptypb.Empty{}) + if err != nil { + return StreamOutput{}, err + } + + out := StreamOutput{ + Output: make(chan []byte), + } + + go func() { + for { + // RecvMsg blocks until it receives a message into m or the stream is + // done. It returns io.EOF when the stream completes successfully. + feature, err := stream.Recv() + if err == io.EOF { + break + } + + // On any other error, the stream is aborted and the error contains the RPC + // status. + if err != nil { + log.Print(err) + // TODO: we should consider adding error feedback channel to StreamOutput. + return + } + out.Output <- feature.Output + } + }() + + return out, nil +} + +type grpcServer struct { + UnimplementedSourceServer + Source Source +} + +func (p *grpcServer) Stream(_ *emptypb.Empty, gstream Source_StreamServer) error { + ctx := gstream.Context() + + // It's up to the 'Stream' method to close the returned channels as it sends the data to it. + // We can only use 'ctx' to cancel streaming and release associated resources. + stream, err := p.Source.Stream(ctx) + if err != nil { + return err + } + + for { + select { + case <-ctx.Done(): // client canceled stream, we can release this connection. + return ctx.Err() + case out, ok := <-stream.Output: + if !ok { + return nil // output closed, no more chunk logs + } + + err := gstream.Send(&StreamResponse{ + Output: out, + }) + if err != nil { + return err + } + } + } +} + +// Serve serves given plugins. +func Serve(p map[string]plugin.Plugin) { + plugin.Serve(&plugin.ServeConfig{ + Plugins: p, + HandshakeConfig: plugin.HandshakeConfig{ + ProtocolVersion: ProtocolVersion, + MagicCookieKey: api.HandshakeConfig.MagicCookieKey, + MagicCookieValue: api.HandshakeConfig.MagicCookieValue, + }, + GRPCServer: plugin.DefaultGRPCServer, + }) +} diff --git a/pkg/api/source/source.pb.go b/pkg/api/source/source.pb.go new file mode 100644 index 000000000..d22f227bf --- /dev/null +++ b/pkg/api/source/source.pb.go @@ -0,0 +1,153 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.27.1 +// protoc v3.19.4 +// source: source.proto + +package source + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + emptypb "google.golang.org/protobuf/types/known/emptypb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type StreamResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Output represents the streamed Source events. It is from start of Source execution. + Output []byte `protobuf:"bytes,1,opt,name=output,proto3" json:"output,omitempty"` +} + +func (x *StreamResponse) Reset() { + *x = StreamResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_source_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StreamResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamResponse) ProtoMessage() {} + +func (x *StreamResponse) ProtoReflect() protoreflect.Message { + mi := &file_source_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamResponse.ProtoReflect.Descriptor instead. +func (*StreamResponse) Descriptor() ([]byte, []int) { + return file_source_proto_rawDescGZIP(), []int{0} +} + +func (x *StreamResponse) GetOutput() []byte { + if x != nil { + return x.Output + } + return nil +} + +var File_source_proto protoreflect.FileDescriptor + +var file_source_proto_rawDesc = []byte{ + 0x0a, 0x0c, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, + 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x22, 0x28, 0x0a, 0x0e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x32, 0x46, 0x0a, + 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x3c, 0x0a, 0x06, 0x53, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x73, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0x10, 0x5a, 0x0e, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, + 0x2f, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_source_proto_rawDescOnce sync.Once + file_source_proto_rawDescData = file_source_proto_rawDesc +) + +func file_source_proto_rawDescGZIP() []byte { + file_source_proto_rawDescOnce.Do(func() { + file_source_proto_rawDescData = protoimpl.X.CompressGZIP(file_source_proto_rawDescData) + }) + return file_source_proto_rawDescData +} + +var file_source_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_source_proto_goTypes = []interface{}{ + (*StreamResponse)(nil), // 0: source.StreamResponse + (*emptypb.Empty)(nil), // 1: google.protobuf.Empty +} +var file_source_proto_depIdxs = []int32{ + 1, // 0: source.Source.Stream:input_type -> google.protobuf.Empty + 0, // 1: source.Source.Stream:output_type -> source.StreamResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_source_proto_init() } +func file_source_proto_init() { + if File_source_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_source_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StreamResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_source_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_source_proto_goTypes, + DependencyIndexes: file_source_proto_depIdxs, + MessageInfos: file_source_proto_msgTypes, + }.Build() + File_source_proto = out.File + file_source_proto_rawDesc = nil + file_source_proto_goTypes = nil + file_source_proto_depIdxs = nil +} diff --git a/pkg/api/source/source_grpc.pb.go b/pkg/api/source/source_grpc.pb.go new file mode 100644 index 000000000..5f9efae78 --- /dev/null +++ b/pkg/api/source/source_grpc.pb.go @@ -0,0 +1,133 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.19.4 +// source: source.proto + +package source + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + emptypb "google.golang.org/protobuf/types/known/emptypb" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// SourceClient is the client API for Source service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type SourceClient interface { + Stream(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (Source_StreamClient, error) +} + +type sourceClient struct { + cc grpc.ClientConnInterface +} + +func NewSourceClient(cc grpc.ClientConnInterface) SourceClient { + return &sourceClient{cc} +} + +func (c *sourceClient) Stream(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (Source_StreamClient, error) { + stream, err := c.cc.NewStream(ctx, &Source_ServiceDesc.Streams[0], "/source.Source/Stream", opts...) + if err != nil { + return nil, err + } + x := &sourceStreamClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Source_StreamClient interface { + Recv() (*StreamResponse, error) + grpc.ClientStream +} + +type sourceStreamClient struct { + grpc.ClientStream +} + +func (x *sourceStreamClient) Recv() (*StreamResponse, error) { + m := new(StreamResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// SourceServer is the server API for Source service. +// All implementations must embed UnimplementedSourceServer +// for forward compatibility +type SourceServer interface { + Stream(*emptypb.Empty, Source_StreamServer) error + mustEmbedUnimplementedSourceServer() +} + +// UnimplementedSourceServer must be embedded to have forward compatible implementations. +type UnimplementedSourceServer struct { +} + +func (UnimplementedSourceServer) Stream(*emptypb.Empty, Source_StreamServer) error { + return status.Errorf(codes.Unimplemented, "method Stream not implemented") +} +func (UnimplementedSourceServer) mustEmbedUnimplementedSourceServer() {} + +// UnsafeSourceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to SourceServer will +// result in compilation errors. +type UnsafeSourceServer interface { + mustEmbedUnimplementedSourceServer() +} + +func RegisterSourceServer(s grpc.ServiceRegistrar, srv SourceServer) { + s.RegisterService(&Source_ServiceDesc, srv) +} + +func _Source_Stream_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(emptypb.Empty) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(SourceServer).Stream(m, &sourceStreamServer{stream}) +} + +type Source_StreamServer interface { + Send(*StreamResponse) error + grpc.ServerStream +} + +type sourceStreamServer struct { + grpc.ServerStream +} + +func (x *sourceStreamServer) Send(m *StreamResponse) error { + return x.ServerStream.SendMsg(m) +} + +// Source_ServiceDesc is the grpc.ServiceDesc for Source service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Source_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "source.Source", + HandlerType: (*SourceServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Stream", + Handler: _Source_Stream_Handler, + ServerStreams: true, + }, + }, + Metadata: "source.proto", +} diff --git a/pkg/config/config.go b/pkg/config/config.go index d8ccb2345..d0cef5495 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -198,6 +198,7 @@ type ActionBindings struct { type Sources struct { DisplayName string `yaml:"displayName"` Kubernetes KubernetesSource `yaml:"kubernetes"` + Plugins PluginsExecutors `koanf:",remain"` } // KubernetesSource contains configuration for Kubernetes sources. diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index e542e5653..99821d51c 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -50,6 +50,15 @@ func TestLoadConfigSuccess(t *testing.T) { func TestLoadConfigWithPlugins(t *testing.T) { // given + expSourcePlugin := config.PluginsExecutors{ + "botkube/keptn": { + Enabled: true, + Config: map[string]interface{}{ + "field": "value", + }, + }, + } + expExecutorPlugin := config.PluginsExecutors{ "botkube/echo": { Enabled: true, @@ -70,6 +79,7 @@ func TestLoadConfigWithPlugins(t *testing.T) { require.NoError(t, err) require.NotNil(t, gotCfg) + assert.Equal(t, expSourcePlugin, gotCfg.Sources["k8s-events"].Plugins) assert.Equal(t, expExecutorPlugin, gotCfg.Executors["plugin-based"].Plugins) } diff --git a/pkg/config/testdata/TestLoadConfigSuccess/config-all.yaml b/pkg/config/testdata/TestLoadConfigSuccess/config-all.yaml index d4d0dff6c..bf10b4a83 100644 --- a/pkg/config/testdata/TestLoadConfigSuccess/config-all.yaml +++ b/pkg/config/testdata/TestLoadConfigSuccess/config-all.yaml @@ -104,6 +104,7 @@ communications: # req 1 elm. sources: 'k8s-events': + displayName: "Plugins & Builtins" kubernetes: recommendations: @@ -204,6 +205,11 @@ sources: - spec.template.spec.containers[*].image - status.readyReplicas + botkube/keptn: + enabled: true + config: + field: value + filters: kubernetes: objectAnnotationChecker: true diff --git a/pkg/config/testdata/TestLoadConfigSuccess/config.golden.yaml b/pkg/config/testdata/TestLoadConfigSuccess/config.golden.yaml index bf1b6ebdf..6ad30aa77 100644 --- a/pkg/config/testdata/TestLoadConfigSuccess/config.golden.yaml +++ b/pkg/config/testdata/TestLoadConfigSuccess/config.golden.yaml @@ -10,7 +10,7 @@ actions: - kubectl-read-only sources: k8s-events: - displayName: "" + displayName: Plugins & Builtins kubernetes: recommendations: ingress: @@ -271,6 +271,11 @@ sources: my-annotation: "true" labels: my-label: "true" + plugins: + botkube/keptn: + enabled: true + config: + field: value executors: kubectl-read-only: kubectl: diff --git a/pkg/config/testdata/TestLoadConfigWithPlugins/config-all.yaml b/pkg/config/testdata/TestLoadConfigWithPlugins/config-all.yaml index 794b56bcd..978caf411 100644 --- a/pkg/config/testdata/TestLoadConfigWithPlugins/config-all.yaml +++ b/pkg/config/testdata/TestLoadConfigWithPlugins/config-all.yaml @@ -16,6 +16,7 @@ communications: # req 1 elm. sources: 'k8s-events': + displayName: "Plugins & Builtins" kubernetes: events: @@ -27,6 +28,12 @@ sources: resources: - name: v1/pods + botkube/keptn: + enabled: true + config: + field: value + + executors: 'kubectl-read-only': # Kubectl executor configs diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 3e785dd1e..4d127742d 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -14,6 +14,7 @@ import ( "k8s.io/client-go/tools/cache" "github.com/kubeshop/botkube/internal/analytics" + "github.com/kubeshop/botkube/internal/source" "github.com/kubeshop/botkube/pkg/bot/interactive" "github.com/kubeshop/botkube/pkg/config" "github.com/kubeshop/botkube/pkg/event" @@ -21,7 +22,6 @@ import ( "github.com/kubeshop/botkube/pkg/multierror" "github.com/kubeshop/botkube/pkg/notifier" "github.com/kubeshop/botkube/pkg/recommendation" - "github.com/kubeshop/botkube/pkg/source" ) const ( diff --git a/proto/source.proto b/proto/source.proto new file mode 100644 index 000000000..52c568210 --- /dev/null +++ b/proto/source.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +import "google/protobuf/empty.proto"; + +option go_package = "pkg/api/source"; + +package source; + +message StreamResponse { + // Output represents the streamed Source events. It is from start of Source execution. + bytes output = 1; +} + +service Source { + rpc Stream(google.protobuf.Empty) returns (stream StreamResponse) {} +} diff --git a/test/e2e/bots_test.go b/test/e2e/bots_test.go index 077c8a2fd..4f22658d1 100644 --- a/test/e2e/bots_test.go +++ b/test/e2e/bots_test.go @@ -22,6 +22,7 @@ import ( "github.com/kubeshop/botkube/pkg/bot/interactive" "github.com/kubeshop/botkube/pkg/config" "github.com/kubeshop/botkube/pkg/filterengine/filters" + "github.com/kubeshop/botkube/test/e2e/fake" ) type Config struct { @@ -41,7 +42,7 @@ type Config struct { BotkubePluginRepoURL string `envconfig:"default=BOTKUBE_PLUGINS_REPOSITORIES_BOTKUBE"` } } - Plugins PluginsConfig + Plugins fake.PluginConfig ConfigMap struct { Namespace string `envconfig:"default=botkube"` } @@ -153,7 +154,7 @@ func runBotTest(t *testing.T, require.NoError(t, err) t.Log("Starting plugin server...") - indexEndpoint, startServerFn := NewPluginServer(appCfg.Plugins) + indexEndpoint, startServerFn := fake.NewPluginServer(appCfg.Plugins) go func() { require.NoError(t, startServerFn()) }() diff --git a/test/e2e/plugin_server_test.go b/test/e2e/fake/plugin_server.go similarity index 56% rename from test/e2e/plugin_server_test.go rename to test/e2e/fake/plugin_server.go index acf1a4de4..435293301 100644 --- a/test/e2e/plugin_server_test.go +++ b/test/e2e/fake/plugin_server.go @@ -1,4 +1,4 @@ -package e2e +package fake import ( "fmt" @@ -15,18 +15,22 @@ import ( const ( executorBinaryPrefix = "executor_" + sourceBinaryPrefix = "source_" indexFileEndpoint = "/botkube.yaml" ) -type PluginsConfig struct { - BinariesDirectory string `envconfig:"default=dist"` - Server struct { +type ( + PluginConfig struct { + BinariesDirectory string + Server PluginServer + } + PluginServer struct { Host string `envconfig:"default=http://host.k3d.internal"` Port int `envconfig:"default=3000"` } -} +) -func NewPluginServer(cfg PluginsConfig) (string, func() error) { +func NewPluginServer(cfg PluginConfig) (string, func() error) { fs := http.FileServer(http.Dir(cfg.BinariesDirectory)) http.Handle("/static/", http.StripPrefix("/static/", fs)) @@ -75,33 +79,19 @@ func buildIndex(urlBasePath string, dir string) (plugin.Index, error) { if entry.IsDir() { continue } - if !strings.HasPrefix(entry.Name(), executorBinaryPrefix) { - continue - } - - parts := strings.Split(entry.Name(), "_") - if len(parts) != 4 { - return plugin.Index{}, fmt.Errorf("path %s doesn't follow required pattern ___", entry.Name()) - } - name, os, arch := parts[1], parts[2], parts[3] - item, found := entries[name] - if !found { - item = plugin.IndexEntry{ - Name: name, - Type: plugin.TypeExecutor, - Description: "Executor description", - Version: "v0.1.0", + switch { + case strings.HasPrefix(entry.Name(), executorBinaryPrefix): + err := appendEntry(entries, entry.Name(), "Executor description", urlBasePath, plugin.TypeExecutor) + if err != nil { + return plugin.Index{}, err + } + case strings.HasPrefix(entry.Name(), sourceBinaryPrefix): + err := appendEntry(entries, entry.Name(), "Source description", urlBasePath, plugin.TypeSource) + if err != nil { + return plugin.Index{}, err } } - item.URLs = append(item.URLs, plugin.IndexURL{ - URL: fmt.Sprintf("%s/static/%s", urlBasePath, entry.Name()), - Platform: plugin.IndexURLPlatform{ - OS: os, - Arch: arch, - }, - }) - entries[name] = item } var out plugin.Index @@ -110,3 +100,31 @@ func buildIndex(urlBasePath string, dir string) (plugin.Index, error) { } return out, nil } + +func appendEntry(entries map[string]plugin.IndexEntry, entryName, desc, urlBasePath string, pluginType plugin.Type) error { + parts := strings.Split(entryName, "_") + if len(parts) != 4 { + return fmt.Errorf("path %s doesn't follow required pattern ___", entryName) + } + + name, os, arch := parts[1], parts[2], parts[3] + item, found := entries[name] + if !found { + item = plugin.IndexEntry{ + Name: name, + Type: pluginType, + Description: desc, + Version: "v0.1.0", + } + } + item.URLs = append(item.URLs, plugin.IndexURL{ + URL: fmt.Sprintf("%s/static/%s", urlBasePath, entryName), + Platform: plugin.IndexURLPlatform{ + OS: os, + Arch: arch, + }, + }) + entries[name] = item + + return nil +} diff --git a/test/helpers/plugin_server.go b/test/helpers/plugin_server.go new file mode 100644 index 000000000..36cf69437 --- /dev/null +++ b/test/helpers/plugin_server.go @@ -0,0 +1,34 @@ +package main + +import ( + "log" + "os" + "path/filepath" + + "github.com/kubeshop/botkube/test/e2e/fake" +) + +func main() { + dir, err := os.Getwd() + exitOnErr(err) + + binDir := filepath.Join(dir, "dist") + indexEndpoint, startServerFn := fake.NewPluginServer(fake.PluginConfig{ + BinariesDirectory: binDir, + Server: fake.PluginServer{ + Host: "http://localhost", + Port: 3000, + }, + }) + + log.Printf("Service plugin binaries from %s\n", binDir) + log.Printf("Botkube repository index URL: %s", indexEndpoint) + err = startServerFn() + exitOnErr(err) +} + +func exitOnErr(err error) { + if err != nil { + log.Fatal(err) + } +} From 0da015a6b1aa664c22afdfb69cafb38b6b0baaf1 Mon Sep 17 00:00:00 2001 From: Mateusz Szostok Date: Tue, 29 Nov 2022 10:41:31 +0100 Subject: [PATCH 2/5] Update source example --- CONTRIBUTING.md | 5 +- cmd/executor/echo/README.md | 2 +- cmd/executor/echo/main.go | 2 +- cmd/source/cm-watcher/README.md | 10 ++-- cmd/source/cm-watcher/main.go | 98 ++++++++++++++++++++++++++------- 5 files changed, 91 insertions(+), 26 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index c847d8782..7ee930b36 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -151,7 +151,7 @@ For faster development, you can also build and run Botkube outside K8s cluster. export BOTKUBE_PLUGINS_CACHE__DIR="/tmp/plugins" ``` -3. Each time you make a change to [source](cmd/source) or [executors](cmd/executor) plugins, run: +3. In other terminal window, run: ```bash # rebuild plugins only for current GOOS and GOARCH @@ -162,6 +162,9 @@ For faster development, you can also build and run Botkube outside K8s cluster. ./botkube ``` + > **Note** + > Each time you make a change to the [source](cmd/source) or [executors](cmd/executor) plugins re-run the above command. + ## Making A Change - Before making any significant changes, please [open an issue](https://github.com/kubeshop/botkube/issues). Discussing your proposed changes ahead of time will make the contribution process smooth for everyone. diff --git a/cmd/executor/echo/README.md b/cmd/executor/echo/README.md index 02b215bc7..96ee1edea 100644 --- a/cmd/executor/echo/README.md +++ b/cmd/executor/echo/README.md @@ -4,7 +4,7 @@ Echo is the example Botkube executor used during [e2e tests](../../../test/e2e). ## Configuration parameters -The Echo configuration should be specified in YAML format. It accepts such parameters: +The configuration should be specified in YAML format. Such parameters are supported: ```yaml changeResponseToUpperCase: true # default is 'false'. diff --git a/cmd/executor/echo/main.go b/cmd/executor/echo/main.go index fb0cce976..c948d7858 100644 --- a/cmd/executor/echo/main.go +++ b/cmd/executor/echo/main.go @@ -30,7 +30,7 @@ func (EchoExecutor) Execute(_ context.Context, req *executor.ExecuteRequest) (*e } return &executor.ExecuteResponse{ - Data: data + "v2", + Data: data, }, nil } diff --git a/cmd/source/cm-watcher/README.md b/cmd/source/cm-watcher/README.md index 3021b81c8..035a55c35 100644 --- a/cmd/source/cm-watcher/README.md +++ b/cmd/source/cm-watcher/README.md @@ -1,11 +1,13 @@ -# Echo executor +# ConfigMap watcher source -Echo is the example Botkube executor used during [e2e tests](../../../test/e2e). +ConfigMap watcher source is the example Botkube source used during [e2e tests](../../../test/e2e). ## Configuration parameters -The Echo configuration should be specified in YAML format. It accepts such parameters: +The configuration should be specified in YAML format. Such parameters are supported: ```yaml -configMapName: cm-map-watcher # config map name to react to. +configMap: + name: cm-map-watcher # config map name to react to + namespace: botkube # config map namespace ``` diff --git a/cmd/source/cm-watcher/main.go b/cmd/source/cm-watcher/main.go index 0d414a58e..b97895d0a 100644 --- a/cmd/source/cm-watcher/main.go +++ b/cmd/source/cm-watcher/main.go @@ -2,10 +2,22 @@ package main import ( "context" - "encoding/json" - "time" + "fmt" + "log" + "os" + "path/filepath" "github.com/hashicorp/go-plugin" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/clientcmd" + + watchtools "k8s.io/client-go/tools/watch" "github.com/kubeshop/botkube/pkg/api/source" ) @@ -13,9 +25,15 @@ import ( const pluginName = "cm-watcher" // Config holds executor configuration. -type Config struct { - ConfigMapName string -} +type ( + Config struct { + ConfigMap Object + } + Object struct { + Name string `yaml:"name"` + Namespace string `yaml:"namespace"` + } +) // CMWatcher implements Botkube source plugin. type CMWatcher struct{} @@ -24,29 +42,65 @@ type CMWatcher struct{} func (CMWatcher) Stream(ctx context.Context) (source.StreamOutput, error) { // TODO: in request we should receive the executor configuration. cfg := Config{ - ConfigMapName: "cm-watcher-trigger", + ConfigMap: Object{ + Name: "cm-watcher-trigger", + Namespace: "botkube", + }, } - raw, err := json.Marshal(cfg) - if err != nil { - return source.StreamOutput{}, err - } out := source.StreamOutput{ Output: make(chan []byte), } - go func() { - for { - select { - case <-time.Tick(1 * time.Second): - out.Output <- raw - case <-ctx.Done(): + go listenEvents(ctx, cfg.ConfigMap, out.Output) + + return out, nil +} + +func listenEvents(ctx context.Context, obj Object, sink chan<- []byte) { + home, err := os.UserHomeDir() + exitOnError(err) + + config, err := clientcmd.BuildConfigFromFlags("", filepath.Join(home, ".kube", "config")) + exitOnError(err) + clientset, err := kubernetes.NewForConfig(config) + exitOnError(err) + + fieldSelector := fields.OneTermEqualSelector("metadata.name", obj.Name).String() + lw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + options.FieldSelector = fieldSelector + return clientset.CoreV1().Pods(obj.Namespace).List(ctx, options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.FieldSelector = fieldSelector + return clientset.CoreV1().Pods(obj.Namespace).Watch(ctx, options) + }, + } + + fmt.Println("starting informer") + _, informer, watcher, _ := watchtools.NewIndexerInformerWatcher(lw, &corev1.ConfigMap{}) + defer watcher.Stop() + + fmt.Println("waiting for cache sync") + cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) + + ch := watcher.ResultChan() + defer watcher.Stop() + + for { + select { + case event, ok := <-ch: + fmt.Println("get event", event) + if !ok { // finished return } + cm := event.Object.(*corev1.ConfigMap) + sink <- []byte(cm.Name) + case <-ctx.Done(): // client closed streaming + return } - }() - - return out, nil + } } func main() { @@ -56,3 +110,9 @@ func main() { }, }) } + +func exitOnError(err error) { + if err != nil { + log.Fatal(err) + } +} From 5de6a86b4f087d2ea6fb480e04a4fb9249cf3068 Mon Sep 17 00:00:00 2001 From: Mateusz Szostok Date: Tue, 29 Nov 2022 23:55:53 +0100 Subject: [PATCH 3/5] Refactor dispatcher --- Makefile | 4 +- cmd/botkube/main.go | 2 +- cmd/source/cm-watcher/main.go | 50 ++++---- helm/botkube/e2e-test-values.yaml | 9 ++ internal/plugin/store_test.go | 39 ++++++- .../TestNewStoreRepository/botkube.yaml | 26 +++++ internal/source/source.go | 107 ++++++++++++++---- internal/source/testdata/config.yaml | 46 ++++++++ test/e2e/bots_test.go | 45 ++++++-- test/{e2e => }/fake/plugin_server.go | 4 + test/helpers/plugin_server.go | 2 +- 11 files changed, 268 insertions(+), 66 deletions(-) create mode 100644 internal/source/testdata/config.yaml rename test/{e2e => }/fake/plugin_server.go (94%) diff --git a/Makefile b/Makefile index 000da31ed..81df9a7af 100644 --- a/Makefile +++ b/Makefile @@ -18,10 +18,10 @@ test: system-check @go test -v -race ./... test-integration-slack: system-check - @go test -v -tags=integration -race -count=1 ./test/... -run "TestSlack" + @go test -v -tags=integration -race -count=1 ./test/e2e/... -run "TestSlack" test-integration-discord: system-check - @go test -v -tags=integration -race -count=1 ./test/... -run "TestDiscord" + @go test -v -tags=integration -race -count=1 ./test/e2e/... -run "TestDiscord" # Build the binary build: pre-build diff --git a/cmd/botkube/main.go b/cmd/botkube/main.go index a14f031ff..704e0f181 100644 --- a/cmd/botkube/main.go +++ b/cmd/botkube/main.go @@ -323,7 +323,7 @@ func run() error { router.AddEnabledActionBindings(conf.Actions) // TODO: temporary dispatcher for events - sourcePluginDispatcher := source.NewDispatcher(logger, notifiers, pluginManager, enabledPluginSources) + sourcePluginDispatcher := source.NewDispatcher(logger, notifiers, pluginManager, conf) err = sourcePluginDispatcher.Start(ctx) if err != nil { return fmt.Errorf("while starting source plugin event dispatcher: %w", err) diff --git a/cmd/source/cm-watcher/main.go b/cmd/source/cm-watcher/main.go index b97895d0a..c1e6861a7 100644 --- a/cmd/source/cm-watcher/main.go +++ b/cmd/source/cm-watcher/main.go @@ -5,7 +5,6 @@ import ( "fmt" "log" "os" - "path/filepath" "github.com/hashicorp/go-plugin" corev1 "k8s.io/api/core/v1" @@ -16,7 +15,6 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" - watchtools "k8s.io/client-go/tools/watch" "github.com/kubeshop/botkube/pkg/api/source" @@ -24,14 +22,16 @@ import ( const pluginName = "cm-watcher" -// Config holds executor configuration. type ( + // Config holds executor configuration. Config struct { ConfigMap Object } + // Object holds information about object to watch. Object struct { - Name string `yaml:"name"` - Namespace string `yaml:"namespace"` + Name string `yaml:"name"` + Namespace string `yaml:"namespace"` + Event watch.EventType `yaml:"event"` } ) @@ -45,6 +45,7 @@ func (CMWatcher) Stream(ctx context.Context) (source.StreamOutput, error) { ConfigMap: Object{ Name: "cm-watcher-trigger", Namespace: "botkube", + Event: "ADDED", }, } @@ -58,10 +59,7 @@ func (CMWatcher) Stream(ctx context.Context) (source.StreamOutput, error) { } func listenEvents(ctx context.Context, obj Object, sink chan<- []byte) { - home, err := os.UserHomeDir() - exitOnError(err) - - config, err := clientcmd.BuildConfigFromFlags("", filepath.Join(home, ".kube", "config")) + config, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG")) exitOnError(err) clientset, err := kubernetes.NewForConfig(config) exitOnError(err) @@ -70,37 +68,27 @@ func listenEvents(ctx context.Context, obj Object, sink chan<- []byte) { lw := &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { options.FieldSelector = fieldSelector - return clientset.CoreV1().Pods(obj.Namespace).List(ctx, options) + return clientset.CoreV1().ConfigMaps(obj.Namespace).List(ctx, options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { options.FieldSelector = fieldSelector - return clientset.CoreV1().Pods(obj.Namespace).Watch(ctx, options) + return clientset.CoreV1().ConfigMaps(obj.Namespace).Watch(ctx, options) }, } - fmt.Println("starting informer") - _, informer, watcher, _ := watchtools.NewIndexerInformerWatcher(lw, &corev1.ConfigMap{}) - defer watcher.Stop() - - fmt.Println("waiting for cache sync") - cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) - - ch := watcher.ResultChan() - defer watcher.Stop() - - for { - select { - case event, ok := <-ch: - fmt.Println("get event", event) - if !ok { // finished - return - } + infiniteWatch := func(event watch.Event) (bool, error) { + if event.Type == obj.Event { cm := event.Object.(*corev1.ConfigMap) - sink <- []byte(cm.Name) - case <-ctx.Done(): // client closed streaming - return + msg := fmt.Sprintf("Detected `%s` event on `%s/%s`", obj.Event, cm.Namespace, cm.Name) + sink <- []byte(msg) } + + // always continue - context will cancel this watch for us :) + return false, nil } + + _, err = watchtools.UntilWithSync(ctx, lw, &corev1.ConfigMap{}, nil, infiniteWatch) + exitOnError(err) } func main() { diff --git a/helm/botkube/e2e-test-values.yaml b/helm/botkube/e2e-test-values.yaml index 6ac8e70e0..f9beaf00a 100644 --- a/helm/botkube/e2e-test-values.yaml +++ b/helm/botkube/e2e-test-values.yaml @@ -20,6 +20,7 @@ communications: - k8s-events - k8s-annotated-cm-delete - k8s-pod-create-events + - plugin-based 'secondary': name: "" # Tests will override this temporarily notification: @@ -47,6 +48,7 @@ communications: - k8s-events - k8s-annotated-cm-delete - k8s-pod-create-events + - plugin-based 'secondary': id: "" # Tests will override this channel ID temporarily notification: @@ -120,6 +122,13 @@ sources: event: # overrides top level `event` entry types: - update + + 'plugin-based': + botkube/cm-watcher: + enabled: true + config: + configMapName: cm-watcher-trigger + executors: 'kubectl-read-only': kubectl: diff --git a/internal/plugin/store_test.go b/internal/plugin/store_test.go index 63892d74b..e67f0193c 100644 --- a/internal/plugin/store_test.go +++ b/internal/plugin/store_test.go @@ -52,13 +52,50 @@ func TestNewStoreRepository(t *testing.T) { }, }, } + expectedSources := storeRepository{ + "botkube/kubernetes": { + { + Description: "Kubernetes source", + Version: "v1.0.0", + URLs: map[string]string{ + "darwin/amd64": "https://github.com/kubeshop/botkube/releases/download/v0.17.0/darwin_amd64_source_kubernetes", + "darwin/arm64": "https://github.com/kubeshop/botkube/releases/download/v0.17.0/darwin_arm64_source_kubernetes", + "linux/amd64": "https://github.com/kubeshop/botkube/releases/download/v0.17.0/linux-_md64_source_kubernetes", + "linux/arm64": "https://github.com/kubeshop/botkube/releases/download/v0.17.0/linux-_rm64_source_kubernetes", + }, + }, + { + Description: "Kubernetes source", + Version: "0.1.0", // should support also version without `v` + URLs: map[string]string{ + "darwin/amd64": "https://github.com/kubeshop/botkube/releases/download/v0.1.0/darwin_amd64_source_kubernetes", + "darwin/arm64": "https://github.com/kubeshop/botkube/releases/download/v0.1.0/darwin_arm64_source_kubernetes", + "linux/amd64": "https://github.com/kubeshop/botkube/releases/download/v0.1.0/linux-_md64_source_kubernetes", + "linux/arm64": "https://github.com/kubeshop/botkube/releases/download/v0.1.0/linux-_rm64_source_kubernetes", + }, + }, + }, + "mszostok/cm-watcher": { + { + Description: "Source suitable for e2e testing.", + Version: "v1.0.0", + URLs: map[string]string{ + "darwin/amd64": "https://github.com/mszostok/botkube-plugins/releases/download/v1.0.0/darwin_amd64_cmd-watcher", + "darwin/arm64": "https://github.com/mszostok/botkube-plugins/releases/download/v1.0.0/darwin_arm64_cmd-watcher", + "linux/amd64": "https://github.com/mszostok/botkube-plugins/releases/download/v1.0.0/linux-_md64_cmd-watcher", + "linux/arm64": "https://github.com/mszostok/botkube-plugins/releases/download/v1.0.0/linux-_rm64_cmd-watcher", + }, + }, + }, + } // when - executors, _, err := newStoreRepositories(repositories) + executors, sources, err := newStoreRepositories(repositories) // then require.NoError(t, err) assert.Equal(t, executors, expectedExecutors) + assert.Equal(t, sources, expectedSources) } func loadTestdataFile(t *testing.T, name string) []byte { diff --git a/internal/plugin/testdata/TestNewStoreRepository/botkube.yaml b/internal/plugin/testdata/TestNewStoreRepository/botkube.yaml index 8593dc51b..b72314dca 100644 --- a/internal/plugin/testdata/TestNewStoreRepository/botkube.yaml +++ b/internal/plugin/testdata/TestNewStoreRepository/botkube.yaml @@ -66,3 +66,29 @@ entries: platform: os: linux architecture: arm64 + + - name: "kubernetes" + type: "source" + description: "Kubernetes source" + version: "0.1.0" # should support also without the `v` + urls: + # different url syntax + - url: https://github.com/kubeshop/botkube/releases/download/v0.1.0/darwin_amd64_source_kubernetes + platform: + os: darwin + architecture: amd64 + # different url syntax + - url: https://github.com/kubeshop/botkube/releases/download/v0.1.0/darwin_arm64_source_kubernetes + platform: + os: darwin + architecture: arm64 + # different url syntax + - url: https://github.com/kubeshop/botkube/releases/download/v0.1.0/linux-_md64_source_kubernetes + platform: + os: linux + architecture: amd64 + # different url syntax + - url: https://github.com/kubeshop/botkube/releases/download/v0.1.0/linux-_rm64_source_kubernetes + platform: + os: linux + architecture: arm64 diff --git a/internal/source/source.go b/internal/source/source.go index e5df9b534..4efb95654 100644 --- a/internal/source/source.go +++ b/internal/source/source.go @@ -3,11 +3,13 @@ package source import ( "context" "fmt" + "strings" "github.com/sirupsen/logrus" "github.com/kubeshop/botkube/internal/plugin" "github.com/kubeshop/botkube/pkg/bot/interactive" + "github.com/kubeshop/botkube/pkg/config" "github.com/kubeshop/botkube/pkg/notifier" ) @@ -16,54 +18,115 @@ type Dispatcher struct { log logrus.FieldLogger notifiers []notifier.Notifier manager *plugin.Manager - sources []string + cfg *config.Config } // NewDispatcher create a new Dispatcher instance. -func NewDispatcher(log logrus.FieldLogger, notifiers []notifier.Notifier, manager *plugin.Manager, sources []string) *Dispatcher { +func NewDispatcher(log logrus.FieldLogger, notifiers []notifier.Notifier, manager *plugin.Manager, cfg *config.Config) *Dispatcher { return &Dispatcher{ log: log, notifiers: notifiers, manager: manager, - sources: sources, + cfg: cfg, } } // Start starts all sources and dispatch received events. func (d *Dispatcher) Start(ctx context.Context) error { - for _, name := range d.sources { - sourceClient, err := d.manager.GetSource(name) - if err != nil { - return fmt.Errorf("while getting source client for %s: %w", name, err) + // startProcesses => ['source-name1;source-name2'] + startProcesses := map[string]struct{}{} + + startPlugin := func(bindSources []string) error { + key := strings.Join(bindSources, ";") + + _, found := startProcesses[key] + if found { + return nil // such configuration was already started } - out, err := sourceClient.Stream(ctx) - if err != nil { - return fmt.Errorf("while opening stream for %s: %w", name, err) + + // sourcePluginConfigs => ['botkube/kubernetes@v1.0.0']->[]{"cfg1", "cfg"} + sourcePluginConfigs := map[string][]any{} + for _, sourceCfgGroupName := range bindSources { + plugins := d.cfg.Sources[sourceCfgGroupName].Plugins + for pluginName, pluginCfg := range plugins { + if !pluginCfg.Enabled { + continue + } + sourcePluginConfigs[pluginName] = append(sourcePluginConfigs[pluginName], pluginCfg.Config) + } } - go func() { - for { - select { - case event := <-out.Output: - d.dispatch(ctx, event) - case <-ctx.Done(): - return + for pluginName, configs := range sourcePluginConfigs { + err := d.start(ctx, pluginName, configs, bindSources) + if err != nil { + return fmt.Errorf("while starting plugin source %s: %w", pluginName, err) + } + } + return nil + } + + for _, commGroupCfg := range d.cfg.Communications { + if commGroupCfg.SocketSlack.Enabled { + channels := commGroupCfg.SocketSlack.Channels + + for _, channel := range channels { + bindSources := channel.Bindings.Sources + if err := startPlugin(bindSources); err != nil { + return err } } - }() + } + } + + return nil +} + +// Once we will have the gRPC contract established with proper Cloud Event schema, we should move also this logic here: +// https://github.com/kubeshop/botkube/blob/525c737956ff820a09321879284037da8bf5d647/pkg/controller/controller.go#L200-L253 +func (d *Dispatcher) start(ctx context.Context, pluginName string, pluginConfigs []any, sources []string) error { + sourceClient, err := d.manager.GetSource(pluginName) + if err != nil { + return fmt.Errorf("while getting source client for %s: %w", pluginName, err) } + // TODO(configure plugin): pass the `pluginConfigs` + _ = pluginConfigs + out, err := sourceClient.Stream(ctx) + if err != nil { + return fmt.Errorf("while opening stream for %s: %w", pluginName, err) + } + + go func() { + for { + select { + case event := <-out.Output: + d.dispatch(ctx, event, sources) + case <-ctx.Done(): + return + } + } + }() return nil } -func (d *Dispatcher) dispatch(ctx context.Context, event []byte) { +type genericMessage struct { + response interactive.Message +} + +// ForBot returns message prepared for a bot with a given name. +func (g *genericMessage) ForBot(string) interactive.Message { + return g.response +} + +func (d *Dispatcher) dispatch(ctx context.Context, event []byte, sources []string) { for _, n := range d.notifiers { go func(n notifier.Notifier) { - err := n.SendMessageToAll(ctx, interactive.Message{ + msg := interactive.Message{ Base: interactive.Base{ - Header: string(event), + Description: string(event), }, - }) + } + err := n.SendGenericMessage(ctx, &genericMessage{response: msg}, sources) if err != nil { d.log.Errorf("while sending event: %s", err.Error()) } diff --git a/internal/source/testdata/config.yaml b/internal/source/testdata/config.yaml new file mode 100644 index 000000000..eb2d73dfc --- /dev/null +++ b/internal/source/testdata/config.yaml @@ -0,0 +1,46 @@ +sources: + 'keptn-us-east-2': + botkube/keptn@v1.0.0: + enabled: true + config: + url: 'keptn-us-east-2.local' + 'keptn-eu-central-1': + botkube/keptn@v1.0.0: + enabled: true + config: + url: 'keptn-eu-central-1.local' + +communications: + default-group: + socketSlack: + enabled: true + channels: + all: # proces1 - should use the same process as #random channel as sources order is the same + name: all + bindings: + sources: + - 'keptn-us-east-2' + - 'keptn-eu-central-1' + random: # proces1 -should use the same process as #all channel as sources order is the same + name: random + bindings: + sources: # get events from 2 regions + - 'keptn-us-east-2' + - 'keptn-eu-central-1' + general: # proces2 - should use different process than #all and #random channels as sources order is different + name: general + bindings: + sources: # get events from 2 regions + - 'keptn-eu-central-1' + - 'keptn-us-east-2' + + eu: # proces3 - should use different process as it has only once source + name: eu + bindings: + sources: # get even only from eu + - 'keptn-eu-central-1' + us: # proces4 - should use different process as it has only once source + name: us + bindings: + sources: # get even only from us + - 'keptn-us-east-2' diff --git a/test/e2e/bots_test.go b/test/e2e/bots_test.go index 4f22658d1..da415cdc1 100644 --- a/test/e2e/bots_test.go +++ b/test/e2e/bots_test.go @@ -22,7 +22,7 @@ import ( "github.com/kubeshop/botkube/pkg/bot/interactive" "github.com/kubeshop/botkube/pkg/config" "github.com/kubeshop/botkube/pkg/filterengine/filters" - "github.com/kubeshop/botkube/test/e2e/fake" + "github.com/kubeshop/botkube/test/fake" ) type Config struct { @@ -240,14 +240,43 @@ func runBotTest(t *testing.T, assert.NoError(t, err) }) - t.Run("Plugin execution", func(t *testing.T) { - command := "echo test" - expectedBody := codeBlock(command) - expectedMessage := fmt.Sprintf("%s\n%s", cmdHeader(command), expectedBody) + // Those are a temporary tests. When we will extract kubectl and kubernetes as plugins + // they won't be needed anymore. + t.Run("Botkube Plugins", func(t *testing.T) { + t.Run("Echo Executor", func(t *testing.T) { + command := "echo test" + expectedBody := codeBlock(command) + expectedMessage := fmt.Sprintf("%s\n%s", cmdHeader(command), expectedBody) - botDriver.PostMessageToBot(t, botDriver.Channel().Identifier(), command) - err := botDriver.WaitForLastMessageEqual(botDriver.BotUserID(), botDriver.Channel().ID(), expectedMessage) - assert.NoError(t, err) + botDriver.PostMessageToBot(t, botDriver.Channel().Identifier(), command) + err := botDriver.WaitForLastMessageEqual(botDriver.BotUserID(), botDriver.Channel().ID(), expectedMessage) + assert.NoError(t, err) + }) + t.Run("ConfigMap watcher source", func(t *testing.T) { + t.Log("Creating sample ConfigMap...") + cfgMap := &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cm-watcher-trigger", + Namespace: appCfg.Deployment.Namespace, + // for now, it allows us to disable the built-in kubernetes source and make sure that + // only the plugged one will respond + Annotations: map[string]string{ + "botkube.io/disable": "true", + }, + }, + } + + cfgMapCli := k8sCli.CoreV1().ConfigMaps(appCfg.Deployment.Namespace) + cfgMap, err = cfgMapCli.Create(context.Background(), cfgMap, metav1.CreateOptions{}) + require.NoError(t, err) + + t.Cleanup(func() { cleanupCreatedCfgMapIfShould(t, cfgMapCli, cfgMap.Name, nil) }) + + t.Log("Expecting bot message channel...") + expectedMsg := fmt.Sprintf("Detected `ADDED` event on `%s/%s`", cfgMap.Namespace, cfgMap.Name) + err = botDriver.WaitForLastMessageEqual(botDriver.BotUserID(), botDriver.Channel().ID(), expectedMsg) + require.NoError(t, err) + }) }) t.Run("Commands list", func(t *testing.T) { diff --git a/test/e2e/fake/plugin_server.go b/test/fake/plugin_server.go similarity index 94% rename from test/e2e/fake/plugin_server.go rename to test/fake/plugin_server.go index 435293301..8d2ce0d2f 100644 --- a/test/e2e/fake/plugin_server.go +++ b/test/fake/plugin_server.go @@ -20,16 +20,20 @@ const ( ) type ( + // PluginConfig holds configuration for fake plugin server. PluginConfig struct { BinariesDirectory string Server PluginServer } + + // PluginServer holds configuration for HTTP plugin server. PluginServer struct { Host string `envconfig:"default=http://host.k3d.internal"` Port int `envconfig:"default=3000"` } ) +// NewPluginServer return function to start the fake plugin HTTP server. func NewPluginServer(cfg PluginConfig) (string, func() error) { fs := http.FileServer(http.Dir(cfg.BinariesDirectory)) http.Handle("/static/", http.StripPrefix("/static/", fs)) diff --git a/test/helpers/plugin_server.go b/test/helpers/plugin_server.go index 36cf69437..510c60eb4 100644 --- a/test/helpers/plugin_server.go +++ b/test/helpers/plugin_server.go @@ -5,7 +5,7 @@ import ( "os" "path/filepath" - "github.com/kubeshop/botkube/test/e2e/fake" + "github.com/kubeshop/botkube/test/fake" ) func main() { From fb20fdc204c395ff0da6c5d83932bb364e9493df Mon Sep 17 00:00:00 2001 From: Mateusz Szostok Date: Wed, 30 Nov 2022 14:04:30 +0100 Subject: [PATCH 4/5] Add unit-test for source dispatcher --- CONTRIBUTING.md | 3 + cmd/botkube/main.go | 1 - cmd/source/cm-watcher/main.go | 2 +- internal/plugin/store.go | 1 - internal/source/source.go | 83 +++++++++++++++---- internal/source/source_test.go | 65 +++++++++++++++ .../config.yaml | 2 + test/helpers/plugin_server.go | 7 +- 8 files changed, 143 insertions(+), 21 deletions(-) create mode 100644 internal/source/source_test.go rename internal/source/testdata/{ => TestStartingUniqueProcesses}/config.yaml (96%) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 7ee930b36..1f05af89a 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -145,6 +145,9 @@ For faster development, you can also build and run Botkube outside K8s cluster. go run test/helpers/plugin_server.go ``` + > **Note** + > If Botkube runs inside the k3d cluster, export the `PLUGIN_SERVER_HOST=http://host.k3d.internal` environment variable. + 2. Export Botkube plugins cache directory: ```bash diff --git a/cmd/botkube/main.go b/cmd/botkube/main.go index 704e0f181..eb8e7991e 100644 --- a/cmd/botkube/main.go +++ b/cmd/botkube/main.go @@ -322,7 +322,6 @@ func run() error { actionProvider := action.NewProvider(logger.WithField(componentLogFieldKey, "Action Provider"), conf.Actions, executorFactory) router.AddEnabledActionBindings(conf.Actions) - // TODO: temporary dispatcher for events sourcePluginDispatcher := source.NewDispatcher(logger, notifiers, pluginManager, conf) err = sourcePluginDispatcher.Start(ctx) if err != nil { diff --git a/cmd/source/cm-watcher/main.go b/cmd/source/cm-watcher/main.go index c1e6861a7..4b8db6d49 100644 --- a/cmd/source/cm-watcher/main.go +++ b/cmd/source/cm-watcher/main.go @@ -40,7 +40,7 @@ type CMWatcher struct{} // Stream returns a given command as response. func (CMWatcher) Stream(ctx context.Context) (source.StreamOutput, error) { - // TODO: in request we should receive the executor configuration. + // TODO: in request we should receive the source configurations. cfg := Config{ ConfigMap: Object{ Name: "cm-watcher-trigger", diff --git a/internal/plugin/store.go b/internal/plugin/store.go index 679a8283b..dd1177bdd 100644 --- a/internal/plugin/store.go +++ b/internal/plugin/store.go @@ -50,7 +50,6 @@ func newStoreRepositories(indexes map[string][]byte) (storeRepository, storeRepo for repo, data := range indexes { var index Index if err := yaml.Unmarshal(data, &index); err != nil { - fmt.Println(string(data)) return nil, nil, fmt.Errorf("while unmarshaling index: %w", err) } diff --git a/internal/source/source.go b/internal/source/source.go index 4efb95654..c8b7dbbf4 100644 --- a/internal/source/source.go +++ b/internal/source/source.go @@ -19,21 +19,33 @@ type Dispatcher struct { notifiers []notifier.Notifier manager *plugin.Manager cfg *config.Config + starter func(ctx context.Context, pluginName string, pluginConfigs []any, sources []string) error } // NewDispatcher create a new Dispatcher instance. func NewDispatcher(log logrus.FieldLogger, notifiers []notifier.Notifier, manager *plugin.Manager, cfg *config.Config) *Dispatcher { - return &Dispatcher{ + d := &Dispatcher{ log: log, notifiers: notifiers, manager: manager, cfg: cfg, } + + // TODO: it allows us to mock it for unit-test + // will be removed once we will update the gRPC contract on source + // and passing list of configuration. + d.starter = d.start + + return d } // Start starts all sources and dispatch received events. func (d *Dispatcher) Start(ctx context.Context) error { - // startProcesses => ['source-name1;source-name2'] + // startProcesses holds information about started unique plugin processes + // We start a new plugin process each time we see a new order of source bindings. + // We do that because we pass the array of configs to each `Stream` method and + // the merging strategy for configs can depend on the order. + // As a result our key is e.g. ['source-name1;source-name2'] startProcesses := map[string]struct{}{} startPlugin := func(bindSources []string) error { @@ -43,8 +55,10 @@ func (d *Dispatcher) Start(ctx context.Context) error { if found { return nil // such configuration was already started } + startProcesses[key] = struct{}{} - // sourcePluginConfigs => ['botkube/kubernetes@v1.0.0']->[]{"cfg1", "cfg"} + // Holds the array of configs for a given plugin. + // For example, ['botkube/kubernetes@v1.0.0']->[]{"cfg1", "cfg2"} sourcePluginConfigs := map[string][]any{} for _, sourceCfgGroupName := range bindSources { plugins := d.cfg.Sources[sourceCfgGroupName].Plugins @@ -57,7 +71,7 @@ func (d *Dispatcher) Start(ctx context.Context) error { } for pluginName, configs := range sourcePluginConfigs { - err := d.start(ctx, pluginName, configs, bindSources) + err := d.starter(ctx, pluginName, configs, bindSources) if err != nil { return fmt.Errorf("while starting plugin source %s: %w", pluginName, err) } @@ -66,12 +80,39 @@ func (d *Dispatcher) Start(ctx context.Context) error { } for _, commGroupCfg := range d.cfg.Communications { + if commGroupCfg.Slack.Enabled { + for _, channel := range commGroupCfg.Slack.Channels { + if err := startPlugin(channel.Bindings.Sources); err != nil { + return err + } + } + } + if commGroupCfg.SocketSlack.Enabled { - channels := commGroupCfg.SocketSlack.Channels + for _, channel := range commGroupCfg.SocketSlack.Channels { + if err := startPlugin(channel.Bindings.Sources); err != nil { + return err + } + } + } + + if commGroupCfg.Mattermost.Enabled { + for _, channel := range commGroupCfg.Mattermost.Channels { + if err := startPlugin(channel.Bindings.Sources); err != nil { + return err + } + } + } + + if commGroupCfg.Teams.Enabled { + if err := startPlugin(commGroupCfg.Teams.Bindings.Sources); err != nil { + return err + } + } - for _, channel := range channels { - bindSources := channel.Bindings.Sources - if err := startPlugin(bindSources); err != nil { + if commGroupCfg.Discord.Enabled { + for _, channel := range commGroupCfg.Discord.Channels { + if err := startPlugin(channel.Bindings.Sources); err != nil { return err } } @@ -84,6 +125,13 @@ func (d *Dispatcher) Start(ctx context.Context) error { // Once we will have the gRPC contract established with proper Cloud Event schema, we should move also this logic here: // https://github.com/kubeshop/botkube/blob/525c737956ff820a09321879284037da8bf5d647/pkg/controller/controller.go#L200-L253 func (d *Dispatcher) start(ctx context.Context, pluginName string, pluginConfigs []any, sources []string) error { + log := d.log.WithFields(logrus.Fields{ + "pluginName": pluginName, + "sources": sources, + }) + + log.Info("Staring source streaming...") + sourceClient, err := d.manager.GetSource(pluginName) if err != nil { return fmt.Errorf("while getting source client for %s: %w", pluginName, err) @@ -100,6 +148,7 @@ func (d *Dispatcher) start(ctx context.Context, pluginName string, pluginConfigs for { select { case event := <-out.Output: + log.WithField("event", string(event)).Debug("Dispatching received event...") d.dispatch(ctx, event, sources) case <-ctx.Done(): return @@ -109,15 +158,6 @@ func (d *Dispatcher) start(ctx context.Context, pluginName string, pluginConfigs return nil } -type genericMessage struct { - response interactive.Message -} - -// ForBot returns message prepared for a bot with a given name. -func (g *genericMessage) ForBot(string) interactive.Message { - return g.response -} - func (d *Dispatcher) dispatch(ctx context.Context, event []byte, sources []string) { for _, n := range d.notifiers { go func(n notifier.Notifier) { @@ -133,3 +173,12 @@ func (d *Dispatcher) dispatch(ctx context.Context, event []byte, sources []strin }(n) } } + +type genericMessage struct { + response interactive.Message +} + +// ForBot returns interactive message. +func (g *genericMessage) ForBot(string) interactive.Message { + return g.response +} diff --git a/internal/source/source_test.go b/internal/source/source_test.go new file mode 100644 index 000000000..e7638b0c3 --- /dev/null +++ b/internal/source/source_test.go @@ -0,0 +1,65 @@ +package source + +import ( + "context" + "path/filepath" + "strings" + "testing" + + logtest "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/kubeshop/botkube/pkg/config" +) + +func TestStartingUniqueProcesses(t *testing.T) { + // given + givenCfg, _, err := config.LoadWithDefaults(func() []string { + return []string{ + testdataFile(t, "config.yaml"), + } + }) + require.NoError(t, err) + + logger, _ := logtest.NewNullLogger() + + expectedProcesses := map[string]struct{}{ + "botkube/keptn@v1.0.0; keptn-us-east-2; keptn-eu-central-1": {}, + "botkube/keptn@v1.0.0; keptn-eu-central-1; keptn-us-east-2": {}, + "botkube/keptn@v1.0.0; keptn-eu-central-1": {}, + "botkube/keptn@v1.0.0; keptn-us-east-2": {}, + } + + assertStarter := func(ctx context.Context, pluginName string, pluginConfigs []any, sources []string) error { + // then configs are specified in a proper order + var expConfigs []any + for _, sourceName := range sources { + expConfigs = append(expConfigs, givenCfg.Sources[sourceName].Plugins[pluginName].Config) + } + assert.Equal(t, expConfigs, pluginConfigs) + + // then only unique process are started + key := []string{pluginName} + key = append(key, sources...) + processKey := strings.Join(key, "; ") + _, found := expectedProcesses[processKey] + if !found { + t.Errorf("starting unwanted process for %s with sources %v", pluginName, sources) + } + delete(expectedProcesses, processKey) + return nil + } + + // when + dispatcher := NewDispatcher(logger, nil, nil, givenCfg) + dispatcher.starter = assertStarter + + err = dispatcher.Start(context.Background()) + require.NoError(t, err) +} + +func testdataFile(t *testing.T, name string) string { + t.Helper() + return filepath.Join("testdata", t.Name(), name) +} diff --git a/internal/source/testdata/config.yaml b/internal/source/testdata/TestStartingUniqueProcesses/config.yaml similarity index 96% rename from internal/source/testdata/config.yaml rename to internal/source/testdata/TestStartingUniqueProcesses/config.yaml index eb2d73dfc..bff721ab1 100644 --- a/internal/source/testdata/config.yaml +++ b/internal/source/testdata/TestStartingUniqueProcesses/config.yaml @@ -14,6 +14,8 @@ communications: default-group: socketSlack: enabled: true + appToken: "xapp-testing" + botToken: "xoxb-testing" channels: all: # proces1 - should use the same process as #random channel as sources order is the same name: all diff --git a/test/helpers/plugin_server.go b/test/helpers/plugin_server.go index 510c60eb4..ae929b71d 100644 --- a/test/helpers/plugin_server.go +++ b/test/helpers/plugin_server.go @@ -12,11 +12,16 @@ func main() { dir, err := os.Getwd() exitOnErr(err) + host := os.Getenv("PLUGIN_SERVER_HOST") + if host == "" { + host = "http://localhost" + } + binDir := filepath.Join(dir, "dist") indexEndpoint, startServerFn := fake.NewPluginServer(fake.PluginConfig{ BinariesDirectory: binDir, Server: fake.PluginServer{ - Host: "http://localhost", + Host: host, Port: 3000, }, }) From ccbe93a09fe3b0e28db49350d94e834cacf7f152 Mon Sep 17 00:00:00 2001 From: Mateusz Szostok Date: Wed, 30 Nov 2022 17:12:13 +0100 Subject: [PATCH 5/5] Apply suggestions after review --- internal/source/source.go | 97 ++++++++++++++++++++------------------- 1 file changed, 50 insertions(+), 47 deletions(-) diff --git a/internal/source/source.go b/internal/source/source.go index c8b7dbbf4..c715d2967 100644 --- a/internal/source/source.go +++ b/internal/source/source.go @@ -20,15 +20,23 @@ type Dispatcher struct { manager *plugin.Manager cfg *config.Config starter func(ctx context.Context, pluginName string, pluginConfigs []any, sources []string) error + + // startedProcesses holds information about started unique plugin processes + // We start a new plugin process each time we see a new order of source bindings. + // We do that because we pass the array of configs to each `Stream` method and + // the merging strategy for configs can depend on the order. + // As a result our key is e.g. ['source-name1;source-name2'] + startedProcesses map[string]struct{} } // NewDispatcher create a new Dispatcher instance. func NewDispatcher(log logrus.FieldLogger, notifiers []notifier.Notifier, manager *plugin.Manager, cfg *config.Config) *Dispatcher { d := &Dispatcher{ - log: log, - notifiers: notifiers, - manager: manager, - cfg: cfg, + log: log, + notifiers: notifiers, + manager: manager, + cfg: cfg, + startedProcesses: map[string]struct{}{}, } // TODO: it allows us to mock it for unit-test @@ -41,48 +49,10 @@ func NewDispatcher(log logrus.FieldLogger, notifiers []notifier.Notifier, manage // Start starts all sources and dispatch received events. func (d *Dispatcher) Start(ctx context.Context) error { - // startProcesses holds information about started unique plugin processes - // We start a new plugin process each time we see a new order of source bindings. - // We do that because we pass the array of configs to each `Stream` method and - // the merging strategy for configs can depend on the order. - // As a result our key is e.g. ['source-name1;source-name2'] - startProcesses := map[string]struct{}{} - - startPlugin := func(bindSources []string) error { - key := strings.Join(bindSources, ";") - - _, found := startProcesses[key] - if found { - return nil // such configuration was already started - } - startProcesses[key] = struct{}{} - - // Holds the array of configs for a given plugin. - // For example, ['botkube/kubernetes@v1.0.0']->[]{"cfg1", "cfg2"} - sourcePluginConfigs := map[string][]any{} - for _, sourceCfgGroupName := range bindSources { - plugins := d.cfg.Sources[sourceCfgGroupName].Plugins - for pluginName, pluginCfg := range plugins { - if !pluginCfg.Enabled { - continue - } - sourcePluginConfigs[pluginName] = append(sourcePluginConfigs[pluginName], pluginCfg.Config) - } - } - - for pluginName, configs := range sourcePluginConfigs { - err := d.starter(ctx, pluginName, configs, bindSources) - if err != nil { - return fmt.Errorf("while starting plugin source %s: %w", pluginName, err) - } - } - return nil - } - for _, commGroupCfg := range d.cfg.Communications { if commGroupCfg.Slack.Enabled { for _, channel := range commGroupCfg.Slack.Channels { - if err := startPlugin(channel.Bindings.Sources); err != nil { + if err := d.startPlugin(ctx, channel.Bindings.Sources); err != nil { return err } } @@ -90,7 +60,7 @@ func (d *Dispatcher) Start(ctx context.Context) error { if commGroupCfg.SocketSlack.Enabled { for _, channel := range commGroupCfg.SocketSlack.Channels { - if err := startPlugin(channel.Bindings.Sources); err != nil { + if err := d.startPlugin(ctx, channel.Bindings.Sources); err != nil { return err } } @@ -98,21 +68,21 @@ func (d *Dispatcher) Start(ctx context.Context) error { if commGroupCfg.Mattermost.Enabled { for _, channel := range commGroupCfg.Mattermost.Channels { - if err := startPlugin(channel.Bindings.Sources); err != nil { + if err := d.startPlugin(ctx, channel.Bindings.Sources); err != nil { return err } } } if commGroupCfg.Teams.Enabled { - if err := startPlugin(commGroupCfg.Teams.Bindings.Sources); err != nil { + if err := d.startPlugin(ctx, commGroupCfg.Teams.Bindings.Sources); err != nil { return err } } if commGroupCfg.Discord.Enabled { for _, channel := range commGroupCfg.Discord.Channels { - if err := startPlugin(channel.Bindings.Sources); err != nil { + if err := d.startPlugin(ctx, channel.Bindings.Sources); err != nil { return err } } @@ -122,6 +92,39 @@ func (d *Dispatcher) Start(ctx context.Context) error { return nil } +func (d *Dispatcher) startPlugin(ctx context.Context, bindSources []string) error { + key := strings.Join(bindSources, ";") + + _, found := d.startedProcesses[key] + if found { + return nil // such configuration was already started + } + + d.startedProcesses[key] = struct{}{} + + // Holds the array of configs for a given plugin. + // For example, ['botkube/kubernetes@v1.0.0']->[]{"cfg1", "cfg2"} + sourcePluginConfigs := map[string][]any{} + for _, sourceCfgGroupName := range bindSources { + plugins := d.cfg.Sources[sourceCfgGroupName].Plugins + for pluginName, pluginCfg := range plugins { + if !pluginCfg.Enabled { + continue + } + sourcePluginConfigs[pluginName] = append(sourcePluginConfigs[pluginName], pluginCfg.Config) + } + } + + for pluginName, configs := range sourcePluginConfigs { + err := d.starter(ctx, pluginName, configs, bindSources) + if err != nil { + return fmt.Errorf("while starting plugin source %s: %w", pluginName, err) + } + } + + return nil +} + // Once we will have the gRPC contract established with proper Cloud Event schema, we should move also this logic here: // https://github.com/kubeshop/botkube/blob/525c737956ff820a09321879284037da8bf5d647/pkg/controller/controller.go#L200-L253 func (d *Dispatcher) start(ctx context.Context, pluginName string, pluginConfigs []any, sources []string) error {