diff --git a/.github/workflows/validate-generated-files.yml b/.github/workflows/validate-generated-files.yml index 4dbdc482a..82e41b32b 100644 --- a/.github/workflows/validate-generated-files.yml +++ b/.github/workflows/validate-generated-files.yml @@ -25,7 +25,5 @@ jobs: - name: Check generated files run: | export PATH=$PATH:$(go env GOPATH)/bin - make install-tools - make generate - make proto-generate + make install-tools generate proto-generate git diff --exit-code --numstat diff --git a/.gitignore b/.gitignore index 07f5d9b45..4e929e2a5 100644 --- a/.gitignore +++ b/.gitignore @@ -86,3 +86,6 @@ escape_analysis.txt # Profiles *.prof + +# Compiled test wasm processors +pkg/plugin/processor/standalone/test/wasm_processors/*/processor.wasm diff --git a/Makefile b/Makefile index c074092aa..cad4c3698 100644 --- a/Makefile +++ b/Makefile @@ -65,7 +65,7 @@ install-tools: download @go mod tidy generate: - go generate ./... + go generate -x ./... pkg/web/ui/dist: make ui-dist diff --git a/go.mod b/go.mod index 022dbc2c3..500a6db80 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/conduitio/conduit -go 1.21.1 +go 1.21.5 require ( buf.build/gen/go/grpc-ecosystem/grpc-gateway/protocolbuffers/go v1.32.0-20231027202514-3f42134f4c56.1 @@ -17,6 +17,7 @@ require ( github.com/conduitio/conduit-connector-protocol v0.5.1-0.20240104160905-e9e61586fb8d github.com/conduitio/conduit-connector-s3 v0.5.1 github.com/conduitio/conduit-connector-sdk v0.8.0 + github.com/conduitio/conduit-processor-sdk v0.0.0-20240118151737-a75ea9e86bb8 github.com/conduitio/yaml/v3 v3.3.0 github.com/dgraph-io/badger/v4 v4.2.0 github.com/dop251/goja v0.0.0-20230531210528-d7324b2d74f7 @@ -42,6 +43,8 @@ require ( github.com/prometheus/client_model v0.5.0 github.com/prometheus/common v0.45.0 github.com/rs/zerolog v1.31.0 + github.com/stealthrocket/wazergo v0.19.1 + github.com/tetratelabs/wazero v1.5.0 github.com/twmb/go-cache v1.2.0 go.uber.org/goleak v1.3.0 go.uber.org/mock v0.4.0 @@ -296,7 +299,6 @@ require ( github.com/t-yuki/gocover-cobertura v0.0.0-20180217150009-aaee18c8195c // indirect github.com/tdakkota/asciicheck v0.2.0 // indirect github.com/tetafro/godot v1.4.15 // indirect - github.com/tetratelabs/wazero v1.5.0 // indirect github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 // indirect github.com/timonwong/loggercheck v0.9.4 // indirect github.com/tomarrell/wrapcheck/v2 v2.8.1 // indirect diff --git a/go.sum b/go.sum index b288d0492..6b4029a28 100644 --- a/go.sum +++ b/go.sum @@ -1101,6 +1101,8 @@ github.com/conduitio/conduit-connector-s3 v0.5.1 h1:yRo8004ryCIZc/S3iWQ1rN6pm6bj github.com/conduitio/conduit-connector-s3 v0.5.1/go.mod h1:nbxzsyS95gbFJ28Job9vFFB+byRFINSv70/13Yi4mKQ= github.com/conduitio/conduit-connector-sdk v0.8.0 h1:gvchqoj5d3AQsBoIosx4i32L8Ex9+5BuAyHi/IM9VD4= github.com/conduitio/conduit-connector-sdk v0.8.0/go.mod h1:nOz4K3X6fD8YMe5CPbULwSEE18Eu02ZrpT6o6KwQfxs= +github.com/conduitio/conduit-processor-sdk v0.0.0-20240118151737-a75ea9e86bb8 h1:H6Px/c38KiId1XDsb4agp25wOlMsZM2rp4p2kxlHDKM= +github.com/conduitio/conduit-processor-sdk v0.0.0-20240118151737-a75ea9e86bb8/go.mod h1:k0rpE3kOAyDcIsBbS5vMO035XzDGW9FJsC4sgEXCH8Y= github.com/conduitio/yaml/v3 v3.3.0 h1:kbbaOSHcuH39gP4+rgbJGl6DSbLZcJgEaBvkEXJlCsI= github.com/conduitio/yaml/v3 v3.3.0/go.mod h1:JNgFMOX1t8W4YJuRZOh6GggVtSMsgP9XgTw+7dIenpc= github.com/containerd/stargz-snapshotter/estargz v0.15.1 h1:eXJjw9RbkLFgioVaTG+G/ZW/0kEe2oEKCdS/ZxIyoCU= @@ -1957,6 +1959,8 @@ github.com/ssgreg/nlreturn/v2 v2.2.1 h1:X4XDI7jstt3ySqGU86YGAURbxw3oTDPK9sPEi6YE github.com/ssgreg/nlreturn/v2 v2.2.1/go.mod h1:E/iiPB78hV7Szg2YfRgyIrk1AD6JVMTRkkxBiELzh2I= github.com/stbenjam/no-sprintf-host-port v0.1.1 h1:tYugd/yrm1O0dV+ThCbaKZh195Dfm07ysF0U6JQXczc= github.com/stbenjam/no-sprintf-host-port v0.1.1/go.mod h1:TLhvtIvONRzdmkFiio4O8LHsN9N74I+PhRquPsxpL0I= +github.com/stealthrocket/wazergo v0.19.1 h1:BPrITETPgSFwiytwmToO0MbUC/+RGC39JScz1JmmG6c= +github.com/stealthrocket/wazergo v0.19.1/go.mod h1:riI0hxw4ndZA5e6z7PesHg2BtTftcZaMxRcoiGGipTs= github.com/stoewer/go-strcase v1.3.0 h1:g0eASXYtp+yvN9fK8sH94oCIk0fau9uV1/ZdJ0AVEzs= github.com/stoewer/go-strcase v1.3.0/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8wodgtPmh1xo= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/pkg/foundation/cerrors/cerrors.go b/pkg/foundation/cerrors/cerrors.go index 1f935060d..f24f84832 100644 --- a/pkg/foundation/cerrors/cerrors.go +++ b/pkg/foundation/cerrors/cerrors.go @@ -37,6 +37,7 @@ var ( Is = errors.Is As = errors.As Unwrap = errors.Unwrap + Join = errors.Join ) type Frame struct { diff --git a/pkg/foundation/log/ctxlogger.go b/pkg/foundation/log/ctxlogger.go index 8ba0c28a5..edf5d344a 100644 --- a/pkg/foundation/log/ctxlogger.go +++ b/pkg/foundation/log/ctxlogger.go @@ -16,6 +16,9 @@ package log import ( "context" + "reflect" + "strings" + "testing" "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/rs/zerolog" @@ -46,6 +49,11 @@ func Nop() CtxLogger { return CtxLogger{Logger: zerolog.Nop()} } +// Test returns a test logger that writes to the supplied testing.TB. +func Test(t testing.TB) CtxLogger { + return CtxLogger{Logger: zerolog.New(zerolog.NewTestWriter(t))} +} + // InitLogger returns a logger initialized with the wanted level and format func InitLogger(level zerolog.Level, f Format) CtxLogger { var w = GetWriter(f) @@ -67,6 +75,20 @@ func (l CtxLogger) WithComponent(component string) CtxLogger { return l } +func (l CtxLogger) WithComponentFromType(c any) CtxLogger { + cType := reflect.TypeOf(c) + for cType.Kind() == reflect.Ptr || cType.Kind() == reflect.Interface { + cType = cType.Elem() + } + + pkgPath := cType.PkgPath() + pkgPath = strings.TrimPrefix(pkgPath, "github.com/conduitio/conduit/pkg/") + pkgPath = strings.ReplaceAll(pkgPath, "/", ".") + typeName := cType.Name() + l.component = pkgPath + "." + typeName + return l +} + func (l CtxLogger) Component() string { return l.component } diff --git a/pkg/foundation/log/ctxlogger_test.go b/pkg/foundation/log/ctxlogger_test.go index 5e4edb80e..2054b93ae 100644 --- a/pkg/foundation/log/ctxlogger_test.go +++ b/pkg/foundation/log/ctxlogger_test.go @@ -41,6 +41,20 @@ func TestCtxLoggerComponent(t *testing.T) { is.Equal(`{"level":"info","component":"test","message":"testing component"}`+"\n", got) } +type testComponent struct{} + +func TestCtxLoggerComponentFromType(t *testing.T) { + is := is.New(t) + + logger := New(zerolog.New(zerolog.NewTestWriter(t))) + + logger = logger.WithComponentFromType(testComponent{}) + is.Equal("foundation.log.testComponent", logger.Component()) + + logger = logger.WithComponentFromType(&testComponent{}) + is.Equal("foundation.log.testComponent", logger.Component()) +} + func TestCtxLoggerWithoutHooks(t *testing.T) { ctx := context.Background() diff --git a/pkg/foundation/log/fields.go b/pkg/foundation/log/fields.go index b79d0f517..dd0bbc61f 100644 --- a/pkg/foundation/log/fields.go +++ b/pkg/foundation/log/fields.go @@ -22,6 +22,7 @@ const ( NodeIDField = "node_id" ParallelWorkerIDField = "parallel_worker_id" PipelineIDField = "pipeline_id" + ProcessorIDField = "processor_id" RecordPositionField = "record_position" RequestIDField = "request_id" ServerAddressField = "address" diff --git a/pkg/plugin/connector/builtin/registry.go b/pkg/plugin/connector/builtin/registry.go index 605fd3c8c..b0a34667e 100644 --- a/pkg/plugin/connector/builtin/registry.go +++ b/pkg/plugin/connector/builtin/registry.go @@ -85,7 +85,7 @@ func NewDispenserFactory(conn sdk.Connector) DispenserFactory { } func NewRegistry(logger log.CtxLogger, factories map[string]DispenserFactory) *Registry { - logger = logger.WithComponent("builtin.Registry") + logger = logger.WithComponentFromType(Registry{}) buildInfo, ok := debug.ReadBuildInfo() if !ok { // we are using modules, build info should always be available, we are staying on the safe side diff --git a/pkg/plugin/connector/standalone/registry.go b/pkg/plugin/connector/standalone/registry.go index a3a2a5068..1716d3865 100644 --- a/pkg/plugin/connector/standalone/registry.go +++ b/pkg/plugin/connector/standalone/registry.go @@ -50,14 +50,14 @@ type blueprint struct { func NewRegistry(logger log.CtxLogger, pluginDir string) *Registry { r := &Registry{ - logger: logger.WithComponent("standalone.Registry"), + logger: logger.WithComponentFromType(Registry{}), } if pluginDir != "" { // extract absolute path to make it clearer in the logs what directory is used absPluginDir, err := filepath.Abs(pluginDir) if err != nil { - r.logger.Warn(context.Background()).Err(err).Msg("could not extract absolute plugins path") + r.logger.Warn(context.Background()).Err(err).Msg("could not extract absolute connector plugins path") } else { r.pluginDir = absPluginDir // store plugin dir for hot reloads r.reloadPlugins() @@ -67,15 +67,11 @@ func NewRegistry(logger log.CtxLogger, pluginDir string) *Registry { r.logger.Info(context.Background()). Str(log.PluginPathField, r.pluginDir). Int("count", len(r.List())). - Msg("standalone plugins initialized") + Msg("standalone connector plugins initialized") return r } -func newFullName(pluginName, pluginVersion string) plugin.FullName { - return plugin.NewFullName(plugin.PluginTypeStandalone, pluginName, pluginVersion) -} - func (r *Registry) reloadPlugins() { plugins := r.loadPlugins(context.Background(), r.pluginDir) r.m.Lock() @@ -84,19 +80,19 @@ func (r *Registry) reloadPlugins() { } func (r *Registry) loadPlugins(ctx context.Context, pluginDir string) map[string]map[string]blueprint { - r.logger.Debug(ctx).Msgf("loading plugins from directory %v", pluginDir) + r.logger.Debug(ctx).Msgf("loading connector plugins from directory %v", pluginDir) plugins := make(map[string]map[string]blueprint) dirEntries, err := os.ReadDir(pluginDir) if err != nil { - r.logger.Warn(ctx).Err(err).Msg("could not read plugin directory") + r.logger.Warn(ctx).Err(err).Msg("could not read connector plugin directory") return plugins // return empty map } warn := func(ctx context.Context, err error, pluginPath string) { r.logger.Warn(ctx). Err(err). Str(log.PluginPathField, pluginPath). - Msgf("could not load standalone plugin") + Msgf("could not load standalone connector plugin") } for _, dirEntry := range dirEntries { @@ -107,24 +103,8 @@ func (r *Registry) loadPlugins(ctx context.Context, pluginDir string) map[string pluginPath := path.Join(pluginDir, dirEntry.Name()) - // create dispenser without a logger to not spam logs on refresh - dispenser, err := standalonev1.NewDispenser(zerolog.Nop(), pluginPath) - if err != nil { - err = cerrors.Errorf("failed to create dispenser: %w", err) - warn(ctx, err, pluginPath) - continue - } - - specPlugin, err := dispenser.DispenseSpecifier() - if err != nil { - err = cerrors.Errorf("failed to dispense specifier (tip: check if the file is a valid plugin binary and if you have permissions for running it): %w", err) - warn(ctx, err, pluginPath) - continue - } - - specs, err := specPlugin.Specify() + specs, err := r.loadSpecifications(pluginPath) if err != nil { - err = cerrors.Errorf("failed to get specs: %w", err) warn(ctx, err, pluginPath) continue } @@ -135,9 +115,9 @@ func (r *Registry) loadPlugins(ctx context.Context, pluginDir string) map[string plugins[specs.Name] = versionMap } - fullName := newFullName(specs.Name, specs.Version) + fullName := plugin.NewFullName(plugin.PluginTypeStandalone, specs.Name, specs.Version) if conflict, ok := versionMap[specs.Version]; ok { - err = cerrors.Errorf("conflict detected, plugin %v already registered, please remove either %v or %v, these plugins won't be usable until that happens", fullName, conflict.path, pluginPath) + err = cerrors.Errorf("conflict detected, connector plugin %v already registered, please remove either %v or %v, these plugins won't be usable until that happens", fullName, conflict.path, pluginPath) warn(ctx, err, pluginPath) // delete plugin from map at the end so that further duplicates can // still be found @@ -163,18 +143,38 @@ func (r *Registry) loadPlugins(ctx context.Context, pluginDir string) map[string r.logger.Debug(ctx). Str(log.PluginPathField, pluginPath). Str(log.PluginNameField, string(bp.fullName)). - Msg("set plugin as latest") + Msg("set connector plugin as latest") } r.logger.Debug(ctx). Str(log.PluginPathField, pluginPath). Str(log.PluginNameField, string(bp.fullName)). - Msg("loaded standalone plugin") + Msg("loaded standalone connector plugin") } return plugins } +func (r *Registry) loadSpecifications(pluginPath string) (connector.Specification, error) { + // create dispenser without a logger to not spam logs on refresh + dispenser, err := standalonev1.NewDispenser(zerolog.Nop(), pluginPath) + if err != nil { + return connector.Specification{}, cerrors.Errorf("failed to create connector dispenser: %w", err) + } + + specPlugin, err := dispenser.DispenseSpecifier() + if err != nil { + return connector.Specification{}, cerrors.Errorf("failed to dispense connector specifier (tip: check if the file is a valid connector plugin binary and if you have permissions for running it): %w", err) + } + + specs, err := specPlugin.Specify() + if err != nil { + return connector.Specification{}, cerrors.Errorf("failed to get connector specs: %w", err) + } + + return specs, nil +} + func (r *Registry) NewDispenser(logger log.CtxLogger, fullName plugin.FullName) (connector.Dispenser, error) { r.m.RLock() defer r.m.RUnlock() @@ -189,7 +189,7 @@ func (r *Registry) NewDispenser(logger log.CtxLogger, fullName plugin.FullName) for k := range versionMap { availableVersions = append(availableVersions, k) } - return nil, cerrors.Errorf("could not find standalone plugin, only found versions %v: %w", availableVersions, plugin.ErrPluginNotFound) + return nil, cerrors.Errorf("could not find standalone connector plugin, only found versions %v: %w", availableVersions, plugin.ErrPluginNotFound) } return standalonev1.NewDispenser(logger.ZerologWithComponent(), bp.path) diff --git a/pkg/plugin/processor/builtin/registry.go b/pkg/plugin/processor/builtin/registry.go new file mode 100644 index 000000000..563b17f01 --- /dev/null +++ b/pkg/plugin/processor/builtin/registry.go @@ -0,0 +1,174 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package builtin + +import ( + "context" + "reflect" + "runtime/debug" + + sdk "github.com/conduitio/conduit-processor-sdk" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/plugin" +) + +type Registry struct { + logger log.CtxLogger + + // plugins stores plugin blueprints in a 2D map, first key is the plugin + // name, the second key is the plugin version + plugins map[string]map[string]blueprint +} + +type blueprint struct { + fullName plugin.FullName + specification sdk.Specification + constructor ProcessorPluginConstructor +} + +type ProcessorPluginConstructor func(log.CtxLogger) sdk.Processor + +func NewRegistry(logger log.CtxLogger, constructors map[string]ProcessorPluginConstructor) *Registry { + logger = logger.WithComponent("builtin.Registry") + buildInfo, ok := debug.ReadBuildInfo() + if !ok { + // we are using modules, build info should always be available, we are staying on the safe side + logger.Warn(context.Background()).Msg("build info not available, built-in plugin versions may not be read correctly") + buildInfo = &debug.BuildInfo{} // prevent nil pointer exceptions + } + + r := &Registry{ + plugins: loadPlugins(buildInfo, constructors), + logger: logger, + } + logger.Info(context.Background()).Int("count", len(r.List())).Msg("builtin plugins initialized") + return r +} + +func NewProcessorPluginConstructor(processorPlugin sdk.Processor) ProcessorPluginConstructor { + procType := reflect.TypeOf(processorPlugin) + for procType.Kind() != reflect.Struct { + procType.Elem() + } + + f := func(logger log.CtxLogger) sdk.Processor { + // TODO create processor plugin wrapper that injects logger into context + // before forwarding the call to the plugin + newProcValue := reflect.New(procType) + return newProcValue.Interface().(sdk.Processor) + } + + // try out f, to catch any panic early + f(log.CtxLogger{}) + + return f +} + +func loadPlugins(buildInfo *debug.BuildInfo, constructors map[string]ProcessorPluginConstructor) map[string]map[string]blueprint { + plugins := make(map[string]map[string]blueprint, len(constructors)) + for moduleName, constructor := range constructors { + specs, err := getSpecification(moduleName, constructor, buildInfo) + if err != nil { + // stop initialization if a built-in plugin is misbehaving + panic(err) + } + + versionMap := plugins[specs.Name] + if versionMap == nil { + versionMap = make(map[string]blueprint) + plugins[specs.Name] = versionMap + } + + fullName := newFullName(specs.Name, specs.Version) + if _, ok := versionMap[specs.Version]; ok { + panic(cerrors.Errorf("plugin %q already registered", fullName)) + } + + bp := blueprint{ + fullName: fullName, + constructor: constructor, + specification: specs, + } + versionMap[specs.Version] = bp + + latestBp, ok := versionMap[plugin.PluginVersionLatest] + if !ok || fullName.PluginVersionGreaterThan(latestBp.fullName) { + versionMap[plugin.PluginVersionLatest] = bp + } + } + return plugins +} + +func getSpecification(moduleName string, constructor ProcessorPluginConstructor, buildInfo *debug.BuildInfo) (sdk.Specification, error) { + procPlugin := constructor(log.CtxLogger{}) + specs, err := procPlugin.Specification() + if err != nil { + return sdk.Specification{}, err + } + + if version := getModuleVersion(buildInfo.Deps, moduleName); version != "" { + // overwrite version with the import version + specs.Version = version + } + + return specs, nil +} + +func getModuleVersion(deps []*debug.Module, moduleName string) string { + for _, dep := range deps { + if dep.Path == moduleName { + if dep.Replace != nil { + return dep.Replace.Version + } + return dep.Version + } + } + return "" +} + +func newFullName(pluginName, pluginVersion string) plugin.FullName { + return plugin.NewFullName(plugin.PluginTypeBuiltin, pluginName, pluginVersion) +} + +func (r *Registry) NewProcessorPlugin(logger log.CtxLogger, fullName plugin.FullName) (sdk.Processor, error) { + versionMap, ok := r.plugins[fullName.PluginName()] + if !ok { + return nil, plugin.ErrPluginNotFound + } + b, ok := versionMap[fullName.PluginVersion()] + if !ok { + availableVersions := make([]string, 0, len(versionMap)) + for k := range versionMap { + availableVersions = append(availableVersions, k) + } + return nil, cerrors.Errorf("could not find builtin plugin %q, only found versions %v: %w", fullName, availableVersions, plugin.ErrPluginNotFound) + } + + return b.constructor(logger), nil +} + +func (r *Registry) List() map[plugin.FullName]sdk.Specification { + specs := make(map[plugin.FullName]sdk.Specification, len(r.plugins)) + for _, versions := range r.plugins { + for version, bp := range versions { + if version == plugin.PluginVersionLatest { + continue // skip latest versions + } + specs[bp.fullName] = bp.specification + } + } + return specs +} diff --git a/pkg/plugin/processor/builtin/registry_test.go b/pkg/plugin/processor/builtin/registry_test.go new file mode 100644 index 000000000..3f9c55218 --- /dev/null +++ b/pkg/plugin/processor/builtin/registry_test.go @@ -0,0 +1,50 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package builtin + +import ( + "testing" + + sdk "github.com/conduitio/conduit-processor-sdk" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/plugin" + "github.com/conduitio/conduit/pkg/plugin/processor/mock" + "github.com/matryer/is" + "go.uber.org/mock/gomock" +) + +func TestRegistry_List(t *testing.T) { + is := is.New(t) + logger := log.Nop() + + ctrl := gomock.NewController(t) + procPlugin := mock.NewProcessor(ctrl) + + procSpec := sdk.Specification{ + Name: "test-processor", + Version: "v0.1.2", + } + procPlugin.EXPECT().Specification().Return(procSpec, nil) + procConstructor := func(log.CtxLogger) sdk.Processor { return procPlugin } + + wantList := map[plugin.FullName]sdk.Specification{ + "builtin:test-processor@v0.1.2": procSpec, + } + + reg := NewRegistry(logger, map[string]ProcessorPluginConstructor{procSpec.Name: procConstructor}) + + got := reg.List() + is.Equal(got, wantList) +} diff --git a/pkg/plugin/processor/mock/processor.go b/pkg/plugin/processor/mock/processor.go new file mode 100644 index 000000000..423c85dac --- /dev/null +++ b/pkg/plugin/processor/mock/processor.go @@ -0,0 +1,126 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/conduitio/conduit-processor-sdk (interfaces: Processor) +// +// Generated by this command: +// +// mockgen -destination=mock/processor.go -package=mock -mock_names=Processor=Processor github.com/conduitio/conduit-processor-sdk Processor +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + opencdc "github.com/conduitio/conduit-commons/opencdc" + sdk "github.com/conduitio/conduit-processor-sdk" + gomock "go.uber.org/mock/gomock" +) + +// Processor is a mock of Processor interface. +type Processor struct { + sdk.UnimplementedProcessor + ctrl *gomock.Controller + recorder *ProcessorMockRecorder +} + +// ProcessorMockRecorder is the mock recorder for Processor. +type ProcessorMockRecorder struct { + mock *Processor +} + +// NewProcessor creates a new mock instance. +func NewProcessor(ctrl *gomock.Controller) *Processor { + mock := &Processor{ctrl: ctrl} + mock.recorder = &ProcessorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *Processor) EXPECT() *ProcessorMockRecorder { + return m.recorder +} + +// Configure mocks base method. +func (m *Processor) Configure(arg0 context.Context, arg1 map[string]string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Configure", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// Configure indicates an expected call of Configure. +func (mr *ProcessorMockRecorder) Configure(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Configure", reflect.TypeOf((*Processor)(nil).Configure), arg0, arg1) +} + +// Open mocks base method. +func (m *Processor) Open(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Open", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Open indicates an expected call of Open. +func (mr *ProcessorMockRecorder) Open(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Open", reflect.TypeOf((*Processor)(nil).Open), arg0) +} + +// Process mocks base method. +func (m *Processor) Process(arg0 context.Context, arg1 []opencdc.Record) []sdk.ProcessedRecord { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Process", arg0, arg1) + ret0, _ := ret[0].([]sdk.ProcessedRecord) + return ret0 +} + +// Process indicates an expected call of Process. +func (mr *ProcessorMockRecorder) Process(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Process", reflect.TypeOf((*Processor)(nil).Process), arg0, arg1) +} + +// Specification mocks base method. +func (m *Processor) Specification() (sdk.Specification, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Specification") + ret0, _ := ret[0].(sdk.Specification) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Specification indicates an expected call of Specification. +func (mr *ProcessorMockRecorder) Specification() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Specification", reflect.TypeOf((*Processor)(nil).Specification)) +} + +// Teardown mocks base method. +func (m *Processor) Teardown(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Teardown", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Teardown indicates an expected call of Teardown. +func (mr *ProcessorMockRecorder) Teardown(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Teardown", reflect.TypeOf((*Processor)(nil).Teardown), arg0) +} + +// mustEmbedUnimplementedProcessor mocks base method. +func (m *Processor) mustEmbedUnimplementedProcessor() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "mustEmbedUnimplementedProcessor") +} + +// mustEmbedUnimplementedProcessor indicates an expected call of mustEmbedUnimplementedProcessor. +func (mr *ProcessorMockRecorder) mustEmbedUnimplementedProcessor() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "mustEmbedUnimplementedProcessor", reflect.TypeOf((*Processor)(nil).mustEmbedUnimplementedProcessor)) +} diff --git a/pkg/plugin/processor/processor.go b/pkg/plugin/processor/processor.go new file mode 100644 index 000000000..7e08007f7 --- /dev/null +++ b/pkg/plugin/processor/processor.go @@ -0,0 +1,21 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This generates a mock processor and adds UnimplementedProcessor to it to +// satisfy the sdk.Processor interface. +//go:generate mockgen -destination=mock/processor.go -package=mock -mock_names=Processor=Processor github.com/conduitio/conduit-processor-sdk Processor +//go:generate sed -i.bak -e "/type Processor struct {/a\\\n sdk.UnimplementedProcessor" ./mock/processor.go +//go:generate rm ./mock/processor.go.bak + +package processor diff --git a/pkg/plugin/processor/standalone/host_module.go b/pkg/plugin/processor/standalone/host_module.go new file mode 100644 index 000000000..b0e1abcc8 --- /dev/null +++ b/pkg/plugin/processor/standalone/host_module.go @@ -0,0 +1,147 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package standalone + +import ( + "context" + + processorv1 "github.com/conduitio/conduit-processor-sdk/proto/processor/v1" + "github.com/conduitio/conduit-processor-sdk/wasm" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/stealthrocket/wazergo" + "github.com/stealthrocket/wazergo/types" + "google.golang.org/protobuf/proto" +) + +// hostModule declares the host module that is exported to the WASM module. The +// host module is used to communicate between the WASM module (processor) and Conduit. +var hostModule wazergo.HostModule[*hostModuleInstance] = hostModuleFunctions{ + "command_request": wazergo.F1((*hostModuleInstance).commandRequest), + "command_response": wazergo.F1((*hostModuleInstance).commandResponse), +} + +// hostModuleFunctions type implements HostModule, providing the module name, +// map of exported functions, and the ability to create instances of the module +// type. +type hostModuleFunctions wazergo.Functions[*hostModuleInstance] + +// Name returns the name of the module. +func (f hostModuleFunctions) Name() string { + return "conduit" +} + +// Functions is a helper that returns the exported functions of the module. +func (f hostModuleFunctions) Functions() wazergo.Functions[*hostModuleInstance] { + return (wazergo.Functions[*hostModuleInstance])(f) +} + +// Instantiate creates a new instance of the module. This is called by the +// runtime when a new instance of the module is created. +func (f hostModuleFunctions) Instantiate(_ context.Context, opts ...hostModuleOption) (*hostModuleInstance, error) { + mod := &hostModuleInstance{} + wazergo.Configure(mod, opts...) + if mod.commandRequests == nil { + return nil, cerrors.New("missing command requests channel") + } + if mod.commandResponses == nil { + return nil, cerrors.New("missing command responses channel") + } + return mod, nil +} + +type hostModuleOption = wazergo.Option[*hostModuleInstance] + +func hostModuleOptions( + logger log.CtxLogger, + requests <-chan *processorv1.CommandRequest, + responses chan<- tuple[*processorv1.CommandResponse, error], +) hostModuleOption { + return wazergo.OptionFunc(func(m *hostModuleInstance) { + m.logger = logger + m.commandRequests = requests + m.commandResponses = responses + }) +} + +// hostModuleInstance is used to maintain the state of our module instance. +type hostModuleInstance struct { + logger log.CtxLogger + commandRequests <-chan *processorv1.CommandRequest + commandResponses chan<- tuple[*processorv1.CommandResponse, error] + + parkedCommandRequest *processorv1.CommandRequest +} + +func (*hostModuleInstance) Close(context.Context) error { return nil } + +// commandRequest is the exported function that is called by the WASM module to +// get the next command request. It returns the size of the command request +// message. If the buffer is too small, it returns the size of the command +// request message and parks the command request. The next call to this function +// will return the same command request. +func (m *hostModuleInstance) commandRequest(ctx context.Context, buf types.Bytes) types.Uint32 { + m.logger.Trace(ctx).Msg("executing command_request") + + if m.parkedCommandRequest == nil { + // No parked command, so we need to wait for the next one. If the command + // channel is closed, then we return an error. + var ok bool + m.parkedCommandRequest, ok = <-m.commandRequests + if !ok { + return wasm.ErrorCodeNoMoreCommands + } + } + + // If the buffer is too small, we park the command and return the size of the + // command. The next call to nextCommand will return the same command. + if size := proto.Size(m.parkedCommandRequest); len(buf) < size { + m.logger.Warn(ctx). + Int("command_bytes", size). + Int("allocated_bytes", len(buf)). + Msgf("insufficient memory, command will be parked until next call to command_request") + return types.Uint32(size) + } + + // If the buffer is large enough, we marshal the command into the buffer and + // return the size of the command. The next call to nextCommand will return + // the next command. + out, err := proto.MarshalOptions{}.MarshalAppend(buf[:0], m.parkedCommandRequest) + if err != nil { + m.logger.Err(ctx, err).Msg("failed marshalling protobuf command request") + return wasm.ErrorCodeUnknownCommandRequest + } + m.parkedCommandRequest = nil + + m.logger.Trace(ctx).Msg("returning next command") + return types.Uint32(len(out)) +} + +// commandResponse is the exported function that is called by the WASM module to +// send a command response. It returns 0 on success, or an error code on error. +func (m *hostModuleInstance) commandResponse(ctx context.Context, buf types.Bytes) types.Uint32 { + m.logger.Trace(ctx).Msg("executing command_response") + + var resp processorv1.CommandResponse + err := proto.Unmarshal(buf, &resp) + if err != nil { + m.logger.Err(ctx, err).Msg("failed unmarshalling protobuf command response") + m.commandResponses <- tuple[*processorv1.CommandResponse, error]{nil, err} + return wasm.ErrorCodeUnknownCommandResponse + } + + m.commandResponses <- tuple[*processorv1.CommandResponse, error]{&resp, nil} + return 0 +} diff --git a/pkg/plugin/processor/standalone/logger.go b/pkg/plugin/processor/standalone/logger.go new file mode 100644 index 000000000..eabc3610d --- /dev/null +++ b/pkg/plugin/processor/standalone/logger.go @@ -0,0 +1,103 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package standalone + +import ( + "io" + + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/tetratelabs/wazero" + + "github.com/goccy/go-json" + "github.com/rs/zerolog" +) + +// wasmLogWriter is a logger adapter for the WASM stderr and stdout streams. +// It parses the JSON log events and emits them as structured logs. It expects +// the log events to be in the default format produced by zerolog. If the +// parsing fails, it falls back to writing the raw bytes as-is. +type wasmLogWriter struct { + logger zerolog.Logger +} + +var _ io.Writer = (*wasmLogWriter)(nil) + +func newWasmLogWriter(logger log.CtxLogger, module wazero.CompiledModule) wasmLogWriter { + name := module.Name() + if name == "" { + // no module name, use the component name instead + name = logger.Component() + ".module" + } + logger = logger.WithComponent(name) + return wasmLogWriter{logger: logger.ZerologWithComponent()} +} + +func (l wasmLogWriter) Write(p []byte) (int, error) { + err := l.emitJSONEvent(p) + if err != nil { + // fallback to writing the bytes as-is + return l.logger.Write(p) + } + return len(p), nil +} + +func (l wasmLogWriter) emitJSONEvent(p []byte) error { + var raw map[string]any + err := json.Unmarshal(p, &raw) + if err != nil { + return err + } + + var ( + level = zerolog.DebugLevel // default + msg = "" + ) + + // parse level + if v, ok := raw[zerolog.LevelFieldName]; ok { + delete(raw, zerolog.LevelFieldName) + if s, ok := v.(string); ok { + parsedLvl, err := zerolog.ParseLevel(s) + if err == nil { + level = parsedLvl + } + } + } + + // prepare log event + e := l.logger.WithLevel(level) + if !e.Enabled() { + return nil + } + + // parse message + if v, ok := raw[zerolog.MessageFieldName]; ok { + delete(raw, zerolog.MessageFieldName) + if s, ok := v.(string); ok { + msg = s + } + } + + // don't duplicate timestamp, it's added by zerolog + delete(raw, zerolog.TimestampFieldName) + + // parse unknown fields + for k, v := range raw { + e.Any(k, v) + } + + e.Msg(msg) + return nil +} diff --git a/pkg/plugin/processor/standalone/processor.go b/pkg/plugin/processor/standalone/processor.go new file mode 100644 index 000000000..13c692b05 --- /dev/null +++ b/pkg/plugin/processor/standalone/processor.go @@ -0,0 +1,360 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package standalone + +import ( + "context" + "fmt" + "time" + + "github.com/conduitio/conduit/pkg/plugin" + + "github.com/conduitio/conduit-commons/opencdc" + sdk "github.com/conduitio/conduit-processor-sdk" + processorv1 "github.com/conduitio/conduit-processor-sdk/proto/processor/v1" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/stealthrocket/wazergo" + "github.com/tetratelabs/wazero" + "github.com/tetratelabs/wazero/api" + "github.com/tetratelabs/wazero/sys" +) + +const ( + // magicCookieKey and value are used as a very basic verification + // that a plugin is intended to be launched. This is not a security + // measure, just a UX feature. If the magic cookie doesn't match, + // we show human-friendly output. + magicCookieKey = "CONDUIT_MAGIC_COOKIE" + magicCookieValue = "3stnegqd0x02axggy0vrc4izjeq2zik6g7somyb3ye4vy5iivvjm5s1edppl5oja" + + conduitProcessorIDKey = "CONDUIT_PROCESSOR_ID" + conduitLogLevelKey = "CONDUIT_LOG_LEVEL" +) + +type wasmProcessor struct { + sdk.UnimplementedProcessor + protoconv protoConverter + + id string + logger log.CtxLogger + + // module is the WASM module that implements the processor + module api.Module + + // commandRequests is used to send commands to the actual processor (the + // WASM module) + commandRequests chan *processorv1.CommandRequest + // commandResponses is used to communicate replies between the actual + // processor (the WASM module) and wasmProcessor + commandResponses chan tuple[*processorv1.CommandResponse, error] + + // moduleStopped is used to know when the module stopped running + moduleStopped chan struct{} + // moduleError contains the error returned by the module after it stopped + moduleError error +} + +type tuple[T1, T2 any] struct { + V1 T1 + V2 T2 +} + +func newWASMProcessor( + ctx context.Context, + + runtime wazero.Runtime, + processorModule wazero.CompiledModule, + hostModule *wazergo.CompiledModule[*hostModuleInstance], + + id string, + logger log.CtxLogger, +) (*wasmProcessor, error) { + logger = logger.WithComponent("standalone.wasmProcessor") + logger.Logger = logger.With().Str(log.ProcessorIDField, id).Logger() + wasmLogger := newWasmLogWriter(logger, processorModule) + + commandRequests := make(chan *processorv1.CommandRequest) + commandResponses := make(chan tuple[*processorv1.CommandResponse, error]) + moduleStopped := make(chan struct{}) + + // instantiate conduit host module and inject it into the context + logger.Debug(ctx).Msg("instantiating conduit host module") + ins, err := hostModule.Instantiate( + ctx, + hostModuleOptions( + logger, + commandRequests, + commandResponses, + ), + ) + if err != nil { + return nil, fmt.Errorf("failed to instantiate conduit host module: %w", err) + } + ctx = wazergo.WithModuleInstance(ctx, ins) + + logger.Debug(ctx).Msg("instantiating processor module") + mod, err := runtime.InstantiateModule( + ctx, + processorModule, + wazero.NewModuleConfig(). + WithName(id). // ensure unique module name + WithEnv(magicCookieKey, magicCookieValue). + WithEnv(conduitProcessorIDKey, id). + WithEnv(conduitLogLevelKey, logger.GetLevel().String()). + + // set up logging + WithStdout(wasmLogger). + WithStderr(wasmLogger). + + // enable time.Now to include correct wall time + WithSysWalltime(). + // enable time.Now to include correct monotonic time + WithSysNanotime(). + // enable time.Sleep to sleep for the correct amount of time + WithSysNanosleep(). + + // don't start right away + WithStartFunctions(), + ) + if err != nil { + return nil, fmt.Errorf("failed to instantiate processor module: %w", err) + } + + p := &wasmProcessor{ + id: id, + logger: logger, + module: mod, + commandRequests: commandRequests, + commandResponses: commandResponses, + moduleStopped: moduleStopped, + } + + // Needs to run in a goroutine because the WASM module is blocking as long + // as the "main" function is running + go p.run(ctx) + + return p, nil +} + +// run is the main loop of the WASM module. It runs in a goroutine and blocks +// until the module is closed. +func (p *wasmProcessor) run(ctx context.Context) { + defer close(p.moduleStopped) + + _, err := p.module.ExportedFunction("_start").Call(ctx) + + // main function returned, close the module right away + _ = p.module.Close(ctx) + + if err != nil { + var exitErr *sys.ExitError + if cerrors.As(err, &exitErr) { + if exitErr.ExitCode() == 0 { // All good + err = nil + } + } + } + + p.moduleError = err + p.logger.Err(ctx, err).Msg("WASM module stopped") +} + +func (p *wasmProcessor) Specification() (sdk.Specification, error) { + req := &processorv1.CommandRequest{ + Request: &processorv1.CommandRequest_Specify{ + Specify: &processorv1.Specify_Request{}, + }, + } + + // the function has no context parameter, so we need to set a timeout + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + resp, err := p.executeCommand(ctx, req) + if err != nil { + return sdk.Specification{}, err + } + + switch specResp := resp.Response.(type) { + case *processorv1.CommandResponse_Specify: + return p.protoconv.specification(specResp.Specify), nil + default: + return sdk.Specification{}, fmt.Errorf("unexpected response type: %T", resp) + } +} + +func (p *wasmProcessor) Configure(ctx context.Context, config map[string]string) error { + req := &processorv1.CommandRequest{ + Request: &processorv1.CommandRequest_Configure{ + Configure: &processorv1.Configure_Request{ + Parameters: config, + }, + }, + } + + resp, err := p.executeCommand(ctx, req) + if err != nil { + return err + } + + switch resp.Response.(type) { + case *processorv1.CommandResponse_Configure: + return nil + default: + return fmt.Errorf("unexpected response type: %T", resp) + } +} + +func (p *wasmProcessor) Open(ctx context.Context) error { + req := &processorv1.CommandRequest{ + Request: &processorv1.CommandRequest_Open{ + Open: &processorv1.Open_Request{}, + }, + } + + resp, err := p.executeCommand(ctx, req) + if err != nil { + return err + } + + switch resp.Response.(type) { + case *processorv1.CommandResponse_Open: + return nil + default: + return fmt.Errorf("unexpected response type: %T", resp) + } +} + +func (p *wasmProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord { + protoRecords, err := p.protoconv.records(records) + if err != nil { + p.logger.Err(ctx, err).Msg("failed to convert records to proto") + return []sdk.ProcessedRecord{sdk.ErrorRecord{Error: err}} + } + + req := &processorv1.CommandRequest{ + Request: &processorv1.CommandRequest_Process{ + Process: &processorv1.Process_Request{ + Records: protoRecords, + }, + }, + } + + resp, err := p.executeCommand(ctx, req) + if err != nil { + return []sdk.ProcessedRecord{sdk.ErrorRecord{Error: err}} + } + + switch procResp := resp.Response.(type) { + case *processorv1.CommandResponse_Process: + processedRecords, err := p.protoconv.processedRecords(procResp.Process.Records) + if err != nil { + p.logger.Err(ctx, err).Msg("failed to convert processed records from proto") + return []sdk.ProcessedRecord{sdk.ErrorRecord{Error: err}} + } + return processedRecords + default: + err := fmt.Errorf("unexpected response type: %T", resp) + return []sdk.ProcessedRecord{sdk.ErrorRecord{Error: err}} + } +} + +func (p *wasmProcessor) Teardown(ctx context.Context) error { + // TODO: we should probably have a timeout for the teardown command in case + // the plugin is stuck + teardownErr := p.executeTeardownCommand(ctx) + // close module regardless of teardown error + stopErr := p.closeModule(ctx) + + return cerrors.Join(teardownErr, stopErr, p.moduleError) +} + +func (p *wasmProcessor) executeTeardownCommand(ctx context.Context) error { + req := &processorv1.CommandRequest{ + Request: &processorv1.CommandRequest_Teardown{ + Teardown: &processorv1.Teardown_Request{}, + }, + } + + resp, err := p.executeCommand(ctx, req) + + if err != nil { + return err + } + switch resp.Response.(type) { + case *processorv1.CommandResponse_Teardown: + return nil + default: + return fmt.Errorf("unexpected response type: %T", resp) + } +} + +func (p *wasmProcessor) closeModule(ctx context.Context) error { + // Closing the command channel will send an error code to the WASM module + // signaling it to exit. + close(p.commandRequests) + + select { + case <-ctx.Done(): + // kill the plugin + p.logger.Error(ctx).Msg("context canceled while waiting for teardown, killing plugin") + err := p.module.CloseWithExitCode(ctx, 1) + if err != nil { + return fmt.Errorf("failed to kill processor plugin: %w", err) + } + return ctx.Err() + case <-p.moduleStopped: + return nil + } +} + +// executeCommand sends a command request to the WASM module and waits for the +// response. It returns the response, or an error if the response is an error. +// If the context is canceled, it returns ctx.Err(). +func (p *wasmProcessor) executeCommand(ctx context.Context, req *processorv1.CommandRequest) (*processorv1.CommandResponse, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-p.moduleStopped: + return nil, cerrors.Errorf("processor plugin stopped while trying to send command %T: %w", req.Request, plugin.ErrPluginNotRunning) + case p.commandRequests <- req: + } + + // wait for the response from the WASM module + var resp *processorv1.CommandResponse + var err error + select { + case <-ctx.Done(): + // TODO if this happens we should probably kill the plugin, as it's + // probably stuck + return nil, ctx.Err() + case <-p.moduleStopped: + return nil, cerrors.Errorf("processor plugin stopped while waiting for response to command %T: %w", req.Request, plugin.ErrPluginNotRunning) + case crTuple := <-p.commandResponses: + resp, err = crTuple.V1, crTuple.V2 + } + + if err != nil { + return nil, err + } + + // check if the response is an error + if errResp, ok := resp.Response.(*processorv1.CommandResponse_Error); ok { + return nil, p.protoconv.error(errResp.Error) + } + + return resp, nil +} diff --git a/pkg/plugin/processor/standalone/processor_test.go b/pkg/plugin/processor/standalone/processor_test.go new file mode 100644 index 000000000..ffcb49a69 --- /dev/null +++ b/pkg/plugin/processor/standalone/processor_test.go @@ -0,0 +1,238 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package standalone + +import ( + "context" + "testing" + + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/plugin" + + "github.com/conduitio/conduit-commons/opencdc" + + sdk "github.com/conduitio/conduit-processor-sdk" + "github.com/conduitio/conduit-processor-sdk/wasm" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/matryer/is" +) + +func TestWASMProcessor_Specification_Success(t *testing.T) { + is := is.New(t) + ctx := context.Background() + logger := log.Test(t) + + underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, "test-processor", logger) + is.NoErr(err) + + gotSpec, err := underTest.Specification() + is.NoErr(err) + + wantSpec := ChaosProcessorSpecifications() + is.Equal(gotSpec, wantSpec) + + is.NoErr(underTest.Teardown(ctx)) +} + +func TestWASMProcessor_Specification_Error(t *testing.T) { + is := is.New(t) + ctx := context.Background() + logger := log.Test(t) + + underTest, err := newWASMProcessor(ctx, TestRuntime, SpecifyErrorModule, CompiledHostModule, "test-processor", logger) + is.NoErr(err) + + _, err = underTest.Specification() + is.Equal(err, wasm.NewError(0, "boom")) + + // Teardown still works + is.NoErr(underTest.Teardown(ctx)) +} + +func TestWASMProcessor_Configure_Success(t *testing.T) { + is := is.New(t) + ctx := context.Background() + logger := log.Test(t) + + underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, "test-processor", logger) + is.NoErr(err) + + err = underTest.Configure(ctx, nil) + is.NoErr(err) + + is.NoErr(underTest.Teardown(ctx)) +} + +func TestWASMProcessor_Configure_Error(t *testing.T) { + is := is.New(t) + ctx := context.Background() + logger := log.Test(t) + + underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, "test-processor", logger) + is.NoErr(err) + + err = underTest.Configure(ctx, map[string]string{"configure": "error"}) + is.Equal(err, wasm.NewError(0, "boom")) + + // Teardown still works + is.NoErr(underTest.Teardown(ctx)) +} + +func TestWASMProcessor_Configure_Panic(t *testing.T) { + is := is.New(t) + ctx := context.Background() + logger := log.Test(t) + + underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, "test-processor", logger) + is.NoErr(err) + + err = underTest.Configure(ctx, map[string]string{"configure": "panic"}) + is.True(cerrors.Is(err, plugin.ErrPluginNotRunning)) + + // Teardown should also fail with the same error + err = underTest.Teardown(ctx) + is.True(cerrors.Is(err, plugin.ErrPluginNotRunning)) +} + +func TestWASMProcessor_Open_Success(t *testing.T) { + is := is.New(t) + ctx := context.Background() + logger := log.Test(t) + + underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, "test-processor", logger) + is.NoErr(err) + + err = underTest.Open(ctx) + is.NoErr(err) + + is.NoErr(underTest.Teardown(ctx)) +} + +func TestWASMProcessor_Open_Error(t *testing.T) { + is := is.New(t) + ctx := context.Background() + logger := log.Test(t) + + underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, "test-processor", logger) + is.NoErr(err) + + err = underTest.Configure(ctx, map[string]string{"open": "error"}) + is.NoErr(err) + + err = underTest.Open(ctx) + is.Equal(err, wasm.NewError(0, "boom")) + + // Teardown still works + is.NoErr(underTest.Teardown(ctx)) +} + +func TestWASMProcessor_Open_Panic(t *testing.T) { + is := is.New(t) + ctx := context.Background() + logger := log.Test(t) + + underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, "test-processor", logger) + is.NoErr(err) + + err = underTest.Configure(ctx, map[string]string{"open": "panic"}) + is.NoErr(err) + + err = underTest.Open(ctx) + is.True(cerrors.Is(err, plugin.ErrPluginNotRunning)) + + // Teardown should also fail with the same error + err = underTest.Teardown(ctx) + is.True(cerrors.Is(err, plugin.ErrPluginNotRunning)) +} + +func TestWASMProcessor_Process_Success(t *testing.T) { + is := is.New(t) + ctx := context.Background() + logger := log.Test(t) + + underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, "test-processor", logger) + is.NoErr(err) + + is.NoErr(underTest.Configure(ctx, map[string]string{"process.prefix": "hello!\n\n"})) + + processed := underTest.Process(ctx, nil) + is.Equal(0, len(processed)) + + processed = underTest.Process(ctx, []opencdc.Record{}) + is.Equal(0, len(processed)) + + input := opencdc.Record{ + Position: opencdc.Position("first left then right"), + Operation: opencdc.OperationCreate, + Metadata: map[string]string{ + "street": "23rd", + }, + Key: opencdc.RawData("broken"), + Payload: opencdc.Change{ + After: opencdc.RawData("oranges"), + }, + } + want := sdk.SingleRecord(input.Clone()) + want.Payload.After = opencdc.RawData("hello!\n\n" + string(want.Payload.After.Bytes())) + + processed = underTest.Process(ctx, []opencdc.Record{input}) + is.Equal(1, len(processed)) + is.Equal(want, processed[0]) + + is.NoErr(underTest.Teardown(ctx)) +} + +func TestWASMProcessor_Process_Error(t *testing.T) { + is := is.New(t) + ctx := context.Background() + logger := log.Test(t) + + underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, "test-processor", logger) + is.NoErr(err) + + is.NoErr(underTest.Configure(ctx, map[string]string{"process": "error"})) + + processed := underTest.Process(ctx, nil) + is.Equal(1, len(processed)) + + errRecord, ok := processed[0].(sdk.ErrorRecord) + is.True(ok) + is.Equal(errRecord.Error, wasm.NewError(0, "boom")) + + // Teardown still works + is.NoErr(underTest.Teardown(ctx)) +} + +func TestWASMProcessor_Process_Panic(t *testing.T) { + is := is.New(t) + ctx := context.Background() + logger := log.Test(t) + + underTest, err := newWASMProcessor(ctx, TestRuntime, ChaosProcessorModule, CompiledHostModule, "test-processor", logger) + is.NoErr(err) + + is.NoErr(underTest.Configure(ctx, map[string]string{"process": "panic"})) + + processed := underTest.Process(ctx, nil) + is.Equal(1, len(processed)) + + errRecord, ok := processed[0].(sdk.ErrorRecord) + is.True(ok) + is.True(cerrors.Is(errRecord.Error, plugin.ErrPluginNotRunning)) + + // Teardown should also fail with the same error + err = underTest.Teardown(ctx) + is.True(cerrors.Is(err, plugin.ErrPluginNotRunning)) +} diff --git a/pkg/plugin/processor/standalone/proto.go b/pkg/plugin/processor/standalone/proto.go new file mode 100644 index 000000000..dc75c005e --- /dev/null +++ b/pkg/plugin/processor/standalone/proto.go @@ -0,0 +1,153 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package standalone + +import ( + "fmt" + + "github.com/conduitio/conduit-commons/opencdc" + opencdcv1 "github.com/conduitio/conduit-commons/proto/opencdc/v1" + sdk "github.com/conduitio/conduit-processor-sdk" + processorv1 "github.com/conduitio/conduit-processor-sdk/proto/processor/v1" + "github.com/conduitio/conduit-processor-sdk/wasm" +) + +// protoConverter converts between the SDK and protobuf types. +type protoConverter struct{} + +func (c protoConverter) specification(resp *processorv1.Specify_Response) sdk.Specification { + return sdk.Specification{ + Name: resp.Name, + Summary: resp.Summary, + Description: resp.Description, + Version: resp.Version, + Author: resp.Author, + Parameters: c.specificationParameters(resp.Parameters), + } +} + +func (c protoConverter) specificationParameters(in map[string]*processorv1.Specify_Parameter) map[string]sdk.Parameter { + if in == nil { + return nil + } + + out := make(map[string]sdk.Parameter, len(in)) + for name, param := range in { + out[name] = sdk.Parameter{ + Default: param.Default, + Type: sdk.ParameterType(param.Type), + Description: param.Description, + Validations: c.specificationParameterValidations(param.Validations), + } + } + + return out +} + +func (c protoConverter) specificationParameterValidations(in []*processorv1.Specify_Parameter_Validation) []sdk.Validation { + if in == nil { + return nil + } + + out := make([]sdk.Validation, len(in)) + for i, v := range in { + out[i] = sdk.Validation{ + Type: sdk.ValidationType(v.Type), + Value: v.Value, + } + } + + return out +} + +func (c protoConverter) records(in []opencdc.Record) ([]*opencdcv1.Record, error) { + if in == nil { + return nil, nil + } + + out := make([]*opencdcv1.Record, len(in)) + for i, r := range in { + out[i] = &opencdcv1.Record{} + err := r.ToProto(out[i]) + if err != nil { + return nil, err + } + } + + return out, nil +} + +func (c protoConverter) processedRecords(in []*processorv1.Process_ProcessedRecord) ([]sdk.ProcessedRecord, error) { + if in == nil { + return nil, nil + } + + out := make([]sdk.ProcessedRecord, len(in)) + var err error + for i, r := range in { + out[i], err = c.processedRecord(r) + if err != nil { + return nil, err + } + } + + return out, nil +} + +func (c protoConverter) processedRecord(in *processorv1.Process_ProcessedRecord) (sdk.ProcessedRecord, error) { + if in == nil || in.Record == nil { + return nil, nil + } + + switch v := in.Record.(type) { + case *processorv1.Process_ProcessedRecord_SingleRecord: + return c.singleRecord(v) + case *processorv1.Process_ProcessedRecord_FilterRecord: + return c.filterRecord(v) + case *processorv1.Process_ProcessedRecord_ErrorRecord: + return c.errorRecord(v) + default: + return nil, fmt.Errorf("unknown processed record type: %T", in.Record) + } +} + +func (c protoConverter) singleRecord(in *processorv1.Process_ProcessedRecord_SingleRecord) (sdk.SingleRecord, error) { + if in == nil { + return sdk.SingleRecord{}, nil + } + + var rec opencdc.Record + err := rec.FromProto(in.SingleRecord) + if err != nil { + return sdk.SingleRecord{}, err + } + + return sdk.SingleRecord(rec), nil +} + +func (c protoConverter) filterRecord(_ *processorv1.Process_ProcessedRecord_FilterRecord) (sdk.FilterRecord, error) { + return sdk.FilterRecord{}, nil +} + +func (c protoConverter) errorRecord(in *processorv1.Process_ProcessedRecord_ErrorRecord) (sdk.ErrorRecord, error) { + if in == nil || in.ErrorRecord == nil || in.ErrorRecord.Error == nil { + return sdk.ErrorRecord{}, nil + } + return sdk.ErrorRecord{Error: c.error(in.ErrorRecord.Error)}, nil +} + +func (c protoConverter) error(e *processorv1.Error) error { + return wasm.NewError(e.Code, e.Message) +} diff --git a/pkg/plugin/processor/standalone/registry.go b/pkg/plugin/processor/standalone/registry.go new file mode 100644 index 000000000..021477073 --- /dev/null +++ b/pkg/plugin/processor/standalone/registry.go @@ -0,0 +1,285 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package standalone + +import ( + "context" + "fmt" + "os" + "path" + "path/filepath" + "sync" + + "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" + + "github.com/stealthrocket/wazergo" + + "github.com/tetratelabs/wazero" + + sdk "github.com/conduitio/conduit-processor-sdk" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/plugin" +) + +// Registry is a directory registry of processor plugins, organized by plugin +// type, name and version. +// Every file in the specified directory is considered a plugin +// (directories are skipped). +type Registry struct { + logger log.CtxLogger + pluginDir string + runtime wazero.Runtime + + // hostModule is the conduit host module that exposes Conduit host functions + // to the WASM module. The host module is compiled once and instantiated + // multiple times, once for each WASM module. + hostModule *wazergo.CompiledModule[*hostModuleInstance] + + // plugins stores plugin blueprints in a 2D map, first key is the plugin + // name, the second key is the plugin version + plugins map[string]map[string]blueprint + // m guards plugins from being concurrently accessed + m sync.RWMutex +} + +type blueprint struct { + fullName plugin.FullName + specification sdk.Specification + path string + module wazero.CompiledModule + // TODO store hash of plugin binary and compare before running the binary to + // ensure someone can't switch the plugin after we registered it +} + +func NewRegistry(logger log.CtxLogger, pluginDir string) (*Registry, error) { + // context is only used for logging, it's not used for long running operations + ctx := context.Background() + + logger = logger.WithComponentFromType(Registry{}) + + if pluginDir != "" { + // extract absolute path to make it clearer in the logs what directory is used + absPluginDir, err := filepath.Abs(pluginDir) + if err != nil { + logger.Warn(ctx).Err(err).Msg("could not extract absolute processor plugins path") + } else { + pluginDir = absPluginDir + } + } + + // we are using the wasm compiler, context is not used + runtime := wazero.NewRuntime(ctx) + // TODO close runtime on shutdown + + _, err := wasi_snapshot_preview1.Instantiate(ctx, runtime) + if err != nil { + _ = runtime.Close(ctx) + return nil, cerrors.Errorf("failed to instantiate WASI: %w", err) + } + + // init host module + compiledHostModule, err := wazergo.Compile(ctx, runtime, hostModule) + if err != nil { + _ = runtime.Close(ctx) + return nil, cerrors.Errorf("failed to compile host module: %w", err) + } + + r := &Registry{ + logger: logger, + runtime: runtime, + hostModule: compiledHostModule, + pluginDir: pluginDir, + } + + r.reloadPlugins() + r.logger.Info(context.Background()). + Str(log.PluginPathField, r.pluginDir). + Int("count", len(r.List())). + Msg("standalone processor plugins initialized") + + return r, nil +} + +func (r *Registry) NewProcessor(ctx context.Context, fullName plugin.FullName, id string) (sdk.Processor, error) { + r.m.RLock() + defer r.m.RUnlock() + + versions, ok := r.plugins[fullName.PluginName()] + if !ok { + return nil, plugin.ErrPluginNotFound + } + bp, ok := versions[fullName.PluginVersion()] + if !ok { + availableVersions := make([]string, 0, len(versions)) + for k := range versions { + availableVersions = append(availableVersions, k) + } + return nil, cerrors.Errorf("could not find standalone processor plugin, only found versions %v: %w", availableVersions, plugin.ErrPluginNotFound) + } + + p, err := newWASMProcessor(ctx, r.runtime, bp.module, r.hostModule, id, r.logger) + if err != nil { + return nil, cerrors.Errorf("failed to create a new WASM processor: %w", err) + } + + return p, nil +} + +func (r *Registry) reloadPlugins() { + if r.pluginDir == "" { + return // no plugin dir, no plugins to load + } + + plugins := r.loadPlugins(context.Background(), r.pluginDir) + r.m.Lock() + r.plugins = plugins + r.m.Unlock() +} + +func (r *Registry) loadPlugins(ctx context.Context, pluginDir string) map[string]map[string]blueprint { + r.logger.Debug(ctx).Msgf("loading processor plugins from directory %v", pluginDir) + plugins := make(map[string]map[string]blueprint) + + dirEntries, err := os.ReadDir(pluginDir) + if err != nil { + r.logger.Warn(ctx).Err(err).Msg("could not read processor plugin directory") + return plugins // return empty map + } + warn := func(ctx context.Context, err error, pluginPath string) { + r.logger.Warn(ctx). + Err(err). + Str(log.PluginPathField, pluginPath). + Msgf("could not load standalone processor plugin") + } + + for _, dirEntry := range dirEntries { + if dirEntry.IsDir() { + // skip directories + continue + } + + pluginPath := path.Join(pluginDir, dirEntry.Name()) + + // create dispenser without a logger to not spam logs on refresh + module, specs, err := r.loadModuleAndSpecifications(ctx, pluginPath) + if err != nil { + warn(ctx, err, pluginPath) + continue + } + + versionMap := plugins[specs.Name] + if versionMap == nil { + versionMap = make(map[string]blueprint) + plugins[specs.Name] = versionMap + } + + fullName := plugin.NewFullName(plugin.PluginTypeStandalone, specs.Name, specs.Version) + if conflict, ok := versionMap[specs.Version]; ok { + err = cerrors.Errorf("conflict detected, processor plugin %v already registered, please remove either %v or %v, these plugins won't be usable until that happens", fullName, conflict.path, pluginPath) + warn(ctx, err, pluginPath) + // close module as we won't use it + _ = module.Close(ctx) + // delete plugin from map at the end so that further duplicates can + // still be found + defer func() { + delete(versionMap, specs.Version) + if len(versionMap) == 0 { + delete(plugins, specs.Name) + } + }() + continue + } + + bp := blueprint{ + fullName: fullName, + specification: specs, + path: pluginPath, + module: module, + } + versionMap[specs.Version] = bp + + latestFullName := versionMap[plugin.PluginVersionLatest].fullName + if fullName.PluginVersionGreaterThan(latestFullName) { + versionMap[plugin.PluginVersionLatest] = bp + r.logger.Debug(ctx). + Str(log.PluginPathField, pluginPath). + Str(log.PluginNameField, string(bp.fullName)). + Msg("set processor plugin as latest") + } + + r.logger.Debug(ctx). + Str(log.PluginPathField, pluginPath). + Str(log.PluginNameField, string(bp.fullName)). + Msg("loaded standalone processor plugin") + } + + return plugins +} + +func (r *Registry) loadModuleAndSpecifications(ctx context.Context, pluginPath string) (_ wazero.CompiledModule, _ sdk.Specification, err error) { + wasmBytes, err := os.ReadFile(pluginPath) + if err != nil { + return nil, sdk.Specification{}, fmt.Errorf("failed to read WASM file %q: %w", pluginPath, err) + } + + r.logger.Debug(ctx). + Str("path", pluginPath). + Msg("compiling WASM module") + + module, err := r.runtime.CompileModule(ctx, wasmBytes) + if err != nil { + return nil, sdk.Specification{}, fmt.Errorf("failed to compile WASM module: %w", err) + } + defer func() { + if err != nil { + _ = module.Close(ctx) + } + }() + + p, err := newWASMProcessor(ctx, r.runtime, module, r.hostModule, "init-processor", log.Nop()) + if err != nil { + return nil, sdk.Specification{}, fmt.Errorf("failed to create a new WASM processor: %w", err) + } + defer func() { + err := p.Teardown(ctx) + if err != nil { + r.logger.Warn(ctx).Err(err).Msg("processor teardown failed") + } + }() + + specs, err := p.Specification() + if err != nil { + return nil, sdk.Specification{}, err + } + + return module, specs, nil +} + +func (r *Registry) List() map[plugin.FullName]sdk.Specification { + r.m.RLock() + defer r.m.RUnlock() + + specs := make(map[plugin.FullName]sdk.Specification, len(r.plugins)) + for _, versions := range r.plugins { + for version, bp := range versions { + if version == plugin.PluginVersionLatest { + continue // skip latest versions + } + specs[bp.fullName] = bp.specification + } + } + return specs +} diff --git a/pkg/plugin/processor/standalone/registry_test.go b/pkg/plugin/processor/standalone/registry_test.go new file mode 100644 index 000000000..702e9f258 --- /dev/null +++ b/pkg/plugin/processor/standalone/registry_test.go @@ -0,0 +1,141 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package standalone + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/conduitio/conduit-commons/csync" + "github.com/conduitio/conduit-commons/opencdc" + sdk "github.com/conduitio/conduit-processor-sdk" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/plugin" + "github.com/google/uuid" + "github.com/matryer/is" +) + +func TestRegistry_List(t *testing.T) { + is := is.New(t) + + underTest, err := NewRegistry(log.Test(t), testPluginChaosDir) + is.NoErr(err) + list := underTest.List() + is.Equal(1, len(list)) + got, ok := list["standalone:chaos-processor@v1.3.5"] + is.True(ok) // expected spec for standalone:chaos-processor@v1.3.5 + + want := ChaosProcessorSpecifications() + + is.Equal(got, want) +} + +func TestRegistry_MalformedProcessor(t *testing.T) { + is := is.New(t) + + underTest, err := NewRegistry(log.Test(t), testPluginMalformedDir) + is.NoErr(err) + list := underTest.List() + is.Equal(0, len(list)) +} + +func TestRegistry_SpecifyError(t *testing.T) { + is := is.New(t) + + underTest, err := NewRegistry(log.Test(t), testPluginSpecifyErrorDir) + is.NoErr(err) + list := underTest.List() + is.Equal(0, len(list)) +} + +func TestRegistry_ChaosProcessor(t *testing.T) { + ctx := context.Background() + is := is.New(t) + + // reuse this registry for multiple tests, because it's expensive to create + underTest, err := NewRegistry(log.Nop(), testPluginChaosDir) + is.NoErr(err) + + const standaloneProcessorName = plugin.FullName("standalone:chaos-processor@v1.3.5") + + t.Run("List", func(t *testing.T) { + is := is.New(t) + + list := underTest.List() + is.Equal(1, len(list)) + + got, ok := list[standaloneProcessorName] + is.True(ok) + + want := ChaosProcessorSpecifications() + is.Equal(got, want) + }) + + t.Run("NewProcessor", func(t *testing.T) { + is := is.New(t) + + p, err := underTest.NewProcessor(ctx, standaloneProcessorName, "test-processor") + is.NoErr(err) + + got, err := p.Specification() + is.NoErr(err) + + want := ChaosProcessorSpecifications() + is.Equal(got, want) + + is.NoErr(p.Teardown(ctx)) + }) + + t.Run("ConcurrentProcessors", func(t *testing.T) { + const ( + // spawn 50 processors, each processing 50 records simultaneously + processorCount = 50 + recordCount = 50 + ) + + var wg csync.WaitGroup + for i := 0; i < processorCount; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + p, err := underTest.NewProcessor(ctx, "standalone:chaos-processor@v1.3.5", fmt.Sprintf("test-processor-%d", i)) + is.NoErr(err) + + err = p.Configure(ctx, map[string]string{"process.prefix": fmt.Sprintf("%d", i)}) + is.NoErr(err) + + rec := opencdc.Record{ + Payload: opencdc.Change{ + After: opencdc.RawData(uuid.NewString()), + }, + } + want := rec.Clone() + want.Payload.After = opencdc.RawData(fmt.Sprintf("%d", i) + string(want.Payload.After.Bytes())) + + for i := 0; i < recordCount; i++ { + got := p.Process(ctx, []opencdc.Record{rec}) + is.Equal(len(got), 1) + is.Equal(opencdc.Record(got[0].(sdk.SingleRecord)), want) + } + + is.NoErr(p.Teardown(ctx)) + }(i + 1) + } + err = wg.WaitTimeout(ctx, time.Minute) + is.NoErr(err) + }) +} diff --git a/pkg/plugin/processor/standalone/standalone_test.go b/pkg/plugin/processor/standalone/standalone_test.go new file mode 100644 index 000000000..332c63784 --- /dev/null +++ b/pkg/plugin/processor/standalone/standalone_test.go @@ -0,0 +1,152 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package standalone + +import ( + "context" + "fmt" + "os" + "os/exec" + "testing" + "time" + + "github.com/conduitio/conduit/pkg/foundation/csync" + + sdk "github.com/conduitio/conduit-processor-sdk" + + "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" + + "github.com/stealthrocket/wazergo" + "github.com/tetratelabs/wazero" +) + +const ( + testPluginDir = "./test/wasm_processors/" + + testPluginChaosDir = testPluginDir + "chaos/" + testPluginMalformedDir = testPluginDir + "malformed/" + testPluginSpecifyErrorDir = testPluginDir + "specify_error/" +) + +var ( + // TestRuntime can be reused in tests to avoid recompiling the test modules + TestRuntime wazero.Runtime + CompiledHostModule *wazergo.CompiledModule[*hostModuleInstance] + + ChaosProcessorBinary []byte + MalformedProcessorBinary []byte + SpecifyErrorBinary []byte + + ChaosProcessorModule wazero.CompiledModule + SpecifyErrorModule wazero.CompiledModule + + testProcessorPaths = map[string]tuple[*[]byte, *wazero.CompiledModule]{ + testPluginChaosDir + "processor.wasm": {&ChaosProcessorBinary, &ChaosProcessorModule}, + testPluginMalformedDir + "processor.txt": {&MalformedProcessorBinary, nil}, + testPluginSpecifyErrorDir + "processor.wasm": {&SpecifyErrorBinary, &SpecifyErrorModule}, + } +) + +func TestMain(m *testing.M) { + exitOnError := func(err error, msg string) { + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "%v: %v", msg, err) + os.Exit(1) + } + } + + cmd := exec.Command("bash", "./test/build-test-processors.sh") + + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + err := cmd.Run() + exitOnError(err, "error executing bash script") + + // instantiate shared test runtime + ctx := context.Background() + TestRuntime = wazero.NewRuntime(ctx) + + _, err = wasi_snapshot_preview1.Instantiate(ctx, TestRuntime) + exitOnError(err, "error instantiating WASI") + + CompiledHostModule, err = wazergo.Compile(ctx, TestRuntime, hostModule) + + // load test processors + var wg csync.WaitGroup + for path, t := range testProcessorPaths { + *t.V1, err = os.ReadFile(path) + exitOnError(err, "error reading file "+path) + + if t.V2 == nil { + continue + } + + // compile modules in parallel + wg.Add(1) + go func(binary []byte, target *wazero.CompiledModule, path string) { + defer wg.Done() + *target, err = TestRuntime.CompileModule(ctx, binary) + exitOnError(err, "error compiling module "+path) + }(*t.V1, t.V2, path) + } + err = wg.WaitTimeout(ctx, time.Minute) + exitOnError(err, "timed out waiting on modules to compile") + + // run tests + code := m.Run() + + err = TestRuntime.Close(ctx) + exitOnError(err, "error closing wasm runtime") + + os.Exit(code) +} + +func ChaosProcessorSpecifications() sdk.Specification { + param := sdk.Parameter{ + Default: "success", + Type: sdk.ParameterTypeString, + Description: "prefix", + Validations: []sdk.Validation{ + { + Type: sdk.ValidationTypeInclusion, + Value: "success,error,panic", + }, + }, + } + return sdk.Specification{ + Name: "chaos-processor", + Summary: "chaos processor summary", + Description: "chaos processor description", + Version: "v1.3.5", + Author: "Meroxa, Inc.", + Parameters: map[string]sdk.Parameter{ + "configure": param, + "open": param, + "process.prefix": { + Default: "", + Type: sdk.ParameterTypeString, + Description: "prefix to be added to the payload's after", + Validations: []sdk.Validation{ + { + Type: sdk.ValidationTypeRequired, + }, + }, + }, + "process": param, + "teardown": param, + }, + } +} diff --git a/pkg/plugin/processor/standalone/test/build-test-processors.sh b/pkg/plugin/processor/standalone/test/build-test-processors.sh new file mode 100755 index 000000000..14c363590 --- /dev/null +++ b/pkg/plugin/processor/standalone/test/build-test-processors.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" + +WASM_PROCESSORS_DIR="$SCRIPT_DIR/wasm_processors" + +for dir in "$WASM_PROCESSORS_DIR"/*/; do + # Check if the directory contains a .go file + if [ -e "${dir}processor.go" ]; then + cd "$dir" || exit + + GOOS=wasip1 GOARCH=wasm go build -o processor.wasm processor.go + + cd "$WASM_PROCESSORS_DIR" || exit + fi +done diff --git a/pkg/plugin/processor/standalone/test/wasm_processors/chaos/processor.go b/pkg/plugin/processor/standalone/test/wasm_processors/chaos/processor.go new file mode 100644 index 000000000..5f85d698d --- /dev/null +++ b/pkg/plugin/processor/standalone/test/wasm_processors/chaos/processor.go @@ -0,0 +1,126 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build wasm + +package main + +import ( + "context" + "errors" + + "github.com/conduitio/conduit-commons/opencdc" + sdk "github.com/conduitio/conduit-processor-sdk" +) + +func main() { + sdk.Run(&chaosProcessor{}) +} + +type chaosProcessor struct { + sdk.UnimplementedProcessor + cfg map[string]string +} + +func (p *chaosProcessor) Specification() (sdk.Specification, error) { + param := sdk.Parameter{ + Default: "success", + Type: sdk.ParameterTypeString, + Description: "prefix", + Validations: []sdk.Validation{ + { + Type: sdk.ValidationTypeInclusion, + Value: "success,error,panic", + }, + }, + } + return sdk.Specification{ + Name: "chaos-processor", + Summary: "chaos processor summary", + Description: "chaos processor description", + Version: "v1.3.5", + Author: "Meroxa, Inc.", + Parameters: map[string]sdk.Parameter{ + "configure": param, + "open": param, + "process.prefix": { + Default: "", + Type: sdk.ParameterTypeString, + Description: "prefix to be added to the payload's after", + Validations: []sdk.Validation{ + { + Type: sdk.ValidationTypeRequired, + }, + }, + }, + "process": param, + "teardown": param, + }, + }, nil +} + +func (p *chaosProcessor) Configure(_ context.Context, cfg map[string]string) error { + p.cfg = cfg + + err := p.methodBehavior("configure") + if err != nil { + return err + } + + return nil +} + +func (p *chaosProcessor) Open(context.Context) error { + return p.methodBehavior("open") +} + +func (p *chaosProcessor) methodBehavior(name string) error { + switch p.cfg[name] { + case "error": + return errors.New("boom") + case "panic": + panic(name + " panic") + case "", "success": + return nil + default: + panic("unknown mode: " + p.cfg[name]) + } +} + +func (p *chaosProcessor) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord { + err := p.methodBehavior("process") + if err != nil { + // on error we return a single record with the error + return []sdk.ProcessedRecord{sdk.ErrorRecord{Error: err}} + } + + _, ok := p.cfg["process.prefix"] + if !ok { + return []sdk.ProcessedRecord{sdk.ErrorRecord{Error: errors.New("missing prefix")}} + } + + out := make([]sdk.ProcessedRecord, len(records)) + for i, record := range records { + original := record.Payload.After.(opencdc.RawData) + record.Payload.After = opencdc.RawData(p.cfg["process.prefix"] + string(original.Bytes())) + + out[i] = sdk.SingleRecord(record) + } + + return out +} + +func (p *chaosProcessor) Teardown(context.Context) error { + return p.methodBehavior("teardown") +} diff --git a/pkg/plugin/processor/standalone/test/wasm_processors/malformed/processor.txt b/pkg/plugin/processor/standalone/test/wasm_processors/malformed/processor.txt new file mode 100644 index 000000000..f13acb5c1 --- /dev/null +++ b/pkg/plugin/processor/standalone/test/wasm_processors/malformed/processor.txt @@ -0,0 +1 @@ +this is not a valid wasm binary \ No newline at end of file diff --git a/pkg/plugin/processor/standalone/test/wasm_processors/specify_error/processor.go b/pkg/plugin/processor/standalone/test/wasm_processors/specify_error/processor.go new file mode 100644 index 000000000..056873472 --- /dev/null +++ b/pkg/plugin/processor/standalone/test/wasm_processors/specify_error/processor.go @@ -0,0 +1,56 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build wasm + +package main + +import ( + "context" + "errors" + + "github.com/conduitio/conduit-commons/opencdc" + sdk "github.com/conduitio/conduit-processor-sdk" +) + +func main() { + sdk.Run(&testProcessor{}) +} + +type testProcessor struct { + sdk.UnimplementedProcessor +} + +func (p *testProcessor) Specification() (sdk.Specification, error) { + return sdk.Specification{}, errors.New("boom") +} + +func (p *testProcessor) Configure(context.Context, map[string]string) error { + // TODO implement me + panic("implement me") +} + +func (p *testProcessor) Open(context.Context) error { + // TODO implement me + panic("implement me") +} + +func (p *testProcessor) Process(context.Context, []opencdc.Record) []sdk.ProcessedRecord { + // TODO implement me + panic("implement me") +} + +func (p *testProcessor) Teardown(context.Context) error { + return nil +} diff --git a/pkg/plugin/service.go b/pkg/plugin/service.go index 4a8c2e093..3c268f20c 100644 --- a/pkg/plugin/service.go +++ b/pkg/plugin/service.go @@ -58,7 +58,7 @@ func NewService( } } -func (s *Service) Check(_ context.Context) error { +func (s *Service) Check(context.Context) error { return nil }