diff --git a/.goreleaser.plugin.yaml b/.goreleaser.plugin.yaml index 1e1afcead..f1159c5db 100644 --- a/.goreleaser.plugin.yaml +++ b/.goreleaser.plugin.yaml @@ -17,3 +17,17 @@ builds: - arm64 goarm: - 7 + - id: cm-watcher + binary: source_cm-watcher_{{ .Os }}_{{ .Arch }} + no_unique_dist_dir: true + main: cmd/source/cm-watcher/main.go + env: + - CGO_ENABLED=0 + goos: + - linux + - darwin + goarch: + - amd64 + - arm64 + goarm: + - 7 diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 05ae4af98..1f05af89a 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -36,7 +36,7 @@ This section describes how to build and run Botkube from source code. This is ideal for running Botkube on a local cluster, e.g. using [kind](https://kind.sigs.k8s.io) or [`minikube`](https://minikube.sigs.k8s.io/docs/). Remember to set the `IMAGE_PLATFORM` env var to your target architecture. By default, the build targets `linux/amd64`. - + For example, the command below builds the `linux/arm64` target: ```sh @@ -62,23 +62,23 @@ This section describes how to build and run Botkube from source code. docker tag ghcr.io/kubeshop/botkube:v9.99.9-dev-amd64 {your_account}/botkube:v9.99.9-dev docker push {your_account}/botkube:v9.99.9-dev ``` - + Where `{your_account}` is Docker hub or any other registry provider account to which you can push the image. 2. Install Botkube with any of communication platform configured, according to [the installation instructions](https://docs.botkube.io/installation/). During the Helm chart installation step, set the following flags: - + ```sh export IMAGE_REGISTRY="{imageRegistry}" # e.g. docker.io export IMAGE_PULL_POLICY="{pullPolicy}" # e.g. Always or IfNotPresent - + --set image.registry=${IMAGE_REGISTRY} \ --set image.repository={your_account}/botkube \ --set image.tag=v9.99.9-dev \ --set image.pullPolicy=${IMAGE_PULL_POLICY} ``` - + Check [values.yaml](./helm/botkube/values.yaml) for default options. - + ### Build and run locally For faster development, you can also build and run Botkube outside K8s cluster. @@ -130,6 +130,44 @@ For faster development, you can also build and run Botkube outside K8s cluster. ./botkube ``` +#### Develop Botkube plugins + +**Prerequisite** + +- Being able to start the Botkube binary locally. +- [GoReleaser](https://goreleaser.com/install/) + +**Steps** + +1. Start fake plugins server to serve binaries from [`dist`](dist) folder: + + ```bash + go run test/helpers/plugin_server.go + ``` + + > **Note** + > If Botkube runs inside the k3d cluster, export the `PLUGIN_SERVER_HOST=http://host.k3d.internal` environment variable. + +2. Export Botkube plugins cache directory: + + ```bash + export BOTKUBE_PLUGINS_CACHE__DIR="/tmp/plugins" + ``` + +3. In other terminal window, run: + + ```bash + # rebuild plugins only for current GOOS and GOARCH + make build-plugins-single && + # remove cached plugins + rm -rf $BOTKUBE_PLUGINS_CACHE__DIR && + # start botkube to download fresh plugins + ./botkube + ``` + + > **Note** + > Each time you make a change to the [source](cmd/source) or [executors](cmd/executor) plugins re-run the above command. + ## Making A Change - Before making any significant changes, please [open an issue](https://github.com/kubeshop/botkube/issues). Discussing your proposed changes ahead of time will make the contribution process smooth for everyone. diff --git a/Makefile b/Makefile index 9e5fa2244..81df9a7af 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ .DEFAULT_GOAL := build -.PHONY: container-image test test-integration-slack test-integration-discord build pre-build publish lint lint-fix go-import-fmt system-check save-images load-and-push-images gen-grpc-resources build-plugins +.PHONY: container-image test test-integration-slack test-integration-discord build pre-build publish lint lint-fix go-import-fmt system-check save-images load-and-push-images gen-grpc-resources build-plugins build-plugins-single # Show this help. help: @@ -18,21 +18,28 @@ test: system-check @go test -v -race ./... test-integration-slack: system-check - @go test -v -tags=integration -race -count=1 ./test/... -run "TestSlack" + @go test -v -tags=integration -race -count=1 ./test/e2e/... -run "TestSlack" test-integration-discord: system-check - @go test -v -tags=integration -race -count=1 ./test/... -run "TestDiscord" + @go test -v -tags=integration -race -count=1 ./test/e2e/... -run "TestDiscord" # Build the binary build: pre-build @cd cmd/botkube;GOOS_VAL=$(shell go env GOOS) CGO_ENABLED=0 GOARCH_VAL=$(shell go env GOARCH) go build -o $(shell go env GOPATH)/bin/botkube @echo "Build completed successfully" +# Build Botkube official plugins for all supported platforms. build-plugins: pre-build @echo "Building plugins binaries" @./hack/goreleaser.sh build_plugins @echo "Build completed successfully" +# Build Botkube official plugins only for current GOOS and GOARCH. +build-plugins-single: pre-build + @echo "Building single target plugins binaries" + @./hack/goreleaser.sh build_plugins_single + @echo "Build completed successfully" + # Build the image container-image: pre-build @echo "Building docker image" diff --git a/cmd/botkube/main.go b/cmd/botkube/main.go index 19facbd24..eb8e7991e 100644 --- a/cmd/botkube/main.go +++ b/cmd/botkube/main.go @@ -30,6 +30,7 @@ import ( "github.com/kubeshop/botkube/internal/analytics" "github.com/kubeshop/botkube/internal/lifecycle" "github.com/kubeshop/botkube/internal/plugin" + "github.com/kubeshop/botkube/internal/source" "github.com/kubeshop/botkube/internal/storage" "github.com/kubeshop/botkube/pkg/action" "github.com/kubeshop/botkube/pkg/bot" @@ -43,7 +44,6 @@ import ( "github.com/kubeshop/botkube/pkg/notifier" "github.com/kubeshop/botkube/pkg/recommendation" "github.com/kubeshop/botkube/pkg/sink" - "github.com/kubeshop/botkube/pkg/source" ) const ( @@ -100,8 +100,8 @@ func run() error { errGroup, ctx := errgroup.WithContext(ctx) collector := plugin.NewCollector(logger) - enabledPluginExecutors := collector.GetAllEnabledAndUsedPlugins(conf) - pluginManager := plugin.NewManager(logger, conf.Plugins, enabledPluginExecutors) + enabledPluginExecutors, enabledPluginSources := collector.GetAllEnabledAndUsedPlugins(conf) + pluginManager := plugin.NewManager(logger, conf.Plugins, enabledPluginExecutors, enabledPluginSources) err = pluginManager.Start(ctx) if err != nil { @@ -322,6 +322,12 @@ func run() error { actionProvider := action.NewProvider(logger.WithField(componentLogFieldKey, "Action Provider"), conf.Actions, executorFactory) router.AddEnabledActionBindings(conf.Actions) + sourcePluginDispatcher := source.NewDispatcher(logger, notifiers, pluginManager, conf) + err = sourcePluginDispatcher.Start(ctx) + if err != nil { + return fmt.Errorf("while starting source plugin event dispatcher: %w", err) + } + // Create and start controller ctrl := controller.New( logger.WithField(componentLogFieldKey, "Controller"), diff --git a/cmd/executor/echo/README.md b/cmd/executor/echo/README.md index 02b215bc7..96ee1edea 100644 --- a/cmd/executor/echo/README.md +++ b/cmd/executor/echo/README.md @@ -4,7 +4,7 @@ Echo is the example Botkube executor used during [e2e tests](../../../test/e2e). ## Configuration parameters -The Echo configuration should be specified in YAML format. It accepts such parameters: +The configuration should be specified in YAML format. Such parameters are supported: ```yaml changeResponseToUpperCase: true # default is 'false'. diff --git a/cmd/source/cm-watcher/README.md b/cmd/source/cm-watcher/README.md new file mode 100644 index 000000000..035a55c35 --- /dev/null +++ b/cmd/source/cm-watcher/README.md @@ -0,0 +1,13 @@ +# ConfigMap watcher source + +ConfigMap watcher source is the example Botkube source used during [e2e tests](../../../test/e2e). + +## Configuration parameters + +The configuration should be specified in YAML format. Such parameters are supported: + +```yaml +configMap: + name: cm-map-watcher # config map name to react to + namespace: botkube # config map namespace +``` diff --git a/cmd/source/cm-watcher/main.go b/cmd/source/cm-watcher/main.go new file mode 100644 index 000000000..4b8db6d49 --- /dev/null +++ b/cmd/source/cm-watcher/main.go @@ -0,0 +1,106 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + + "github.com/hashicorp/go-plugin" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/clientcmd" + watchtools "k8s.io/client-go/tools/watch" + + "github.com/kubeshop/botkube/pkg/api/source" +) + +const pluginName = "cm-watcher" + +type ( + // Config holds executor configuration. + Config struct { + ConfigMap Object + } + // Object holds information about object to watch. + Object struct { + Name string `yaml:"name"` + Namespace string `yaml:"namespace"` + Event watch.EventType `yaml:"event"` + } +) + +// CMWatcher implements Botkube source plugin. +type CMWatcher struct{} + +// Stream returns a given command as response. +func (CMWatcher) Stream(ctx context.Context) (source.StreamOutput, error) { + // TODO: in request we should receive the source configurations. + cfg := Config{ + ConfigMap: Object{ + Name: "cm-watcher-trigger", + Namespace: "botkube", + Event: "ADDED", + }, + } + + out := source.StreamOutput{ + Output: make(chan []byte), + } + + go listenEvents(ctx, cfg.ConfigMap, out.Output) + + return out, nil +} + +func listenEvents(ctx context.Context, obj Object, sink chan<- []byte) { + config, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG")) + exitOnError(err) + clientset, err := kubernetes.NewForConfig(config) + exitOnError(err) + + fieldSelector := fields.OneTermEqualSelector("metadata.name", obj.Name).String() + lw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + options.FieldSelector = fieldSelector + return clientset.CoreV1().ConfigMaps(obj.Namespace).List(ctx, options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.FieldSelector = fieldSelector + return clientset.CoreV1().ConfigMaps(obj.Namespace).Watch(ctx, options) + }, + } + + infiniteWatch := func(event watch.Event) (bool, error) { + if event.Type == obj.Event { + cm := event.Object.(*corev1.ConfigMap) + msg := fmt.Sprintf("Detected `%s` event on `%s/%s`", obj.Event, cm.Namespace, cm.Name) + sink <- []byte(msg) + } + + // always continue - context will cancel this watch for us :) + return false, nil + } + + _, err = watchtools.UntilWithSync(ctx, lw, &corev1.ConfigMap{}, nil, infiniteWatch) + exitOnError(err) +} + +func main() { + source.Serve(map[string]plugin.Plugin{ + pluginName: &source.Plugin{ + Source: &CMWatcher{}, + }, + }) +} + +func exitOnError(err error) { + if err != nil { + log.Fatal(err) + } +} diff --git a/hack/goreleaser.sh b/hack/goreleaser.sh index 2e0a5156e..0268afcc2 100755 --- a/hack/goreleaser.sh +++ b/hack/goreleaser.sh @@ -96,6 +96,10 @@ build_plugins() { goreleaser build -f .goreleaser.plugin.yaml --rm-dist --snapshot } +build_plugins_single() { + goreleaser build -f .goreleaser.plugin.yaml --rm-dist --snapshot --single-target +} + build_single() { export IMAGE_TAG=v9.99.9-dev docker run --rm --privileged \ @@ -127,6 +131,9 @@ case "${1}" in build_plugins) build_plugins ;; + build_plugins_single) + build_plugins_single + ;; build_single) build_single ;; diff --git a/helm/botkube/e2e-test-values.yaml b/helm/botkube/e2e-test-values.yaml index 6ac8e70e0..f9beaf00a 100644 --- a/helm/botkube/e2e-test-values.yaml +++ b/helm/botkube/e2e-test-values.yaml @@ -20,6 +20,7 @@ communications: - k8s-events - k8s-annotated-cm-delete - k8s-pod-create-events + - plugin-based 'secondary': name: "" # Tests will override this temporarily notification: @@ -47,6 +48,7 @@ communications: - k8s-events - k8s-annotated-cm-delete - k8s-pod-create-events + - plugin-based 'secondary': id: "" # Tests will override this channel ID temporarily notification: @@ -120,6 +122,13 @@ sources: event: # overrides top level `event` entry types: - update + + 'plugin-based': + botkube/cm-watcher: + enabled: true + config: + configMapName: cm-watcher-trigger + executors: 'kubectl-read-only': kubectl: diff --git a/internal/plugin/collector.go b/internal/plugin/collector.go index f4f388d6b..4ac63aeef 100644 --- a/internal/plugin/collector.go +++ b/internal/plugin/collector.go @@ -18,15 +18,21 @@ func NewCollector(log logrus.FieldLogger) *Collector { // GetAllEnabledAndUsedPlugins returns the list of all plugins that are both enabled and bind to at // least one communicator that is also enabled. -func (c *Collector) GetAllEnabledAndUsedPlugins(cfg *config.Config) []string { - // Collect all used executor - bindExecutors := map[string]struct{}{} +func (c *Collector) GetAllEnabledAndUsedPlugins(cfg *config.Config) ([]string, []string) { + // Collect all used executor/sources + var ( + bindExecutors = map[string]struct{}{} + bindSources = map[string]struct{}{} + ) collect := func(channels config.IdentifiableMap[config.ChannelBindingsByName]) { for _, bindings := range channels { for _, name := range bindings.Bindings.Executors { bindExecutors[name] = struct{}{} } + for _, name := range bindings.Bindings.Sources { + bindSources[name] = struct{}{} + } } } @@ -47,6 +53,9 @@ func (c *Collector) GetAllEnabledAndUsedPlugins(cfg *config.Config) []string { for _, name := range commGroupCfg.Teams.Bindings.Executors { bindExecutors[name] = struct{}{} } + for _, name := range commGroupCfg.Teams.Bindings.Sources { + bindSources[name] = struct{}{} + } } if commGroupCfg.Discord.Enabled { @@ -54,6 +63,9 @@ func (c *Collector) GetAllEnabledAndUsedPlugins(cfg *config.Config) []string { for _, name := range bindings.Bindings.Executors { bindExecutors[name] = struct{}{} } + for _, name := range bindings.Bindings.Sources { + bindSources[name] = struct{}{} + } } } } @@ -82,5 +94,20 @@ func (c *Collector) GetAllEnabledAndUsedPlugins(cfg *config.Config) []string { } } - return usedExecutorPlugins + // Collect all sources that are both enabled and bind to at least one communicator that is enabled. + var usedSourcePlugins []string + for groupName, groupItems := range cfg.Sources { + for name, source := range groupItems.Plugins { + if !source.Enabled { + continue + } + _, found := bindSources[groupName] + if !found { + continue + } + + usedSourcePlugins = append(usedSourcePlugins, name) + } + } + return usedExecutorPlugins, usedSourcePlugins } diff --git a/internal/plugin/manager.go b/internal/plugin/manager.go index e7d65bbeb..e2120ead2 100644 --- a/internal/plugin/manager.go +++ b/internal/plugin/manager.go @@ -18,12 +18,14 @@ import ( "github.com/kubeshop/botkube/pkg/api" "github.com/kubeshop/botkube/pkg/api/executor" + "github.com/kubeshop/botkube/pkg/api/source" "github.com/kubeshop/botkube/pkg/config" "github.com/kubeshop/botkube/pkg/multierror" ) const ( executorPluginName = "executor" + sourcePluginName = "source" dirPerms = 0o775 binPerms = 0o755 filePerms = 0o664 @@ -33,8 +35,7 @@ const ( // This map is used in order to identify a plugin called Dispense. // This map is globally available and must stay consistent in order for all the plugins to work. var pluginMap = map[string]plugin.Plugin{ - // TODO(plugin-sources): add me: - //sourcePluginName: &source.Plugin{}, + sourcePluginName: &source.Plugin{}, executorPluginName: &executor.Plugin{}, } @@ -47,28 +48,34 @@ type Manager struct { executorsToEnable []string executorsStore store[executor.Executor] + + sourcesStore store[source.Source] + sourcesToEnable []string } // NewManager returns a new Manager instance. -func NewManager(logger logrus.FieldLogger, cfg config.Plugins, executors []string) *Manager { +func NewManager(logger logrus.FieldLogger, cfg config.Plugins, executors, sources []string) *Manager { return &Manager{ cfg: cfg, httpClient: newHTTPClient(), executorsToEnable: executors, executorsStore: newStore[executor.Executor](), + sourcesToEnable: sources, + sourcesStore: newStore[source.Source](), log: logger.WithField("component", "Plugin Manager"), } } // Start downloads and starts all enabled plugins. func (m *Manager) Start(ctx context.Context) error { - if len(m.executorsToEnable) == 0 { + if len(m.executorsToEnable) == 0 && len(m.sourcesToEnable) == 0 { m.log.Info("No external plugins are enabled.") return nil } m.log.WithFields(logrus.Fields{ "enabledExecutors": strings.Join(m.executorsToEnable, ","), + "enabledSources": strings.Join(m.sourcesToEnable, ","), }).Info("Starting Plugin Manager for all enabled plugins") err := m.start(ctx, false) @@ -100,6 +107,17 @@ func (m *Manager) start(ctx context.Context, forceUpdate bool) error { return fmt.Errorf("while creating executor plugins: %w", err) } m.executorsStore.EnabledPlugins = executorClients + + sourcesPlugins, err := m.loadPlugins(ctx, sourcePluginName, m.sourcesToEnable, m.sourcesStore.Repository) + if err != nil { + return err + } + sourcesClients, err := createGRPCClients[source.Source](sourcesPlugins, sourcePluginName) + if err != nil { + return fmt.Errorf("while creating source plugins: %w", err) + } + m.sourcesStore.EnabledPlugins = sourcesClients + return nil } @@ -111,7 +129,21 @@ func (m *Manager) GetExecutor(name string) (executor.Executor, error) { client, found := m.executorsStore.EnabledPlugins[name] if !found || client.Client == nil { - return nil, fmt.Errorf("client for plugin %q not found", name) + return nil, fmt.Errorf("client for executor plugin %q not found", name) + } + + return client.Client, nil +} + +// GetSource returns the source client for a given plugin. +func (m *Manager) GetSource(name string) (source.Source, error) { + if !m.isStarted.Load() { + return nil, ErrNotStartedPluginManager + } + + client, found := m.sourcesStore.EnabledPlugins[name] + if !found || client.Client == nil { + return nil, fmt.Errorf("client for source plugin %q not found", name) } return client.Client, nil @@ -121,7 +153,13 @@ func (m *Manager) GetExecutor(name string) (executor.Executor, error) { // This method blocks until all cleanup is finished. func (m *Manager) Shutdown() { var wg sync.WaitGroup - for _, p := range m.executorsStore.EnabledPlugins { + releasePlugins(&wg, m.sourcesStore.EnabledPlugins) + releasePlugins(&wg, m.executorsStore.EnabledPlugins) + wg.Wait() +} + +func releasePlugins[T any](wg *sync.WaitGroup, enabledPlugins storePlugins[T]) { + for _, p := range enabledPlugins { wg.Add(1) go func(close func()) { @@ -131,7 +169,6 @@ func (m *Manager) Shutdown() { wg.Done() }(p.Cleanup) } - wg.Wait() } func (m *Manager) loadPlugins(ctx context.Context, pluginType string, pluginsToEnable []string, repo storeRepository) (map[string]string, error) { @@ -205,11 +242,12 @@ func (m *Manager) loadRepositoriesMetadata(ctx context.Context, forceUpdate bool rawIndexes[repo] = data } - executorsRepos, err := newStoreRepository(rawIndexes) + executorsRepos, sourcesRepos, err := newStoreRepositories(rawIndexes) if err != nil { - return fmt.Errorf("while building executors repository store: %w", err) + return fmt.Errorf("while building repositories store: %w", err) } m.executorsStore.Repository = executorsRepos + m.sourcesStore.Repository = sourcesRepos return nil } diff --git a/internal/plugin/store.go b/internal/plugin/store.go index fec05d470..dd1177bdd 100644 --- a/internal/plugin/store.go +++ b/internal/plugin/store.go @@ -41,42 +41,52 @@ func newStore[T any]() store[T] { } } -func newStoreRepository(indexes map[string][]byte) (storeRepository, error) { - executorsRepository := storeRepository{} +func newStoreRepositories(indexes map[string][]byte) (storeRepository, storeRepository, error) { + var ( + executorsRepositories = storeRepository{} + sourcesRepositories = storeRepository{} + ) for repo, data := range indexes { var index Index if err := yaml.Unmarshal(data, &index); err != nil { - return nil, fmt.Errorf("while unmarshaling index: %w", err) + return nil, nil, fmt.Errorf("while unmarshaling index: %w", err) } for _, entry := range index.Entries { // omit version, as we want to collect plugins with different version together key, err := BuildPluginKey(repo, entry.Name, "") if err != nil { - return nil, fmt.Errorf("while building key for entry in %s repository: %w", repo, err) + return nil, nil, fmt.Errorf("while building key for entry in %s repository: %w", repo, err) } switch entry.Type { case TypeExecutor: - executorsRepository[key] = append(executorsRepository[key], storeEntry{ + executorsRepositories[key] = append(executorsRepositories[key], storeEntry{ Description: entry.Description, Version: entry.Version, URLs: mapBinaryURLs(entry.URLs), }) case TypeSource: - // TODO(plugin-sources): add me. + sourcesRepositories[key] = append(sourcesRepositories[key], storeEntry{ + Description: entry.Description, + Version: entry.Version, + URLs: mapBinaryURLs(entry.URLs), + }) } } } // sort loaded entries by version - for key := range executorsRepository { - sort.Sort(byIndexEntryVersion(executorsRepository[key])) + for key := range executorsRepositories { + sort.Sort(byIndexEntryVersion(executorsRepositories[key])) + } + + for key := range sourcesRepositories { + sort.Sort(byIndexEntryVersion(sourcesRepositories[key])) } - // TODO(plugin-sources): add sorting - return executorsRepository, nil + return executorsRepositories, sourcesRepositories, nil } func mapBinaryURLs(in []IndexURL) map[string]string { diff --git a/internal/plugin/store_test.go b/internal/plugin/store_test.go index 7f4c65052..e67f0193c 100644 --- a/internal/plugin/store_test.go +++ b/internal/plugin/store_test.go @@ -52,13 +52,50 @@ func TestNewStoreRepository(t *testing.T) { }, }, } + expectedSources := storeRepository{ + "botkube/kubernetes": { + { + Description: "Kubernetes source", + Version: "v1.0.0", + URLs: map[string]string{ + "darwin/amd64": "https://github.com/kubeshop/botkube/releases/download/v0.17.0/darwin_amd64_source_kubernetes", + "darwin/arm64": "https://github.com/kubeshop/botkube/releases/download/v0.17.0/darwin_arm64_source_kubernetes", + "linux/amd64": "https://github.com/kubeshop/botkube/releases/download/v0.17.0/linux-_md64_source_kubernetes", + "linux/arm64": "https://github.com/kubeshop/botkube/releases/download/v0.17.0/linux-_rm64_source_kubernetes", + }, + }, + { + Description: "Kubernetes source", + Version: "0.1.0", // should support also version without `v` + URLs: map[string]string{ + "darwin/amd64": "https://github.com/kubeshop/botkube/releases/download/v0.1.0/darwin_amd64_source_kubernetes", + "darwin/arm64": "https://github.com/kubeshop/botkube/releases/download/v0.1.0/darwin_arm64_source_kubernetes", + "linux/amd64": "https://github.com/kubeshop/botkube/releases/download/v0.1.0/linux-_md64_source_kubernetes", + "linux/arm64": "https://github.com/kubeshop/botkube/releases/download/v0.1.0/linux-_rm64_source_kubernetes", + }, + }, + }, + "mszostok/cm-watcher": { + { + Description: "Source suitable for e2e testing.", + Version: "v1.0.0", + URLs: map[string]string{ + "darwin/amd64": "https://github.com/mszostok/botkube-plugins/releases/download/v1.0.0/darwin_amd64_cmd-watcher", + "darwin/arm64": "https://github.com/mszostok/botkube-plugins/releases/download/v1.0.0/darwin_arm64_cmd-watcher", + "linux/amd64": "https://github.com/mszostok/botkube-plugins/releases/download/v1.0.0/linux-_md64_cmd-watcher", + "linux/arm64": "https://github.com/mszostok/botkube-plugins/releases/download/v1.0.0/linux-_rm64_cmd-watcher", + }, + }, + }, + } // when - executors, err := newStoreRepository(repositories) + executors, sources, err := newStoreRepositories(repositories) // then require.NoError(t, err) assert.Equal(t, executors, expectedExecutors) + assert.Equal(t, sources, expectedSources) } func loadTestdataFile(t *testing.T, name string) []byte { diff --git a/internal/plugin/testdata/TestNewStoreRepository/botkube.yaml b/internal/plugin/testdata/TestNewStoreRepository/botkube.yaml index 8593dc51b..b72314dca 100644 --- a/internal/plugin/testdata/TestNewStoreRepository/botkube.yaml +++ b/internal/plugin/testdata/TestNewStoreRepository/botkube.yaml @@ -66,3 +66,29 @@ entries: platform: os: linux architecture: arm64 + + - name: "kubernetes" + type: "source" + description: "Kubernetes source" + version: "0.1.0" # should support also without the `v` + urls: + # different url syntax + - url: https://github.com/kubeshop/botkube/releases/download/v0.1.0/darwin_amd64_source_kubernetes + platform: + os: darwin + architecture: amd64 + # different url syntax + - url: https://github.com/kubeshop/botkube/releases/download/v0.1.0/darwin_arm64_source_kubernetes + platform: + os: darwin + architecture: arm64 + # different url syntax + - url: https://github.com/kubeshop/botkube/releases/download/v0.1.0/linux-_md64_source_kubernetes + platform: + os: linux + architecture: amd64 + # different url syntax + - url: https://github.com/kubeshop/botkube/releases/download/v0.1.0/linux-_rm64_source_kubernetes + platform: + os: linux + architecture: arm64 diff --git a/pkg/source/registration.go b/internal/source/registration.go similarity index 100% rename from pkg/source/registration.go rename to internal/source/registration.go diff --git a/pkg/source/registration_test.go b/internal/source/registration_test.go similarity index 100% rename from pkg/source/registration_test.go rename to internal/source/registration_test.go diff --git a/pkg/source/router.go b/internal/source/router.go similarity index 100% rename from pkg/source/router.go rename to internal/source/router.go diff --git a/pkg/source/router_test.go b/internal/source/router_test.go similarity index 100% rename from pkg/source/router_test.go rename to internal/source/router_test.go diff --git a/internal/source/source.go b/internal/source/source.go new file mode 100644 index 000000000..c715d2967 --- /dev/null +++ b/internal/source/source.go @@ -0,0 +1,187 @@ +package source + +import ( + "context" + "fmt" + "strings" + + "github.com/sirupsen/logrus" + + "github.com/kubeshop/botkube/internal/plugin" + "github.com/kubeshop/botkube/pkg/bot/interactive" + "github.com/kubeshop/botkube/pkg/config" + "github.com/kubeshop/botkube/pkg/notifier" +) + +// Dispatcher watches for enabled sources events and send them to notifiers. +type Dispatcher struct { + log logrus.FieldLogger + notifiers []notifier.Notifier + manager *plugin.Manager + cfg *config.Config + starter func(ctx context.Context, pluginName string, pluginConfigs []any, sources []string) error + + // startedProcesses holds information about started unique plugin processes + // We start a new plugin process each time we see a new order of source bindings. + // We do that because we pass the array of configs to each `Stream` method and + // the merging strategy for configs can depend on the order. + // As a result our key is e.g. ['source-name1;source-name2'] + startedProcesses map[string]struct{} +} + +// NewDispatcher create a new Dispatcher instance. +func NewDispatcher(log logrus.FieldLogger, notifiers []notifier.Notifier, manager *plugin.Manager, cfg *config.Config) *Dispatcher { + d := &Dispatcher{ + log: log, + notifiers: notifiers, + manager: manager, + cfg: cfg, + startedProcesses: map[string]struct{}{}, + } + + // TODO: it allows us to mock it for unit-test + // will be removed once we will update the gRPC contract on source + // and passing list of configuration. + d.starter = d.start + + return d +} + +// Start starts all sources and dispatch received events. +func (d *Dispatcher) Start(ctx context.Context) error { + for _, commGroupCfg := range d.cfg.Communications { + if commGroupCfg.Slack.Enabled { + for _, channel := range commGroupCfg.Slack.Channels { + if err := d.startPlugin(ctx, channel.Bindings.Sources); err != nil { + return err + } + } + } + + if commGroupCfg.SocketSlack.Enabled { + for _, channel := range commGroupCfg.SocketSlack.Channels { + if err := d.startPlugin(ctx, channel.Bindings.Sources); err != nil { + return err + } + } + } + + if commGroupCfg.Mattermost.Enabled { + for _, channel := range commGroupCfg.Mattermost.Channels { + if err := d.startPlugin(ctx, channel.Bindings.Sources); err != nil { + return err + } + } + } + + if commGroupCfg.Teams.Enabled { + if err := d.startPlugin(ctx, commGroupCfg.Teams.Bindings.Sources); err != nil { + return err + } + } + + if commGroupCfg.Discord.Enabled { + for _, channel := range commGroupCfg.Discord.Channels { + if err := d.startPlugin(ctx, channel.Bindings.Sources); err != nil { + return err + } + } + } + } + + return nil +} + +func (d *Dispatcher) startPlugin(ctx context.Context, bindSources []string) error { + key := strings.Join(bindSources, ";") + + _, found := d.startedProcesses[key] + if found { + return nil // such configuration was already started + } + + d.startedProcesses[key] = struct{}{} + + // Holds the array of configs for a given plugin. + // For example, ['botkube/kubernetes@v1.0.0']->[]{"cfg1", "cfg2"} + sourcePluginConfigs := map[string][]any{} + for _, sourceCfgGroupName := range bindSources { + plugins := d.cfg.Sources[sourceCfgGroupName].Plugins + for pluginName, pluginCfg := range plugins { + if !pluginCfg.Enabled { + continue + } + sourcePluginConfigs[pluginName] = append(sourcePluginConfigs[pluginName], pluginCfg.Config) + } + } + + for pluginName, configs := range sourcePluginConfigs { + err := d.starter(ctx, pluginName, configs, bindSources) + if err != nil { + return fmt.Errorf("while starting plugin source %s: %w", pluginName, err) + } + } + + return nil +} + +// Once we will have the gRPC contract established with proper Cloud Event schema, we should move also this logic here: +// https://github.com/kubeshop/botkube/blob/525c737956ff820a09321879284037da8bf5d647/pkg/controller/controller.go#L200-L253 +func (d *Dispatcher) start(ctx context.Context, pluginName string, pluginConfigs []any, sources []string) error { + log := d.log.WithFields(logrus.Fields{ + "pluginName": pluginName, + "sources": sources, + }) + + log.Info("Staring source streaming...") + + sourceClient, err := d.manager.GetSource(pluginName) + if err != nil { + return fmt.Errorf("while getting source client for %s: %w", pluginName, err) + } + + // TODO(configure plugin): pass the `pluginConfigs` + _ = pluginConfigs + out, err := sourceClient.Stream(ctx) + if err != nil { + return fmt.Errorf("while opening stream for %s: %w", pluginName, err) + } + + go func() { + for { + select { + case event := <-out.Output: + log.WithField("event", string(event)).Debug("Dispatching received event...") + d.dispatch(ctx, event, sources) + case <-ctx.Done(): + return + } + } + }() + return nil +} + +func (d *Dispatcher) dispatch(ctx context.Context, event []byte, sources []string) { + for _, n := range d.notifiers { + go func(n notifier.Notifier) { + msg := interactive.Message{ + Base: interactive.Base{ + Description: string(event), + }, + } + err := n.SendGenericMessage(ctx, &genericMessage{response: msg}, sources) + if err != nil { + d.log.Errorf("while sending event: %s", err.Error()) + } + }(n) + } +} + +type genericMessage struct { + response interactive.Message +} + +// ForBot returns interactive message. +func (g *genericMessage) ForBot(string) interactive.Message { + return g.response +} diff --git a/internal/source/source_test.go b/internal/source/source_test.go new file mode 100644 index 000000000..e7638b0c3 --- /dev/null +++ b/internal/source/source_test.go @@ -0,0 +1,65 @@ +package source + +import ( + "context" + "path/filepath" + "strings" + "testing" + + logtest "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/kubeshop/botkube/pkg/config" +) + +func TestStartingUniqueProcesses(t *testing.T) { + // given + givenCfg, _, err := config.LoadWithDefaults(func() []string { + return []string{ + testdataFile(t, "config.yaml"), + } + }) + require.NoError(t, err) + + logger, _ := logtest.NewNullLogger() + + expectedProcesses := map[string]struct{}{ + "botkube/keptn@v1.0.0; keptn-us-east-2; keptn-eu-central-1": {}, + "botkube/keptn@v1.0.0; keptn-eu-central-1; keptn-us-east-2": {}, + "botkube/keptn@v1.0.0; keptn-eu-central-1": {}, + "botkube/keptn@v1.0.0; keptn-us-east-2": {}, + } + + assertStarter := func(ctx context.Context, pluginName string, pluginConfigs []any, sources []string) error { + // then configs are specified in a proper order + var expConfigs []any + for _, sourceName := range sources { + expConfigs = append(expConfigs, givenCfg.Sources[sourceName].Plugins[pluginName].Config) + } + assert.Equal(t, expConfigs, pluginConfigs) + + // then only unique process are started + key := []string{pluginName} + key = append(key, sources...) + processKey := strings.Join(key, "; ") + _, found := expectedProcesses[processKey] + if !found { + t.Errorf("starting unwanted process for %s with sources %v", pluginName, sources) + } + delete(expectedProcesses, processKey) + return nil + } + + // when + dispatcher := NewDispatcher(logger, nil, nil, givenCfg) + dispatcher.starter = assertStarter + + err = dispatcher.Start(context.Background()) + require.NoError(t, err) +} + +func testdataFile(t *testing.T, name string) string { + t.Helper() + return filepath.Join("testdata", t.Name(), name) +} diff --git a/internal/source/testdata/TestStartingUniqueProcesses/config.yaml b/internal/source/testdata/TestStartingUniqueProcesses/config.yaml new file mode 100644 index 000000000..bff721ab1 --- /dev/null +++ b/internal/source/testdata/TestStartingUniqueProcesses/config.yaml @@ -0,0 +1,48 @@ +sources: + 'keptn-us-east-2': + botkube/keptn@v1.0.0: + enabled: true + config: + url: 'keptn-us-east-2.local' + 'keptn-eu-central-1': + botkube/keptn@v1.0.0: + enabled: true + config: + url: 'keptn-eu-central-1.local' + +communications: + default-group: + socketSlack: + enabled: true + appToken: "xapp-testing" + botToken: "xoxb-testing" + channels: + all: # proces1 - should use the same process as #random channel as sources order is the same + name: all + bindings: + sources: + - 'keptn-us-east-2' + - 'keptn-eu-central-1' + random: # proces1 -should use the same process as #all channel as sources order is the same + name: random + bindings: + sources: # get events from 2 regions + - 'keptn-us-east-2' + - 'keptn-eu-central-1' + general: # proces2 - should use different process than #all and #random channels as sources order is different + name: general + bindings: + sources: # get events from 2 regions + - 'keptn-eu-central-1' + - 'keptn-us-east-2' + + eu: # proces3 - should use different process as it has only once source + name: eu + bindings: + sources: # get even only from eu + - 'keptn-eu-central-1' + us: # proces4 - should use different process as it has only once source + name: us + bindings: + sources: # get even only from us + - 'keptn-us-east-2' diff --git a/pkg/api/source/grpc_adapter.go b/pkg/api/source/grpc_adapter.go new file mode 100644 index 000000000..3ab32058e --- /dev/null +++ b/pkg/api/source/grpc_adapter.go @@ -0,0 +1,144 @@ +package source + +import ( + "context" + "io" + "log" + + "github.com/hashicorp/go-plugin" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/kubeshop/botkube/pkg/api" +) + +// Source defines the Botkube source plugin functionality. +type Source interface { + Stream(ctx context.Context) (StreamOutput, error) +} + +// StreamOutput contains the stream data. +type StreamOutput struct { + // Output represents the streamed events. It is from start of plugin execution. + Output chan []byte + // TODO: we should consider adding error feedback channel too. +} + +// ProtocolVersion is the version that must match between Botkube core +// and Botkube plugins. This should be bumped whenever a change happens in +// one or the other that makes it so that they can't safely communicate. +// This could be adding a new interface value, it could be how helper/schema computes diffs, etc. +// +// NOTE: In the future we can consider using VersionedPlugins. These can be used to negotiate +// a compatible version between client and server. If this is set, Handshake.ProtocolVersion is not required. +const ProtocolVersion = 1 + +var _ plugin.GRPCPlugin = &Plugin{} + +// Plugin This is the implementation of plugin.GRPCPlugin, so we can serve and consume different Botkube Sources. +type Plugin struct { + // The GRPC plugin must still implement the Plugin interface. + plugin.NetRPCUnsupportedPlugin + + // Source represent a concrete implementation that handles the business logic. + Source Source +} + +// GRPCServer registers plugin for serving with the given GRPCServer. +func (p *Plugin) GRPCServer(_ *plugin.GRPCBroker, s *grpc.Server) error { + RegisterSourceServer(s, &grpcServer{ + Source: p.Source, + }) + return nil +} + +// GRPCClient returns the interface implementation for the plugin that is serving via gRPC by GRPCServer. +func (p *Plugin) GRPCClient(_ context.Context, _ *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) { + return &grpcClient{ + client: NewSourceClient(c), + }, nil +} + +type grpcClient struct { + client SourceClient +} + +func (p *grpcClient) Stream(ctx context.Context) (StreamOutput, error) { + stream, err := p.client.Stream(ctx, &emptypb.Empty{}) + if err != nil { + return StreamOutput{}, err + } + + out := StreamOutput{ + Output: make(chan []byte), + } + + go func() { + for { + // RecvMsg blocks until it receives a message into m or the stream is + // done. It returns io.EOF when the stream completes successfully. + feature, err := stream.Recv() + if err == io.EOF { + break + } + + // On any other error, the stream is aborted and the error contains the RPC + // status. + if err != nil { + log.Print(err) + // TODO: we should consider adding error feedback channel to StreamOutput. + return + } + out.Output <- feature.Output + } + }() + + return out, nil +} + +type grpcServer struct { + UnimplementedSourceServer + Source Source +} + +func (p *grpcServer) Stream(_ *emptypb.Empty, gstream Source_StreamServer) error { + ctx := gstream.Context() + + // It's up to the 'Stream' method to close the returned channels as it sends the data to it. + // We can only use 'ctx' to cancel streaming and release associated resources. + stream, err := p.Source.Stream(ctx) + if err != nil { + return err + } + + for { + select { + case <-ctx.Done(): // client canceled stream, we can release this connection. + return ctx.Err() + case out, ok := <-stream.Output: + if !ok { + return nil // output closed, no more chunk logs + } + + err := gstream.Send(&StreamResponse{ + Output: out, + }) + if err != nil { + return err + } + } + } +} + +// Serve serves given plugins. +func Serve(p map[string]plugin.Plugin) { + plugin.Serve(&plugin.ServeConfig{ + Plugins: p, + HandshakeConfig: plugin.HandshakeConfig{ + ProtocolVersion: ProtocolVersion, + MagicCookieKey: api.HandshakeConfig.MagicCookieKey, + MagicCookieValue: api.HandshakeConfig.MagicCookieValue, + }, + GRPCServer: plugin.DefaultGRPCServer, + }) +} diff --git a/pkg/api/source/source.pb.go b/pkg/api/source/source.pb.go new file mode 100644 index 000000000..d22f227bf --- /dev/null +++ b/pkg/api/source/source.pb.go @@ -0,0 +1,153 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.27.1 +// protoc v3.19.4 +// source: source.proto + +package source + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + emptypb "google.golang.org/protobuf/types/known/emptypb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type StreamResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Output represents the streamed Source events. It is from start of Source execution. + Output []byte `protobuf:"bytes,1,opt,name=output,proto3" json:"output,omitempty"` +} + +func (x *StreamResponse) Reset() { + *x = StreamResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_source_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StreamResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamResponse) ProtoMessage() {} + +func (x *StreamResponse) ProtoReflect() protoreflect.Message { + mi := &file_source_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamResponse.ProtoReflect.Descriptor instead. +func (*StreamResponse) Descriptor() ([]byte, []int) { + return file_source_proto_rawDescGZIP(), []int{0} +} + +func (x *StreamResponse) GetOutput() []byte { + if x != nil { + return x.Output + } + return nil +} + +var File_source_proto protoreflect.FileDescriptor + +var file_source_proto_rawDesc = []byte{ + 0x0a, 0x0c, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, + 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x22, 0x28, 0x0a, 0x0e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x32, 0x46, 0x0a, + 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x3c, 0x0a, 0x06, 0x53, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x73, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0x10, 0x5a, 0x0e, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, + 0x2f, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_source_proto_rawDescOnce sync.Once + file_source_proto_rawDescData = file_source_proto_rawDesc +) + +func file_source_proto_rawDescGZIP() []byte { + file_source_proto_rawDescOnce.Do(func() { + file_source_proto_rawDescData = protoimpl.X.CompressGZIP(file_source_proto_rawDescData) + }) + return file_source_proto_rawDescData +} + +var file_source_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_source_proto_goTypes = []interface{}{ + (*StreamResponse)(nil), // 0: source.StreamResponse + (*emptypb.Empty)(nil), // 1: google.protobuf.Empty +} +var file_source_proto_depIdxs = []int32{ + 1, // 0: source.Source.Stream:input_type -> google.protobuf.Empty + 0, // 1: source.Source.Stream:output_type -> source.StreamResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_source_proto_init() } +func file_source_proto_init() { + if File_source_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_source_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StreamResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_source_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_source_proto_goTypes, + DependencyIndexes: file_source_proto_depIdxs, + MessageInfos: file_source_proto_msgTypes, + }.Build() + File_source_proto = out.File + file_source_proto_rawDesc = nil + file_source_proto_goTypes = nil + file_source_proto_depIdxs = nil +} diff --git a/pkg/api/source/source_grpc.pb.go b/pkg/api/source/source_grpc.pb.go new file mode 100644 index 000000000..5f9efae78 --- /dev/null +++ b/pkg/api/source/source_grpc.pb.go @@ -0,0 +1,133 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.19.4 +// source: source.proto + +package source + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + emptypb "google.golang.org/protobuf/types/known/emptypb" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// SourceClient is the client API for Source service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type SourceClient interface { + Stream(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (Source_StreamClient, error) +} + +type sourceClient struct { + cc grpc.ClientConnInterface +} + +func NewSourceClient(cc grpc.ClientConnInterface) SourceClient { + return &sourceClient{cc} +} + +func (c *sourceClient) Stream(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (Source_StreamClient, error) { + stream, err := c.cc.NewStream(ctx, &Source_ServiceDesc.Streams[0], "/source.Source/Stream", opts...) + if err != nil { + return nil, err + } + x := &sourceStreamClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Source_StreamClient interface { + Recv() (*StreamResponse, error) + grpc.ClientStream +} + +type sourceStreamClient struct { + grpc.ClientStream +} + +func (x *sourceStreamClient) Recv() (*StreamResponse, error) { + m := new(StreamResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// SourceServer is the server API for Source service. +// All implementations must embed UnimplementedSourceServer +// for forward compatibility +type SourceServer interface { + Stream(*emptypb.Empty, Source_StreamServer) error + mustEmbedUnimplementedSourceServer() +} + +// UnimplementedSourceServer must be embedded to have forward compatible implementations. +type UnimplementedSourceServer struct { +} + +func (UnimplementedSourceServer) Stream(*emptypb.Empty, Source_StreamServer) error { + return status.Errorf(codes.Unimplemented, "method Stream not implemented") +} +func (UnimplementedSourceServer) mustEmbedUnimplementedSourceServer() {} + +// UnsafeSourceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to SourceServer will +// result in compilation errors. +type UnsafeSourceServer interface { + mustEmbedUnimplementedSourceServer() +} + +func RegisterSourceServer(s grpc.ServiceRegistrar, srv SourceServer) { + s.RegisterService(&Source_ServiceDesc, srv) +} + +func _Source_Stream_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(emptypb.Empty) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(SourceServer).Stream(m, &sourceStreamServer{stream}) +} + +type Source_StreamServer interface { + Send(*StreamResponse) error + grpc.ServerStream +} + +type sourceStreamServer struct { + grpc.ServerStream +} + +func (x *sourceStreamServer) Send(m *StreamResponse) error { + return x.ServerStream.SendMsg(m) +} + +// Source_ServiceDesc is the grpc.ServiceDesc for Source service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Source_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "source.Source", + HandlerType: (*SourceServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Stream", + Handler: _Source_Stream_Handler, + ServerStreams: true, + }, + }, + Metadata: "source.proto", +} diff --git a/pkg/config/config.go b/pkg/config/config.go index d8ccb2345..d0cef5495 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -198,6 +198,7 @@ type ActionBindings struct { type Sources struct { DisplayName string `yaml:"displayName"` Kubernetes KubernetesSource `yaml:"kubernetes"` + Plugins PluginsExecutors `koanf:",remain"` } // KubernetesSource contains configuration for Kubernetes sources. diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index e542e5653..99821d51c 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -50,6 +50,15 @@ func TestLoadConfigSuccess(t *testing.T) { func TestLoadConfigWithPlugins(t *testing.T) { // given + expSourcePlugin := config.PluginsExecutors{ + "botkube/keptn": { + Enabled: true, + Config: map[string]interface{}{ + "field": "value", + }, + }, + } + expExecutorPlugin := config.PluginsExecutors{ "botkube/echo": { Enabled: true, @@ -70,6 +79,7 @@ func TestLoadConfigWithPlugins(t *testing.T) { require.NoError(t, err) require.NotNil(t, gotCfg) + assert.Equal(t, expSourcePlugin, gotCfg.Sources["k8s-events"].Plugins) assert.Equal(t, expExecutorPlugin, gotCfg.Executors["plugin-based"].Plugins) } diff --git a/pkg/config/testdata/TestLoadConfigSuccess/config-all.yaml b/pkg/config/testdata/TestLoadConfigSuccess/config-all.yaml index d4d0dff6c..bf10b4a83 100644 --- a/pkg/config/testdata/TestLoadConfigSuccess/config-all.yaml +++ b/pkg/config/testdata/TestLoadConfigSuccess/config-all.yaml @@ -104,6 +104,7 @@ communications: # req 1 elm. sources: 'k8s-events': + displayName: "Plugins & Builtins" kubernetes: recommendations: @@ -204,6 +205,11 @@ sources: - spec.template.spec.containers[*].image - status.readyReplicas + botkube/keptn: + enabled: true + config: + field: value + filters: kubernetes: objectAnnotationChecker: true diff --git a/pkg/config/testdata/TestLoadConfigSuccess/config.golden.yaml b/pkg/config/testdata/TestLoadConfigSuccess/config.golden.yaml index bf1b6ebdf..6ad30aa77 100644 --- a/pkg/config/testdata/TestLoadConfigSuccess/config.golden.yaml +++ b/pkg/config/testdata/TestLoadConfigSuccess/config.golden.yaml @@ -10,7 +10,7 @@ actions: - kubectl-read-only sources: k8s-events: - displayName: "" + displayName: Plugins & Builtins kubernetes: recommendations: ingress: @@ -271,6 +271,11 @@ sources: my-annotation: "true" labels: my-label: "true" + plugins: + botkube/keptn: + enabled: true + config: + field: value executors: kubectl-read-only: kubectl: diff --git a/pkg/config/testdata/TestLoadConfigWithPlugins/config-all.yaml b/pkg/config/testdata/TestLoadConfigWithPlugins/config-all.yaml index 794b56bcd..978caf411 100644 --- a/pkg/config/testdata/TestLoadConfigWithPlugins/config-all.yaml +++ b/pkg/config/testdata/TestLoadConfigWithPlugins/config-all.yaml @@ -16,6 +16,7 @@ communications: # req 1 elm. sources: 'k8s-events': + displayName: "Plugins & Builtins" kubernetes: events: @@ -27,6 +28,12 @@ sources: resources: - name: v1/pods + botkube/keptn: + enabled: true + config: + field: value + + executors: 'kubectl-read-only': # Kubectl executor configs diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 3e785dd1e..4d127742d 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -14,6 +14,7 @@ import ( "k8s.io/client-go/tools/cache" "github.com/kubeshop/botkube/internal/analytics" + "github.com/kubeshop/botkube/internal/source" "github.com/kubeshop/botkube/pkg/bot/interactive" "github.com/kubeshop/botkube/pkg/config" "github.com/kubeshop/botkube/pkg/event" @@ -21,7 +22,6 @@ import ( "github.com/kubeshop/botkube/pkg/multierror" "github.com/kubeshop/botkube/pkg/notifier" "github.com/kubeshop/botkube/pkg/recommendation" - "github.com/kubeshop/botkube/pkg/source" ) const ( diff --git a/proto/source.proto b/proto/source.proto new file mode 100644 index 000000000..52c568210 --- /dev/null +++ b/proto/source.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +import "google/protobuf/empty.proto"; + +option go_package = "pkg/api/source"; + +package source; + +message StreamResponse { + // Output represents the streamed Source events. It is from start of Source execution. + bytes output = 1; +} + +service Source { + rpc Stream(google.protobuf.Empty) returns (stream StreamResponse) {} +} diff --git a/test/e2e/bots_test.go b/test/e2e/bots_test.go index 077c8a2fd..da415cdc1 100644 --- a/test/e2e/bots_test.go +++ b/test/e2e/bots_test.go @@ -22,6 +22,7 @@ import ( "github.com/kubeshop/botkube/pkg/bot/interactive" "github.com/kubeshop/botkube/pkg/config" "github.com/kubeshop/botkube/pkg/filterengine/filters" + "github.com/kubeshop/botkube/test/fake" ) type Config struct { @@ -41,7 +42,7 @@ type Config struct { BotkubePluginRepoURL string `envconfig:"default=BOTKUBE_PLUGINS_REPOSITORIES_BOTKUBE"` } } - Plugins PluginsConfig + Plugins fake.PluginConfig ConfigMap struct { Namespace string `envconfig:"default=botkube"` } @@ -153,7 +154,7 @@ func runBotTest(t *testing.T, require.NoError(t, err) t.Log("Starting plugin server...") - indexEndpoint, startServerFn := NewPluginServer(appCfg.Plugins) + indexEndpoint, startServerFn := fake.NewPluginServer(appCfg.Plugins) go func() { require.NoError(t, startServerFn()) }() @@ -239,14 +240,43 @@ func runBotTest(t *testing.T, assert.NoError(t, err) }) - t.Run("Plugin execution", func(t *testing.T) { - command := "echo test" - expectedBody := codeBlock(command) - expectedMessage := fmt.Sprintf("%s\n%s", cmdHeader(command), expectedBody) + // Those are a temporary tests. When we will extract kubectl and kubernetes as plugins + // they won't be needed anymore. + t.Run("Botkube Plugins", func(t *testing.T) { + t.Run("Echo Executor", func(t *testing.T) { + command := "echo test" + expectedBody := codeBlock(command) + expectedMessage := fmt.Sprintf("%s\n%s", cmdHeader(command), expectedBody) - botDriver.PostMessageToBot(t, botDriver.Channel().Identifier(), command) - err := botDriver.WaitForLastMessageEqual(botDriver.BotUserID(), botDriver.Channel().ID(), expectedMessage) - assert.NoError(t, err) + botDriver.PostMessageToBot(t, botDriver.Channel().Identifier(), command) + err := botDriver.WaitForLastMessageEqual(botDriver.BotUserID(), botDriver.Channel().ID(), expectedMessage) + assert.NoError(t, err) + }) + t.Run("ConfigMap watcher source", func(t *testing.T) { + t.Log("Creating sample ConfigMap...") + cfgMap := &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cm-watcher-trigger", + Namespace: appCfg.Deployment.Namespace, + // for now, it allows us to disable the built-in kubernetes source and make sure that + // only the plugged one will respond + Annotations: map[string]string{ + "botkube.io/disable": "true", + }, + }, + } + + cfgMapCli := k8sCli.CoreV1().ConfigMaps(appCfg.Deployment.Namespace) + cfgMap, err = cfgMapCli.Create(context.Background(), cfgMap, metav1.CreateOptions{}) + require.NoError(t, err) + + t.Cleanup(func() { cleanupCreatedCfgMapIfShould(t, cfgMapCli, cfgMap.Name, nil) }) + + t.Log("Expecting bot message channel...") + expectedMsg := fmt.Sprintf("Detected `ADDED` event on `%s/%s`", cfgMap.Namespace, cfgMap.Name) + err = botDriver.WaitForLastMessageEqual(botDriver.BotUserID(), botDriver.Channel().ID(), expectedMsg) + require.NoError(t, err) + }) }) t.Run("Commands list", func(t *testing.T) { diff --git a/test/e2e/plugin_server_test.go b/test/fake/plugin_server.go similarity index 53% rename from test/e2e/plugin_server_test.go rename to test/fake/plugin_server.go index acf1a4de4..8d2ce0d2f 100644 --- a/test/e2e/plugin_server_test.go +++ b/test/fake/plugin_server.go @@ -1,4 +1,4 @@ -package e2e +package fake import ( "fmt" @@ -15,18 +15,26 @@ import ( const ( executorBinaryPrefix = "executor_" + sourceBinaryPrefix = "source_" indexFileEndpoint = "/botkube.yaml" ) -type PluginsConfig struct { - BinariesDirectory string `envconfig:"default=dist"` - Server struct { +type ( + // PluginConfig holds configuration for fake plugin server. + PluginConfig struct { + BinariesDirectory string + Server PluginServer + } + + // PluginServer holds configuration for HTTP plugin server. + PluginServer struct { Host string `envconfig:"default=http://host.k3d.internal"` Port int `envconfig:"default=3000"` } -} +) -func NewPluginServer(cfg PluginsConfig) (string, func() error) { +// NewPluginServer return function to start the fake plugin HTTP server. +func NewPluginServer(cfg PluginConfig) (string, func() error) { fs := http.FileServer(http.Dir(cfg.BinariesDirectory)) http.Handle("/static/", http.StripPrefix("/static/", fs)) @@ -75,33 +83,19 @@ func buildIndex(urlBasePath string, dir string) (plugin.Index, error) { if entry.IsDir() { continue } - if !strings.HasPrefix(entry.Name(), executorBinaryPrefix) { - continue - } - - parts := strings.Split(entry.Name(), "_") - if len(parts) != 4 { - return plugin.Index{}, fmt.Errorf("path %s doesn't follow required pattern ___", entry.Name()) - } - name, os, arch := parts[1], parts[2], parts[3] - item, found := entries[name] - if !found { - item = plugin.IndexEntry{ - Name: name, - Type: plugin.TypeExecutor, - Description: "Executor description", - Version: "v0.1.0", + switch { + case strings.HasPrefix(entry.Name(), executorBinaryPrefix): + err := appendEntry(entries, entry.Name(), "Executor description", urlBasePath, plugin.TypeExecutor) + if err != nil { + return plugin.Index{}, err + } + case strings.HasPrefix(entry.Name(), sourceBinaryPrefix): + err := appendEntry(entries, entry.Name(), "Source description", urlBasePath, plugin.TypeSource) + if err != nil { + return plugin.Index{}, err } } - item.URLs = append(item.URLs, plugin.IndexURL{ - URL: fmt.Sprintf("%s/static/%s", urlBasePath, entry.Name()), - Platform: plugin.IndexURLPlatform{ - OS: os, - Arch: arch, - }, - }) - entries[name] = item } var out plugin.Index @@ -110,3 +104,31 @@ func buildIndex(urlBasePath string, dir string) (plugin.Index, error) { } return out, nil } + +func appendEntry(entries map[string]plugin.IndexEntry, entryName, desc, urlBasePath string, pluginType plugin.Type) error { + parts := strings.Split(entryName, "_") + if len(parts) != 4 { + return fmt.Errorf("path %s doesn't follow required pattern ___", entryName) + } + + name, os, arch := parts[1], parts[2], parts[3] + item, found := entries[name] + if !found { + item = plugin.IndexEntry{ + Name: name, + Type: pluginType, + Description: desc, + Version: "v0.1.0", + } + } + item.URLs = append(item.URLs, plugin.IndexURL{ + URL: fmt.Sprintf("%s/static/%s", urlBasePath, entryName), + Platform: plugin.IndexURLPlatform{ + OS: os, + Arch: arch, + }, + }) + entries[name] = item + + return nil +} diff --git a/test/helpers/plugin_server.go b/test/helpers/plugin_server.go new file mode 100644 index 000000000..ae929b71d --- /dev/null +++ b/test/helpers/plugin_server.go @@ -0,0 +1,39 @@ +package main + +import ( + "log" + "os" + "path/filepath" + + "github.com/kubeshop/botkube/test/fake" +) + +func main() { + dir, err := os.Getwd() + exitOnErr(err) + + host := os.Getenv("PLUGIN_SERVER_HOST") + if host == "" { + host = "http://localhost" + } + + binDir := filepath.Join(dir, "dist") + indexEndpoint, startServerFn := fake.NewPluginServer(fake.PluginConfig{ + BinariesDirectory: binDir, + Server: fake.PluginServer{ + Host: host, + Port: 3000, + }, + }) + + log.Printf("Service plugin binaries from %s\n", binDir) + log.Printf("Botkube repository index URL: %s", indexEndpoint) + err = startServerFn() + exitOnErr(err) +} + +func exitOnErr(err error) { + if err != nil { + log.Fatal(err) + } +}