diff --git a/go.mod b/go.mod index c622a932f..56416d969 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/conduitio/conduit-connector-protocol v0.5.0 github.com/conduitio/conduit-connector-s3 v0.5.0 github.com/conduitio/conduit-connector-sdk v0.8.0 - github.com/conduitio/conduit-processor-sdk v0.0.0-20240104184508-4f61ece4b1c9 + github.com/conduitio/conduit-processor-sdk v0.0.0-20240116154859-538779da04d6 github.com/conduitio/yaml/v3 v3.3.0 github.com/dgraph-io/badger/v4 v4.2.0 github.com/dop251/goja v0.0.0-20230531210528-d7324b2d74f7 @@ -44,6 +44,7 @@ require ( github.com/prometheus/client_model v0.5.0 github.com/prometheus/common v0.45.0 github.com/rs/zerolog v1.31.0 + github.com/stealthrocket/wazergo v0.19.1 github.com/tetratelabs/wazero v1.5.0 github.com/twmb/go-cache v1.2.0 go.uber.org/goleak v1.3.0 diff --git a/go.sum b/go.sum index 41997e0d0..a2da4216d 100644 --- a/go.sum +++ b/go.sum @@ -1038,8 +1038,8 @@ github.com/conduitio/conduit-connector-s3 v0.5.0 h1:nqLcf/foYnDLkXWYcWJX/5UHzTjW github.com/conduitio/conduit-connector-s3 v0.5.0/go.mod h1:I6oE37zz25RTjnQiUBz2rOASwXNqfaMW7gSlsKX6z8E= github.com/conduitio/conduit-connector-sdk v0.8.0 h1:gvchqoj5d3AQsBoIosx4i32L8Ex9+5BuAyHi/IM9VD4= github.com/conduitio/conduit-connector-sdk v0.8.0/go.mod h1:nOz4K3X6fD8YMe5CPbULwSEE18Eu02ZrpT6o6KwQfxs= -github.com/conduitio/conduit-processor-sdk v0.0.0-20240104184508-4f61ece4b1c9 h1:iDtdHWgk7U99f7wVkmNEEciEeAOcy1niFjQcmu0FX1Y= -github.com/conduitio/conduit-processor-sdk v0.0.0-20240104184508-4f61ece4b1c9/go.mod h1:1ckqfqVvCYVF4rt6n2ymIzsmPYfzBYQS5kuTIFORuKg= +github.com/conduitio/conduit-processor-sdk v0.0.0-20240116154859-538779da04d6 h1:AeDSV6ScmbUByQfaLR8+SkoleQAeydDJyVf1C/OICwU= +github.com/conduitio/conduit-processor-sdk v0.0.0-20240116154859-538779da04d6/go.mod h1:k0rpE3kOAyDcIsBbS5vMO035XzDGW9FJsC4sgEXCH8Y= github.com/conduitio/yaml/v3 v3.3.0 h1:kbbaOSHcuH39gP4+rgbJGl6DSbLZcJgEaBvkEXJlCsI= github.com/conduitio/yaml/v3 v3.3.0/go.mod 
h1:JNgFMOX1t8W4YJuRZOh6GggVtSMsgP9XgTw+7dIenpc= github.com/containerd/stargz-snapshotter/estargz v0.15.1 h1:eXJjw9RbkLFgioVaTG+G/ZW/0kEe2oEKCdS/ZxIyoCU= @@ -1830,6 +1830,8 @@ github.com/ssgreg/nlreturn/v2 v2.2.1 h1:X4XDI7jstt3ySqGU86YGAURbxw3oTDPK9sPEi6YE github.com/ssgreg/nlreturn/v2 v2.2.1/go.mod h1:E/iiPB78hV7Szg2YfRgyIrk1AD6JVMTRkkxBiELzh2I= github.com/stbenjam/no-sprintf-host-port v0.1.1 h1:tYugd/yrm1O0dV+ThCbaKZh195Dfm07ysF0U6JQXczc= github.com/stbenjam/no-sprintf-host-port v0.1.1/go.mod h1:TLhvtIvONRzdmkFiio4O8LHsN9N74I+PhRquPsxpL0I= +github.com/stealthrocket/wazergo v0.19.1 h1:BPrITETPgSFwiytwmToO0MbUC/+RGC39JScz1JmmG6c= +github.com/stealthrocket/wazergo v0.19.1/go.mod h1:riI0hxw4ndZA5e6z7PesHg2BtTftcZaMxRcoiGGipTs= github.com/stoewer/go-strcase v1.3.0 h1:g0eASXYtp+yvN9fK8sH94oCIk0fau9uV1/ZdJ0AVEzs= github.com/stoewer/go-strcase v1.3.0/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8wodgtPmh1xo= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/pkg/foundation/cerrors/cerrors.go b/pkg/foundation/cerrors/cerrors.go index 1f935060d..f24f84832 100644 --- a/pkg/foundation/cerrors/cerrors.go +++ b/pkg/foundation/cerrors/cerrors.go @@ -37,6 +37,7 @@ var ( Is = errors.Is As = errors.As Unwrap = errors.Unwrap + Join = errors.Join ) type Frame struct { diff --git a/pkg/foundation/log/fields.go b/pkg/foundation/log/fields.go index b79d0f517..dd0bbc61f 100644 --- a/pkg/foundation/log/fields.go +++ b/pkg/foundation/log/fields.go @@ -22,6 +22,7 @@ const ( NodeIDField = "node_id" ParallelWorkerIDField = "parallel_worker_id" PipelineIDField = "pipeline_id" + ProcessorIDField = "processor_id" RecordPositionField = "record_position" RequestIDField = "request_id" ServerAddressField = "address" diff --git a/pkg/plugin/processor/builtin/registry_test.go b/pkg/plugin/processor/builtin/registry_test.go index 3d9a1a744..3f9c55218 100644 --- a/pkg/plugin/processor/builtin/registry_test.go +++ 
b/pkg/plugin/processor/builtin/registry_test.go @@ -18,9 +18,9 @@ import ( "testing" sdk "github.com/conduitio/conduit-processor-sdk" - "github.com/conduitio/conduit-processor-sdk/mock" "github.com/conduitio/conduit/pkg/foundation/log" "github.com/conduitio/conduit/pkg/plugin" + "github.com/conduitio/conduit/pkg/plugin/processor/mock" "github.com/matryer/is" "go.uber.org/mock/gomock" ) diff --git a/pkg/plugin/processor/mock/processor.go b/pkg/plugin/processor/mock/processor.go new file mode 100644 index 000000000..423c85dac --- /dev/null +++ b/pkg/plugin/processor/mock/processor.go @@ -0,0 +1,126 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/conduitio/conduit-processor-sdk (interfaces: Processor) +// +// Generated by this command: +// +// mockgen -destination=mock/processor.go -package=mock -mock_names=Processor=Processor github.com/conduitio/conduit-processor-sdk Processor +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + opencdc "github.com/conduitio/conduit-commons/opencdc" + sdk "github.com/conduitio/conduit-processor-sdk" + gomock "go.uber.org/mock/gomock" +) + +// Processor is a mock of Processor interface. +type Processor struct { + sdk.UnimplementedProcessor + ctrl *gomock.Controller + recorder *ProcessorMockRecorder +} + +// ProcessorMockRecorder is the mock recorder for Processor. +type ProcessorMockRecorder struct { + mock *Processor +} + +// NewProcessor creates a new mock instance. +func NewProcessor(ctrl *gomock.Controller) *Processor { + mock := &Processor{ctrl: ctrl} + mock.recorder = &ProcessorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *Processor) EXPECT() *ProcessorMockRecorder { + return m.recorder +} + +// Configure mocks base method. 
+func (m *Processor) Configure(arg0 context.Context, arg1 map[string]string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Configure", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// Configure indicates an expected call of Configure. +func (mr *ProcessorMockRecorder) Configure(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Configure", reflect.TypeOf((*Processor)(nil).Configure), arg0, arg1) +} + +// Open mocks base method. +func (m *Processor) Open(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Open", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Open indicates an expected call of Open. +func (mr *ProcessorMockRecorder) Open(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Open", reflect.TypeOf((*Processor)(nil).Open), arg0) +} + +// Process mocks base method. +func (m *Processor) Process(arg0 context.Context, arg1 []opencdc.Record) []sdk.ProcessedRecord { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Process", arg0, arg1) + ret0, _ := ret[0].([]sdk.ProcessedRecord) + return ret0 +} + +// Process indicates an expected call of Process. +func (mr *ProcessorMockRecorder) Process(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Process", reflect.TypeOf((*Processor)(nil).Process), arg0, arg1) +} + +// Specification mocks base method. +func (m *Processor) Specification() (sdk.Specification, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Specification") + ret0, _ := ret[0].(sdk.Specification) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Specification indicates an expected call of Specification. 
+func (mr *ProcessorMockRecorder) Specification() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Specification", reflect.TypeOf((*Processor)(nil).Specification)) +} + +// Teardown mocks base method. +func (m *Processor) Teardown(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Teardown", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Teardown indicates an expected call of Teardown. +func (mr *ProcessorMockRecorder) Teardown(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Teardown", reflect.TypeOf((*Processor)(nil).Teardown), arg0) +} + +// mustEmbedUnimplementedProcessor mocks base method. +func (m *Processor) mustEmbedUnimplementedProcessor() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "mustEmbedUnimplementedProcessor") +} + +// mustEmbedUnimplementedProcessor indicates an expected call of mustEmbedUnimplementedProcessor. +func (mr *ProcessorMockRecorder) mustEmbedUnimplementedProcessor() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "mustEmbedUnimplementedProcessor", reflect.TypeOf((*Processor)(nil).mustEmbedUnimplementedProcessor)) +} diff --git a/pkg/plugin/processor/processor.go b/pkg/plugin/processor/processor.go new file mode 100644 index 000000000..008f34555 --- /dev/null +++ b/pkg/plugin/processor/processor.go @@ -0,0 +1,20 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +// This generates a mock processor and adds UnimplementedProcessor to it to +// satisfy the sdk.Processor interface. +//go:generate mockgen -destination=mock/processor.go -package=mock -mock_names=Processor=Processor github.com/conduitio/conduit-processor-sdk Processor +//go:generate sed -i "" -e "/type Processor struct {/a\\\n sdk.UnimplementedProcessor" mock/processor.go + +package processor diff --git a/pkg/plugin/processor/standalone/host.go b/pkg/plugin/processor/standalone/host.go new file mode 100644 index 000000000..719bb73ed --- /dev/null +++ b/pkg/plugin/processor/standalone/host.go @@ -0,0 +1,145 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package standalone + +import ( + "context" + + processorv1 "github.com/conduitio/conduit-processor-sdk/proto/processor/v1" + "github.com/conduitio/conduit-processor-sdk/wasm" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/stealthrocket/wazergo" + "github.com/stealthrocket/wazergo/types" + "google.golang.org/protobuf/proto" +) + +// hostModule declares the host module that is exported to the WASM module. The +// host module is used to communicate between the WASM module (processor) and Conduit. 
+var hostModule wazergo.HostModule[*hostModuleInstance] = hostModuleFunctions{ + "command_request": wazergo.F1((*hostModuleInstance).commandRequest), + "command_response": wazergo.F1((*hostModuleInstance).commandResponse), +} + +// hostModuleFunctions type implements HostModule, providing the module name, +// map of exported functions, and the ability to create instances of the module +// type. +type hostModuleFunctions wazergo.Functions[*hostModuleInstance] + +// Name returns the name of the module. +func (f hostModuleFunctions) Name() string { + return "conduit" +} + +// Functions is a helper that returns the exported functions of the module. +func (f hostModuleFunctions) Functions() wazergo.Functions[*hostModuleInstance] { + return (wazergo.Functions[*hostModuleInstance])(f) +} + +// Instantiate creates a new instance of the module. This is called by the +// runtime when a new instance of the module is created. +func (f hostModuleFunctions) Instantiate(_ context.Context, opts ...hostModuleOption) (*hostModuleInstance, error) { + mod := &hostModuleInstance{} + wazergo.Configure(mod, opts...) + if mod.commandRequests == nil { + return nil, cerrors.New("missing command requests channel") + } + if mod.commandResponses == nil { + return nil, cerrors.New("missing command responses channel") + } + return mod, nil +} + +type hostModuleOption = wazergo.Option[*hostModuleInstance] + +func hostModuleOptions( + logger log.CtxLogger, + requests <-chan *processorv1.CommandRequest, + responses chan<- tuple[*processorv1.CommandResponse, error], +) hostModuleOption { + return wazergo.OptionFunc(func(m *hostModuleInstance) { + m.logger = logger + m.commandRequests = requests + m.commandResponses = responses + }) +} + +// hostModuleInstance is used to maintain the state of our module instance. 
+type hostModuleInstance struct { + logger log.CtxLogger + commandRequests <-chan *processorv1.CommandRequest + commandResponses chan<- tuple[*processorv1.CommandResponse, error] + + parkedCommandRequest *processorv1.CommandRequest +} + +func (*hostModuleInstance) Close(context.Context) error { return nil } + +// commandRequest is the exported function that is called by the WASM module to +// get the next command request. It returns the size of the command request +// message. If the buffer is too small, it returns the size of the command +// request message and parks the command request. The next call to this function +// will return the same command request. +func (m *hostModuleInstance) commandRequest(ctx context.Context, buf types.Bytes) types.Uint32 { + m.logger.Trace(ctx).Msg("getting next command") + + if m.parkedCommandRequest == nil { + // No parked command, so we need to wait for the next one. If the command + // channel is closed, then we return an error. + var ok bool + m.parkedCommandRequest, ok = <-m.commandRequests + if !ok { + return wasm.ErrorCodeNoMoreCommands + } + } + + // If the buffer is too small, we park the command and return the size of the + // command. The next call to nextCommand will return the same command. + if size := proto.Size(m.parkedCommandRequest); len(buf) < size { + m.logger.Warn(ctx). + Int("command_bytes", size). + Int("allocated_bytes", len(buf)). + Msgf("insufficient memory, command will be parked until next call to nextCommand") + return types.Uint32(size) + } + + // If the buffer is large enough, we marshal the command into the buffer and + // return the size of the command. The next call to nextCommand will return + // the next command. 
+ out, err := proto.MarshalOptions{}.MarshalAppend(buf[:0], m.parkedCommandRequest) + if err != nil { + m.logger.Err(ctx, err).Msg("failed marshalling protobuf command request") + return wasm.ErrorCodeUnknownCommandRequest + } + m.parkedCommandRequest = nil + + m.logger.Trace(ctx).Msg("returning next command") + return types.Uint32(len(out)) +} + +// commandResponse is the exported function that is called by the WASM module to +// send a command response. It returns 0 on success, or an error code on error. +func (m *hostModuleInstance) commandResponse(ctx context.Context, buf types.Bytes) types.Uint32 { + var resp processorv1.CommandResponse + err := proto.Unmarshal(buf, &resp) + if err != nil { + m.logger.Err(ctx, err).Msg("failed unmarshalling protobuf command response") + m.commandResponses <- tuple[*processorv1.CommandResponse, error]{nil, err} + return wasm.ErrorCodeUnknownCommandResponse + } + + m.commandResponses <- tuple[*processorv1.CommandResponse, error]{&resp, nil} + return 0 +} diff --git a/pkg/plugin/processor/standalone/logger.go b/pkg/plugin/processor/standalone/logger.go new file mode 100644 index 000000000..037c4fa7f --- /dev/null +++ b/pkg/plugin/processor/standalone/logger.go @@ -0,0 +1,99 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package standalone + +import ( + "io" + + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/tetratelabs/wazero" + + "github.com/goccy/go-json" + "github.com/rs/zerolog" +) + +// wasmLogWriter is a logger adapter for the WASM stderr and stdout streams. +type wasmLogWriter struct { + logger zerolog.Logger +} + +var _ io.Writer = (*wasmLogWriter)(nil) + +func newWasmLogWriter(logger log.CtxLogger, module wazero.CompiledModule) wasmLogWriter { + name := module.Name() + if name == "" { + // no module name, use the component name instead + name = logger.Component() + ".module" + } + logger = logger.WithComponent(name) + return wasmLogWriter{logger: logger.ZerologWithComponent()} +} + +func (l wasmLogWriter) Write(p []byte) (int, error) { + err := l.emitJSONEvent(p) + if err != nil { + // fallback to writing the bytes as-is + return l.logger.Write(p) + } + return len(p), nil +} + +func (l wasmLogWriter) emitJSONEvent(p []byte) error { + var raw map[string]any + err := json.Unmarshal(p, &raw) + if err != nil { + return err + } + + var ( + level = zerolog.DebugLevel // default + msg = "" + ) + + // parse level + if v, ok := raw[zerolog.LevelFieldName]; ok { + delete(raw, zerolog.LevelFieldName) + if s, ok := v.(string); ok { + parsedLvl, err := zerolog.ParseLevel(s) + if err == nil { + level = parsedLvl + } + } + } + + // prepare log event + e := l.logger.WithLevel(level) + if !e.Enabled() { + return nil + } + + // parse message + if v, ok := raw[zerolog.MessageFieldName]; ok { + delete(raw, zerolog.MessageFieldName) + if s, ok := v.(string); ok { + msg = s + } + } + + delete(raw, zerolog.TimestampFieldName) // don't duplicate timestamp, it's added by zerolog + + // parse unknown fields + for k, v := range raw { + e.Any(k, v) + } + + e.Msg(msg) + return nil +} diff --git a/pkg/plugin/processor/standalone/processor.go b/pkg/plugin/processor/standalone/processor.go new file mode 100644 index 000000000..a8d7a1566 --- /dev/null +++ 
b/pkg/plugin/processor/standalone/processor.go
@@ -0,0 +1,347 @@
+// Copyright © 2023 Meroxa, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package standalone
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/conduitio/conduit-commons/opencdc"
+	sdk "github.com/conduitio/conduit-processor-sdk"
+	processorv1 "github.com/conduitio/conduit-processor-sdk/proto/processor/v1"
+	"github.com/conduitio/conduit/pkg/foundation/cerrors"
+	"github.com/conduitio/conduit/pkg/foundation/log"
+	"github.com/stealthrocket/wazergo"
+	"github.com/tetratelabs/wazero"
+	"github.com/tetratelabs/wazero/api"
+	"github.com/tetratelabs/wazero/sys"
+)
+
+const (
+	// magicCookieKey and value are used as a very basic verification
+	// that a plugin is intended to be launched. This is not a security
+	// measure, just a UX feature. If the magic cookie doesn't match,
+	// we show human-friendly output.
+	magicCookieKey   = "CONDUIT_MAGIC_COOKIE"
+	magicCookieValue = "3stnegqd0x02axggy0vrc4izjeq2zik6g7somyb3ye4vy5iivvjm5s1edppl5oja"
+
+	// conduitProcessorIDKey and conduitLogLevelKey are the names of the
+	// environment variables used to pass the processor ID and the log level
+	// to the WASM module.
+	conduitProcessorIDKey = "CONDUIT_PROCESSOR_ID"
+	conduitLogLevelKey    = "CONDUIT_LOG_LEVEL"
+)
+
+// wasmProcessor is an sdk.Processor that forwards every call as a command to
+// a WASM module and translates the module's response back into SDK types.
+type wasmProcessor struct {
+	sdk.UnimplementedProcessor
+	protoconv protoConverter
+
+	id     string
+	logger log.CtxLogger
+
+	// module is the WASM module that implements the processor
+	module api.Module
+
+	// commandRequests is used to send commands to the actual processor (the
+	// WASM module)
+	commandRequests chan *processorv1.CommandRequest
+	// commandResponses is used to communicate replies between the actual
+	// processor (the WASM module) and wasmProcessor
+	commandResponses chan tuple[*processorv1.CommandResponse, error]
+	// moduleStopped is used to know when the module stopped running
+	moduleStopped chan struct{}
+}
+
+// tuple is a pair of values, used to send two values through one channel.
+type tuple[T1, T2 any] struct {
+	V1 T1
+	V2 T2
+}
+
+// newWASMProcessor instantiates the host module and the processor module and
+// runs the "_start" function of the processor module in a new goroutine. The
+// goroutine closes the module and the moduleStopped channel once "_start"
+// returns. The id is used as the module name, is passed to the module in the
+// environment and is attached to the logger.
+func newWASMProcessor(
+	ctx context.Context,
+
+	runtime wazero.Runtime,
+	processorModule wazero.CompiledModule,
+	hostModule *wazergo.CompiledModule[*hostModuleInstance],
+
+	id string,
+	logger log.CtxLogger,
+) (*wasmProcessor, error) {
+	logger = logger.WithComponent("standalone.wasmProcessor")
+	logger.Logger = logger.With().Str(log.ProcessorIDField, id).Logger()
+	wasmLogger := newWasmLogWriter(logger, processorModule)
+
+	commandRequests := make(chan *processorv1.CommandRequest)
+	commandResponses := make(chan tuple[*processorv1.CommandResponse, error])
+	moduleStopped := make(chan struct{})
+
+	// instantiate the host module and inject it into the context
+	logger.Debug(ctx).Msg("instantiating host module")
+	ins, err := hostModule.Instantiate(
+		ctx,
+		hostModuleOptions(
+			logger,
+			commandRequests,
+			commandResponses,
+		),
+	)
+	if err != nil {
+		return nil, fmt.Errorf("failed to instantiate host module: %w", err)
+	}
+	ctx = wazergo.WithModuleInstance(ctx, ins)
+
+	logger.Debug(ctx).Msg("instantiating processor module")
+	mod, err := runtime.InstantiateModule(
+		ctx,
+		processorModule,
+		wazero.NewModuleConfig().
+			WithName(id). // ensure unique module name
+			WithEnv(magicCookieKey, magicCookieValue).
+			WithEnv(conduitProcessorIDKey, id).
+			WithEnv(conduitLogLevelKey, logger.GetLevel().String()).
+
+			// set up logging
+			WithStdout(wasmLogger).
+			WithStderr(wasmLogger).
+
+			// enable time.Now to include correct wall time
+			WithSysWalltime().
+			// enable time.Now to include correct monotonic time
+			WithSysNanotime().
+			// enable time.Sleep to sleep for the correct amount of time
+			WithSysNanosleep().
+
+			// don't start right away
+			WithStartFunctions(),
+	)
+	if err != nil {
+		_ = ins.Close(ctx)
+		return nil, fmt.Errorf("failed to instantiate processor module: %w", err)
+	}
+
+	// Needs to run in a goroutine because the WASM module is blocking as long
+	// as the "main" function is running
+	go func() {
+		defer close(moduleStopped)
+
+		_, err := mod.ExportedFunction("_start").Call(ctx)
+
+		// main function returned, close the module right away
+		_ = mod.Close(ctx)
+
+		// an exit error with exit code 0 means the module exited cleanly
+		var exitErr *sys.ExitError
+		if cerrors.As(err, &exitErr) && exitErr.ExitCode() == 0 {
+			err = nil
+		}
+		logger.Err(ctx, err).Msg("WASM module stopped")
+	}()
+
+	return &wasmProcessor{
+		id:               id,
+		logger:           logger,
+		module:           mod,
+		commandRequests:  commandRequests,
+		commandResponses: commandResponses,
+		moduleStopped:    moduleStopped,
+	}, nil
+}
+
+// Specification sends the specify command to the WASM module and returns the
+// converted specification. The command is given 5 seconds to complete.
+func (p *wasmProcessor) Specification() (sdk.Specification, error) {
+	req := &processorv1.CommandRequest{
+		Request: &processorv1.CommandRequest_Specify{
+			Specify: &processorv1.Specify_Request{},
+		},
+	}
+
+	// the function has no context parameter, so we need to set a timeout
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+	defer cancel()
+
+	resp, err := p.executeCommand(ctx, req)
+	if err != nil {
+		return sdk.Specification{}, err
+	}
+
+	switch specResp := resp.Response.(type) {
+	case *processorv1.CommandResponse_Specify:
+		return p.protoconv.specification(specResp.Specify), nil
+	default:
+		// report the type of the response payload, resp itself is always a
+		// *processorv1.CommandResponse
+		return sdk.Specification{}, fmt.Errorf("unexpected response type: %T", resp.Response)
+	}
+}
+
+// Configure sends the configure command with the supplied parameters to the
+// WASM module. It returns an error if the command fails or if the module
+// answers with a response of another type.
+func (p *wasmProcessor) Configure(ctx context.Context, config map[string]string) error {
+	req := &processorv1.CommandRequest{
+		Request: &processorv1.CommandRequest_Configure{
+			Configure: &processorv1.Configure_Request{
+				Parameters: config,
+			},
+		},
+	}
+
+	resp, err := p.executeCommand(ctx, req)
+	if err != nil {
+		return err
+	}
+
+	switch resp.Response.(type) {
+	case *processorv1.CommandResponse_Configure:
+		return nil
+	default:
+		return fmt.Errorf("unexpected response type: %T", resp.Response)
+	}
+}
+
+// Open sends the open command to the WASM module. It returns an error if the
+// command fails or if the module answers with a response of another type.
+func (p *wasmProcessor) Open(ctx context.Context) error {
+	req := &processorv1.CommandRequest{
+		Request: &processorv1.CommandRequest_Open{
+			Open: &processorv1.Open_Request{},
+		},
+	}
+
+	resp, err := p.executeCommand(ctx, req)
+	if err != nil {
+		return err
+	}
+
+	switch resp.Response.(type) {
+	case *processorv1.CommandResponse_Open:
+		return nil
+	default:
+		return fmt.Errorf("unexpected response type: %T", resp.Response)
+	}
+}
+
+// Process converts the records to protobuf, sends them to the WASM module in a
+// process command and returns the converted processed records. Any failure
+// (conversion, command execution, unexpected response) is reported as a slice
+// containing a single sdk.ErrorRecord.
+func (p *wasmProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord {
+	protoRecords, err := p.protoconv.records(records)
+	if err != nil {
+		p.logger.Err(ctx, err).Msg("failed to convert records to proto")
+		return []sdk.ProcessedRecord{sdk.ErrorRecord{Error: err}}
+	}
+
+	req := &processorv1.CommandRequest{
+		Request: &processorv1.CommandRequest_Process{
+			Process: &processorv1.Process_Request{
+				Records: protoRecords,
+			},
+		},
+	}
+
+	resp, err := p.executeCommand(ctx, req)
+	if err != nil {
+		return []sdk.ProcessedRecord{sdk.ErrorRecord{Error: err}}
+	}
+
+	switch procResp := resp.Response.(type) {
+	case *processorv1.CommandResponse_Process:
+		// use the getter, the plugin might send a response without a payload
+		processedRecords, err := p.protoconv.processedRecords(procResp.Process.GetRecords())
+		if err != nil {
+			p.logger.Err(ctx, err).Msg("failed to convert processed records from proto")
+			return []sdk.ProcessedRecord{sdk.ErrorRecord{Error: err}}
+		}
+		return processedRecords
+	default:
+		err := fmt.Errorf("unexpected response type: %T", resp.Response)
+		return []sdk.ProcessedRecord{sdk.ErrorRecord{Error: err}}
+	}
+}
+
+// Teardown sends the teardown command to the WASM module and then closes the
+// module, regardless of the outcome of the command. The errors of both steps
+// are joined. Teardown must be called at most once, as it closes the command
+// requests channel.
+func (p *wasmProcessor) Teardown(ctx context.Context) error {
+	// TODO: we should probably have a timeout for the teardown command in case
+	//  the plugin is stuck
+	teardownErr := p.executeTeardownCommand(ctx)
+	// close module regardless of teardown error
+	stopErr := p.closeModule(ctx)
+
+	return cerrors.Join(teardownErr, stopErr)
+}
+
+// executeTeardownCommand sends the teardown command to the WASM module. It
+// returns an error if the command fails or if the module answers with a
+// response of another type.
+func (p *wasmProcessor) executeTeardownCommand(ctx context.Context) error {
+	req := &processorv1.CommandRequest{
+		Request: &processorv1.CommandRequest_Teardown{
+			Teardown: &processorv1.Teardown_Request{},
+		},
+	}
+
+	resp, err := p.executeCommand(ctx, req)
+	if err != nil {
+		return err
+	}
+
+	switch resp.Response.(type) {
+	case *processorv1.CommandResponse_Teardown:
+		return nil
+	default:
+		return fmt.Errorf("unexpected response type: %T", resp.Response)
+	}
+}
+
+// closeModule closes the command requests channel and waits for the module to
+// stop. If the context is done before the module stopped, the module is
+// closed with exit code 1 and ctx.Err() is returned.
+func (p *wasmProcessor) closeModule(ctx context.Context) error {
+	// Closing the command channel will send an error code to the WASM module
+	// signaling it to exit.
+	close(p.commandRequests)
+
+	select {
+	case <-ctx.Done():
+		// kill the plugin
+		p.logger.Error(ctx).Msg("context canceled while waiting for teardown, killing plugin")
+		err := p.module.CloseWithExitCode(ctx, 1)
+		if err != nil {
+			return fmt.Errorf("failed to kill processor plugin: %w", err)
+		}
+		return ctx.Err()
+	case <-p.moduleStopped:
+		return nil
+	}
+}
+
+// executeCommand sends a command request to the WASM module and waits for the
+// response. It returns the response, or an error if the response is an error.
+// If the context is canceled, it returns ctx.Err().
+func (p *wasmProcessor) executeCommand(ctx context.Context, req *processorv1.CommandRequest) (*processorv1.CommandResponse, error) {
+	// Hand the request over to the WASM module. If the module already stopped
+	// running nobody will ever read the request, so fail instead of blocking
+	// until the context is done.
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case <-p.moduleStopped:
+		return nil, cerrors.New("processor plugin stopped running")
+	case p.commandRequests <- req:
+	}
+
+	// wait for the response from the WASM module
+	var resp *processorv1.CommandResponse
+	var err error
+	select {
+	case <-ctx.Done():
+		// TODO if this happens we should probably kill the plugin, as it's
+		//  probably stuck
+		return nil, ctx.Err()
+	case <-p.moduleStopped:
+		// the module exited while processing the command, no response will come
+		return nil, cerrors.New("processor plugin stopped running before sending a response")
+	case crTuple := <-p.commandResponses:
+		resp, err = crTuple.V1, crTuple.V2
+	}
+
+	if err != nil {
+		return nil, err
+	}
+
+	// check if the response is an error
+	if errResp, ok := resp.Response.(*processorv1.CommandResponse_Error); ok {
+		return nil, p.protoconv.error(errResp.Error)
+	}
+
+	return resp, nil
+}
diff --git a/pkg/plugin/processor/standalone/wasm_processor_test.go b/pkg/plugin/processor/standalone/processor_test.go
similarity index 60%
rename from pkg/plugin/processor/standalone/wasm_processor_test.go
rename to pkg/plugin/processor/standalone/processor_test.go
index 79665e3d9..8d1b4a45c 100644
--- a/pkg/plugin/processor/standalone/wasm_processor_test.go
+++ b/pkg/plugin/processor/standalone/processor_test.go
@@ -16,44 +16,41 @@ package standalone
 
 import (
 	"context"
-	//nolint:depguard // needed to test external error
-	"errors"
 	"testing"
 
 	sdk "github.com/conduitio/conduit-processor-sdk"
+	"github.com/conduitio/conduit-processor-sdk/wasm"
+	"github.com/conduitio/conduit/pkg/foundation/log"
 	"github.com/matryer/is"
 	"github.com/rs/zerolog"
 )
 
-func TestWASMProcessor_MalformedProcessor(t *testing.T) {
-	is := is.New(t)
-	ctx := context.Background()
-
-	_, err := NewWASMProcessor(ctx, zerolog.Nop(), testPluginDir+"malformed_processor/processor.txt")
-	is.True(err != nil)
-	is.Equal(err.Error(), "failed running WASM module: failed compiling WASM module: invalid magic number")
-}
-
 func TestWASMProcessor_SpecifyError(t *testing.T) {
 	is := is.New(t)
 	ctx := context.Background()
+	logger := log.New(zerolog.New(zerolog.NewTestWriter(t)))
 
-	underTest, err := NewWASMProcessor(ctx, zerolog.Nop(), testPluginDir+"specify_error/processor.wasm")
+	r, hostModule := NewTestWazeroRuntime(ctx, t)
+	procModule, err := r.CompileModule(ctx, SpecifyError)
+	is.NoErr(err)
+
+	underTest, err := newWASMProcessor(ctx, r, procModule, hostModule, "test-processor", logger)
 	is.NoErr(err)
 
 	_, err = underTest.Specification()
-	is.Equal(err, errors.New("boom"))
+	is.Equal(err, wasm.NewError(0, "boom"))
 }
 
 func TestWASMProcessor_Specify(t *testing.T) {
 	is := is.New(t)
 	ctx := context.Background()
+	logger := log.New(zerolog.New(zerolog.NewTestWriter(t)))
 
-	underTest, err := NewWASMProcessor(
-		ctx,
-		zerolog.New(zerolog.NewTestWriter(t)),
-		testPluginDir+"simple_processor/processor.wasm",
-	)
+	r, hostModule := NewTestWazeroRuntime(ctx, t)
+	procModule, err := r.CompileModule(ctx, SimpleProcessor)
+	is.NoErr(err)
+
+	underTest, err := newWASMProcessor(ctx, r, procModule, hostModule, "test-processor", logger)
 	is.NoErr(err)
 
 	gotSpec, err := underTest.Specification()
@@ -85,3 +82,21 @@ func TestWASMProcessor_Specify(t *testing.T) {
 
 	is.NoErr(underTest.Teardown(ctx))
 }
+
+func TestWASMProcessor_Configure(t *testing.T) {
+	is := is.New(t)
+	ctx := context.Background()
+	logger := log.New(zerolog.New(zerolog.NewTestWriter(t)))
+
+	r, hostModule := NewTestWazeroRuntime(ctx, t)
+	procModule, err := r.CompileModule(ctx, SimpleProcessor)
+	is.NoErr(err)
+
+	underTest, err := newWASMProcessor(ctx, r, procModule, hostModule, "test-processor", logger)
+	is.NoErr(err)
+
+	err = underTest.Configure(ctx, nil)
+	is.NoErr(err)
+
+	is.NoErr(underTest.Teardown(ctx))
+}
diff --git a/pkg/plugin/processor/standalone/proto.go b/pkg/plugin/processor/standalone/proto.go
new file mode 100644
index 000000000..dc75c005e
--- /dev/null
+++ b/pkg/plugin/processor/standalone/proto.go
@@ -0,0 +1,153 @@
+// Copyright © 2024 Meroxa, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package standalone
+
+import (
+	"fmt"
+
+	"github.com/conduitio/conduit-commons/opencdc"
+	opencdcv1 "github.com/conduitio/conduit-commons/proto/opencdc/v1"
+	sdk "github.com/conduitio/conduit-processor-sdk"
+	processorv1 "github.com/conduitio/conduit-processor-sdk/proto/processor/v1"
+	"github.com/conduitio/conduit-processor-sdk/wasm"
+)
+
+// protoConverter converts between the SDK and protobuf types. The protobuf
+// messages are produced by the plugin, so fields are read through the
+// generated getters, which tolerate nil messages.
+type protoConverter struct{}
+
+// specification converts a specify response into an sdk.Specification. A nil
+// response results in an empty specification.
+func (c protoConverter) specification(resp *processorv1.Specify_Response) sdk.Specification {
+	return sdk.Specification{
+		Name:        resp.GetName(),
+		Summary:     resp.GetSummary(),
+		Description: resp.GetDescription(),
+		Version:     resp.GetVersion(),
+		Author:      resp.GetAuthor(),
+		Parameters:  c.specificationParameters(resp.GetParameters()),
+	}
+}
+
+// specificationParameters converts the parameters of a specify response,
+// keyed by parameter name. A nil map is converted to a nil map.
+func (c protoConverter) specificationParameters(in map[string]*processorv1.Specify_Parameter) map[string]sdk.Parameter {
+	if in == nil {
+		return nil
+	}
+
+	out := make(map[string]sdk.Parameter, len(in))
+	for name, param := range in {
+		out[name] = sdk.Parameter{
+			Default:     param.GetDefault(),
+			Type:        sdk.ParameterType(param.GetType()),
+			Description: param.GetDescription(),
+			Validations: c.specificationParameterValidations(param.GetValidations()),
+		}
+	}
+
+	return out
+}
+
+// specificationParameterValidations converts the validations of a parameter.
+// A nil slice is converted to a nil slice.
+func (c protoConverter) specificationParameterValidations(in []*processorv1.Specify_Parameter_Validation) []sdk.Validation {
+	if in == nil {
+		return nil
+	}
+
+	out := make([]sdk.Validation, len(in))
+	for i, v := range in {
+		out[i] = sdk.Validation{
+			Type:  sdk.ValidationType(v.GetType()),
+			Value: v.GetValue(),
+		}
+	}
+
+	return out
+}
+
+// records converts OpenCDC records to their protobuf representation. It
+// returns the first conversion error it encounters. A nil slice is converted
+// to a nil slice.
+func (c protoConverter) records(in []opencdc.Record) ([]*opencdcv1.Record, error) {
+	if in == nil {
+		return nil, nil
+	}
+
+	out := make([]*opencdcv1.Record, len(in))
+	for i, r := range in {
+		out[i] = &opencdcv1.Record{}
+		err := r.ToProto(out[i])
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return out, nil
+}
+
+// processedRecords converts the processed records of a process response. It
+// returns the first conversion error it encounters. A nil slice is converted
+// to a nil slice.
+func (c protoConverter) processedRecords(in []*processorv1.Process_ProcessedRecord) ([]sdk.ProcessedRecord, error) {
+	if in == nil {
+		return nil, nil
+	}
+
+	out := make([]sdk.ProcessedRecord, len(in))
+	var err error
+	for i, r := range in {
+		out[i], err = c.processedRecord(r)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return out, nil
+}
+
+// processedRecord converts a single processed record based on the type of its
+// payload. A nil record, or a record without a payload, is converted to nil.
+// An unknown payload type results in an error.
+func (c protoConverter) processedRecord(in *processorv1.Process_ProcessedRecord) (sdk.ProcessedRecord, error) {
+	if in == nil || in.Record == nil {
+		return nil, nil
+	}
+
+	switch v := in.Record.(type) {
+	case *processorv1.Process_ProcessedRecord_SingleRecord:
+		return c.singleRecord(v)
+	case *processorv1.Process_ProcessedRecord_FilterRecord:
+		return c.filterRecord(v)
+	case *processorv1.Process_ProcessedRecord_ErrorRecord:
+		return c.errorRecord(v)
+	default:
+		return nil, fmt.Errorf("unknown processed record type: %T", in.Record)
+	}
+}
+
+// singleRecord converts a single record payload into an sdk.SingleRecord. A
+// nil payload is converted to an empty record.
+func (c protoConverter) singleRecord(in *processorv1.Process_ProcessedRecord_SingleRecord) (sdk.SingleRecord, error) {
+	if in == nil {
+		return sdk.SingleRecord{}, nil
+	}
+
+	var rec opencdc.Record
+	err := rec.FromProto(in.SingleRecord)
+	if err != nil {
+		return sdk.SingleRecord{}, err
+	}
+
+	return sdk.SingleRecord(rec), nil
+}
+
+// filterRecord converts a filter record payload, which carries no data.
+func (c protoConverter) filterRecord(_ *processorv1.Process_ProcessedRecord_FilterRecord) (sdk.FilterRecord, error) {
+	return sdk.FilterRecord{}, nil
+}
+
+// errorRecord converts an error record payload into an sdk.ErrorRecord. A
+// payload without an error is converted to an empty error record.
+func (c protoConverter) errorRecord(in *processorv1.Process_ProcessedRecord_ErrorRecord) (sdk.ErrorRecord, error) {
+	if in == nil || in.ErrorRecord.GetError() == nil {
+		return sdk.ErrorRecord{}, nil
+	}
+	return sdk.ErrorRecord{Error: c.error(in.ErrorRecord.GetError())}, nil
+}
+
+// error converts a protobuf error into a wasm error. The fields are read
+// through the getters, so a nil error sent by the plugin results in an error
+// with code 0 and an empty message instead of a nil pointer dereference.
+func (c protoConverter) error(e *processorv1.Error) error {
+	return wasm.NewError(e.GetCode(), e.GetMessage())
+}
diff --git a/pkg/plugin/processor/standalone/registry.go b/pkg/plugin/processor/standalone/registry.go
index 0e1bb7075..9e30712fc 100644
--- a/pkg/plugin/processor/standalone/registry.go
+++ b/pkg/plugin/processor/standalone/registry.go
@@ -16,12 +16,10 @@ package standalone
 
 import (
 	"context"
-	"fmt"
 
 	sdk "github.com/conduitio/conduit-processor-sdk"
 	"github.com/conduitio/conduit/pkg/foundation/log"
 	"github.com/conduitio/conduit/pkg/plugin"
-	"github.com/rs/zerolog"
 )
 
 type Registry struct {
@@ -45,24 +43,64 @@ func NewRegistry(logger log.CtxLogger, pluginDir string) *Registry {
 		Registry: plugin.NewRegistry(
 			logger,
 			pluginDir,
-			func(ctx context.Context, logger zerolog.Logger, path string) (plugin.Spec, error) {
-				p, err := NewWASMProcessor(ctx, logger, path)
-				if err != nil {
-					return nil, fmt.Errorf("failed creating a new WASM processor: %w", err)
-				}
-				defer func() {
-					err := p.Teardown(ctx)
-					if err != nil {
-						logger.Warn().Err(err).Msg("processor teardown failed")
-					}
-				}()
-
-				s, err := p.Specification()
-				if err != nil {
-					return nil, err
-				}
-				return &spec{Specification: s}, nil
+			func(ctx context.Context, logger log.CtxLogger, path string) (plugin.Spec, error) {
+				// p, err := NewWASMProcessor(ctx, logger, path)
+				// if err != nil {
+				// 	return nil, fmt.Errorf("failed creating a new WASM processor: %w", err)
+				// }
+				// defer func() {
+				// 	err := p.Teardown(ctx)
+				// 	if err != nil {
+				// 		logger.Warn(ctx).Err(err).Msg("processor teardown failed")
+				// 	}
+				// }()
+				//
+				// s, err := p.Specification()
+				// if err != nil {
+				// 	return nil, err
+				// }
+				// return &spec{Specification: s}, nil
+				panic("not implemented")
 			},
 		),
 	}
 }
+
+// r := wazero.NewRuntimeWithConfig(
+// 	ctx,
+// 
wazero.NewRuntimeConfig().WithCloseOnContextDone(true), +// ) +// defer func() { +// if err != nil { +// // If there was an error, close the runtime +// closeErr := r.Close(ctx) +// if closeErr != nil { +// logger.Err(ctx, closeErr).Msg("failed to close wazero runtime") +// } +// } +// }() +// +// _, err = wasi_snapshot_preview1.Instantiate(ctx, r) +// if err != nil { +// return nil, cerrors.Errorf("failed instantiating wasi_snapshot_preview1: %w", err) +// } + +// p.logger. +// Debug(ctx). +// Str("path", path). +// Msg("running WASM processor") +// +// wasmBytes, err := os.ReadFile(path) +// if err != nil { +// return fmt.Errorf("failed reading WASM file %s: %w", path, err) +// } +// +// // Compiling a module helps check for some errors early on +// p.logger. +// Debug(ctx). +// Str("path", path). +// Msg("compiling module") +// mod, err := p.runtime.CompileModule(ctx, wasmBytes) +// if err != nil { +// return fmt.Errorf("failed compiling WASM module: %w", err) +// } diff --git a/pkg/plugin/processor/standalone/standalone_test.go b/pkg/plugin/processor/standalone/standalone_test.go index f2aa01693..7455a2576 100644 --- a/pkg/plugin/processor/standalone/standalone_test.go +++ b/pkg/plugin/processor/standalone/standalone_test.go @@ -15,25 +15,67 @@ package standalone import ( + "context" "fmt" "os" "os/exec" "testing" + + "github.com/matryer/is" + "github.com/stealthrocket/wazergo" + "github.com/tetratelabs/wazero" + "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" +) + +const ( + testPluginDir = "./test/wasm_processors/" ) -var testPluginDir = "./test/wasm_processors/" +var ( + MalformedProcessor = []byte("foobar") + SimpleProcessor []byte // contents of ./test/wasm_processors/simple_processor/processor.wasm + SpecifyError []byte // contents of ./test/wasm_processors/specify_error/processor.wasm +) func TestMain(m *testing.M) { + exitOnError := func(err error, msg string) { + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "%v: %v", msg, err) + 
os.Exit(1) + } + } + cmd := exec.Command("bash", "./test/build-test-processors.sh") cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr - // Run the command + err := cmd.Run() - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "error executing bash script: %v", err) - os.Exit(1) - } + exitOnError(err, "error executing bash script") + + SimpleProcessor, err = os.ReadFile("./test/wasm_processors/simple_processor/processor.wasm") + exitOnError(err, "error reading file ./test/wasm_processors/simple_processor/processor.wasm") + + SpecifyError, err = os.ReadFile("./test/wasm_processors/specify_error/processor.wasm") + exitOnError(err, "error reading file ./test/wasm_processors/specify_error/processor.wasm") os.Exit(m.Run()) } + +func NewTestWazeroRuntime(ctx context.Context, t *testing.T) (wazero.Runtime, *wazergo.CompiledModule[*hostModuleInstance]) { + is := is.New(t) + + r := wazero.NewRuntime(ctx) + t.Cleanup(func() { + err := r.Close(ctx) + is.NoErr(err) + }) + + _, err := wasi_snapshot_preview1.Instantiate(ctx, r) + is.NoErr(err) + + m, err := wazergo.Compile(ctx, r, hostModule) + is.NoErr(err) + + return r, m +} diff --git a/pkg/plugin/processor/standalone/test/wasm_processors/malformed_processor/processor.txt b/pkg/plugin/processor/standalone/test/wasm_processors/malformed_processor/processor.txt deleted file mode 100644 index f6ea04951..000000000 --- a/pkg/plugin/processor/standalone/test/wasm_processors/malformed_processor/processor.txt +++ /dev/null @@ -1 +0,0 @@ -foobar \ No newline at end of file diff --git a/pkg/plugin/processor/standalone/test/wasm_processors/simple_processor/processor.go b/pkg/plugin/processor/standalone/test/wasm_processors/simple_processor/processor.go index 3ff71025d..7b2efbd97 100644 --- a/pkg/plugin/processor/standalone/test/wasm_processors/simple_processor/processor.go +++ b/pkg/plugin/processor/standalone/test/wasm_processors/simple_processor/processor.go @@ -12,21 +12,25 @@ // See the License for the specific language governing 
permissions and // limitations under the License. +//go:build wasm + package main import ( "context" + "fmt" + "time" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" - "github.com/conduitio/conduit-processor-sdk/run" ) func main() { - run.Run(&testProcessor{}) + sdk.Run(&testProcessor{}) } type testProcessor struct { + sdk.UnimplementedProcessor } func (p *testProcessor) Specification() (sdk.Specification, error) { @@ -52,9 +56,11 @@ func (p *testProcessor) Specification() (sdk.Specification, error) { }, nil } -func (p *testProcessor) Configure(context.Context, map[string]string) error { - // TODO implement me - panic("implement me") +func (p *testProcessor) Configure(ctx context.Context, _ map[string]string) error { + fmt.Println("calling log!") + time.Sleep(time.Second) + sdk.Logger(ctx).Warn().Str("foo", "bar").Msg("hello world") + return nil } func (p *testProcessor) Open(context.Context) error { diff --git a/pkg/plugin/processor/standalone/test/wasm_processors/specify_error/processor.go b/pkg/plugin/processor/standalone/test/wasm_processors/specify_error/processor.go index 0f80f95e3..5329666d2 100644 --- a/pkg/plugin/processor/standalone/test/wasm_processors/specify_error/processor.go +++ b/pkg/plugin/processor/standalone/test/wasm_processors/specify_error/processor.go @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+//go:build wasm + package main import ( @@ -19,15 +21,15 @@ import ( "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" - "github.com/conduitio/conduit-processor-sdk/run" "github.com/conduitio/conduit/pkg/foundation/cerrors" ) func main() { - run.Run(&testProcessor{}) + sdk.Run(&testProcessor{}) } type testProcessor struct { + sdk.UnimplementedProcessor } func (p *testProcessor) Specification() (sdk.Specification, error) { diff --git a/pkg/plugin/processor/standalone/wasm_processor.go b/pkg/plugin/processor/standalone/wasm_processor.go deleted file mode 100644 index 8d6aa7af6..000000000 --- a/pkg/plugin/processor/standalone/wasm_processor.go +++ /dev/null @@ -1,266 +0,0 @@ -// Copyright © 2023 Meroxa, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package standalone - -import ( - "context" - "fmt" - "os" - - "github.com/conduitio/conduit-commons/opencdc" - sdk "github.com/conduitio/conduit-processor-sdk" - "github.com/conduitio/conduit-processor-sdk/serde" - "github.com/conduitio/conduit/pkg/foundation/cerrors" - "github.com/rs/zerolog" - "github.com/tetratelabs/wazero" - "github.com/tetratelabs/wazero/api" - "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" -) - -type wasmProcessor struct { - logger zerolog.Logger - - runtime wazero.Runtime - runtimeCancel context.CancelFunc - - // commandCh is used to send commands to - // the actual processor (the WASM module) - // and through exported host functions. - commandCh chan sdk.Command - // replyErr is used to communicate replies between - // the actual processor (the WASM module) and wasmProcessor - replyCh chan sdk.CommandResponse - // replyErr is used to communicate errors between - // the actual processor (the WASM module) and wasmProcessor - replyErr chan error - // runModStopped is used to know when the run module stopped - runModStopped chan struct{} -} - -func NewWASMProcessor(ctx context.Context, logger zerolog.Logger, wasmPath string) (sdk.Processor, error) { - runtimeCtx, runtimeCancel := context.WithCancel(ctx) - r := wazero.NewRuntimeWithConfig( - runtimeCtx, - wazero.NewRuntimeConfig().WithCloseOnContextDone(true), - ) - _, err := wasi_snapshot_preview1.Instantiate(ctx, r) - if err != nil { - runtimeCancel() - return nil, cerrors.Errorf("failed instantiating wasi_snapshot_preview1: %w", err) - } - - p := &wasmProcessor{ - logger: logger, - runtime: r, - runtimeCancel: runtimeCancel, - commandCh: make(chan sdk.Command), - replyCh: make(chan sdk.CommandResponse), - replyErr: make(chan error), - runModStopped: make(chan struct{}), - } - - err = p.exportFunctions(ctx) - if err != nil { - runtimeCancel() - return nil, fmt.Errorf("failed exporting processor functions: %w", err) - } - - err = p.run(ctx, wasmPath) - if err != nil { - 
runtimeCancel() - return nil, fmt.Errorf("failed running WASM module: %w", err) - } - - return p, nil -} - -func (p *wasmProcessor) exportFunctions(ctx context.Context) error { - // Build a host module, called `env`, which will expose - // functions which WASM processor module can use, - envBuilder := p.runtime.NewHostModuleBuilder("env") - envBuilder. - NewFunctionBuilder(). - WithFunc(p.nextCommand). - Export("nextCommand") - envBuilder. - NewFunctionBuilder(). - WithFunc(p.reply). - Export("reply") - - _, err := envBuilder.Instantiate(ctx) - return err -} - -func (p *wasmProcessor) nextCommand(ctx context.Context, m api.Module, ptr, allocSize uint32) uint32 { - p.logger.Trace().Msg("getting next command") - - cmd, ok := <-p.commandCh - if !ok { - p.logger.Info().Msg("command channel closed") - return sdk.ErrCodeNoMoreCommands - } - - bytes, err := serde.MarshalCommand(cmd) - if err != nil { - p.logger.Err(err).Msg("failed marshaling command") - p.replyErr <- fmt.Errorf("failed marshaling command: %w", err) - - return sdk.ErrCodeFailedGettingCommand - } - - return p.write(ctx, m, ptr, allocSize, bytes) -} - -func (p *wasmProcessor) write(_ context.Context, mod api.Module, ptr uint32, sizeAllocated uint32, bytes []byte) uint32 { - p.logger.Trace(). - Int("total_bytes", len(bytes)). - Uint32("allocated_size", sizeAllocated). - Str("module_name", mod.Name()). - Msgf("writing command to module memory") - - if sizeAllocated < uint32(len(bytes)) { - p.logger.Error(). - Int("total_bytes", len(bytes)). - Uint32("allocated_size", sizeAllocated). - Str("module_name", mod.Name()). - Msgf("insufficient memory") - - p.replyErr <- fmt.Errorf( - "insufficient memory allocated for reply, needed %v, allocated %v", - len(bytes), - sizeAllocated, - ) - return sdk.ErrCodeInsufficientSize - } - - bytesWritten := uint64(len(bytes)) - // The pointer is a linear memory offset, which is where we write the name. - if !mod.Memory().Write(ptr, bytes) { - p.logger.Error(). 
- Uint32("pointer", ptr). - Int("total_bytes", len(bytes)). - Uint32("wasm_module_mem_size", mod.Memory().Size()). - Msg("WASM module memory write is out of range") - - p.replyErr <- cerrors.New("WASM module memory write is out of range") - return sdk.ErrCodeMemoryOutOfRange - } - - p.logger.Trace().Msgf("bytes written: %v", bytesWritten) - return uint32(bytesWritten) -} - -func (p *wasmProcessor) reply(_ context.Context, m api.Module, ptr, size uint32) { - bytes, b := m.Memory().Read(ptr, size) - if !b { - p.logger.Error().Msg("failed reading reply") - p.replyErr <- cerrors.New("failed reading reply") - return - } - - cr, err := serde.UnmarshalCommandResponse(bytes) - if err != nil { - p.replyErr <- fmt.Errorf("failed deserializing command response: %w", err) - return - } - - p.replyCh <- cr -} - -// run instantiates a new module from the given path. -// Blocks as long as the module's start function is running. -func (p *wasmProcessor) run(ctx context.Context, path string) error { - p.logger. - Debug(). - Str("path", path). - Msg("running WASM processor") - - wasmBytes, err := os.ReadFile(path) - if err != nil { - return fmt.Errorf("failed reading WASM file %s: %w", path, err) - } - - // Compiling a module helps check for some errors early on - p.logger. - Debug(). - Str("path", path). - Msg("compiling module") - mod, err := p.runtime.CompileModule(ctx, wasmBytes) - if err != nil { - return fmt.Errorf("failed compiling WASM module: %w", err) - } - - // Needs to run in a goroutine because - // the WASM module is blocking as long as - // the "main" function is running - go func() { - p.logger.Debug().Msg("instantiating module") - _, err = p.runtime.InstantiateModule( - ctx, - mod, - wazero.NewModuleConfig(). - WithName("run-module"). - WithStdout(p.logger). 
- WithStderr(p.logger), - ) - p.runModStopped <- struct{}{} - if err != nil { - p.logger.Err(err).Msg("WASM module not instantiated or stopped") - } - }() - - return nil -} - -func (p *wasmProcessor) Specification() (sdk.Specification, error) { - p.commandCh <- &sdk.SpecifyCmd{} - - select { - case cr := <-p.replyCh: - resp, ok := cr.(*sdk.SpecifyResponse) - if !ok { - return sdk.Specification{}, fmt.Errorf("unexpected response type: %T", cr) - } - - return resp.Specification, resp.Error() - case err := <-p.replyErr: - return sdk.Specification{}, fmt.Errorf("reply error: %w", err) - } -} - -func (p *wasmProcessor) Configure(context.Context, map[string]string) error { - // TODO implement me - panic("implement me") -} - -func (p *wasmProcessor) Open(context.Context) error { - // TODO implement me - panic("implement me") -} - -func (p *wasmProcessor) Process(context.Context, []opencdc.Record) []sdk.ProcessedRecord { - // TODO implement me - panic("implement me") -} - -func (p *wasmProcessor) Teardown(context.Context) error { - // Closing the command channel will send ErrCodeFailedGettingCommand - // to the WASM module, which will exit. - // TODO handle case when the WASM module is unresponsive. - close(p.commandCh) - <-p.runModStopped - p.runtimeCancel() - return nil -} diff --git a/pkg/plugin/registry.go b/pkg/plugin/registry.go index 0339a2d75..fa8e4bbcf 100644 --- a/pkg/plugin/registry.go +++ b/pkg/plugin/registry.go @@ -23,7 +23,6 @@ import ( "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/log" - "github.com/rs/zerolog" ) type Spec interface { @@ -31,7 +30,7 @@ type Spec interface { GetVersion() string } -type GetSpecFn func(ctx context.Context, logger zerolog.Logger, path string) (Spec, error) +type GetSpecFn func(ctx context.Context, logger log.CtxLogger, path string) (Spec, error) // Registry is a generic directory registry of plugins, // organized by plugin type, name and version. 
@@ -124,7 +123,7 @@ func (r *Registry) loadPlugins(ctx context.Context, pluginDir string) map[string pluginPath := path.Join(pluginDir, dirEntry.Name()) // create dispenser without a logger to not spam logs on refresh - specs, err := r.getSpecs(ctx, zerolog.Nop(), pluginPath) + specs, err := r.getSpecs(ctx, r.logger, pluginPath) if err != nil { err = cerrors.Errorf("failed to get specs: %w", err) warn(ctx, err, pluginPath)