Skip to content

Commit

Permalink
BP: WASM: Load available WASM processors (#1322)
Browse files Browse the repository at this point in the history

Co-authored-by: Maha Hajja <[email protected]>

---------

Co-authored-by: Lovro Mažgon <[email protected]>
Co-authored-by: Maha Hajja <[email protected]>
  • Loading branch information
3 people authored Jan 26, 2024
1 parent 7782b08 commit 7b3087e
Show file tree
Hide file tree
Showing 28 changed files with 2,234 additions and 40 deletions.
4 changes: 1 addition & 3 deletions .github/workflows/validate-generated-files.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,5 @@ jobs:
- name: Check generated files
run: |
export PATH=$PATH:$(go env GOPATH)/bin
make install-tools
make generate
make proto-generate
make install-tools generate proto-generate
git diff --exit-code --numstat
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,6 @@ escape_analysis.txt

# Profiles
*.prof

# Compiled test wasm processors
pkg/plugin/processor/standalone/test/wasm_processors/*/processor.wasm
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ install-tools: download
@go mod tidy

generate:
go generate ./...
go generate -x ./...

pkg/web/ui/dist:
make ui-dist
Expand Down
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/conduitio/conduit

go 1.21.1
go 1.21.5

require (
buf.build/gen/go/grpc-ecosystem/grpc-gateway/protocolbuffers/go v1.32.0-20231027202514-3f42134f4c56.1
Expand All @@ -17,6 +17,7 @@ require (
github.com/conduitio/conduit-connector-protocol v0.5.1-0.20240104160905-e9e61586fb8d
github.com/conduitio/conduit-connector-s3 v0.5.1
github.com/conduitio/conduit-connector-sdk v0.8.0
github.com/conduitio/conduit-processor-sdk v0.0.0-20240118151737-a75ea9e86bb8
github.com/conduitio/yaml/v3 v3.3.0
github.com/dgraph-io/badger/v4 v4.2.0
github.com/dop251/goja v0.0.0-20230531210528-d7324b2d74f7
Expand All @@ -42,6 +43,8 @@ require (
github.com/prometheus/client_model v0.5.0
github.com/prometheus/common v0.45.0
github.com/rs/zerolog v1.31.0
github.com/stealthrocket/wazergo v0.19.1
github.com/tetratelabs/wazero v1.5.0
github.com/twmb/go-cache v1.2.0
go.uber.org/goleak v1.3.0
go.uber.org/mock v0.4.0
Expand Down Expand Up @@ -296,7 +299,6 @@ require (
github.com/t-yuki/gocover-cobertura v0.0.0-20180217150009-aaee18c8195c // indirect
github.com/tdakkota/asciicheck v0.2.0 // indirect
github.com/tetafro/godot v1.4.15 // indirect
github.com/tetratelabs/wazero v1.5.0 // indirect
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 // indirect
github.com/timonwong/loggercheck v0.9.4 // indirect
github.com/tomarrell/wrapcheck/v2 v2.8.1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,8 @@ github.com/conduitio/conduit-connector-s3 v0.5.1 h1:yRo8004ryCIZc/S3iWQ1rN6pm6bj
github.com/conduitio/conduit-connector-s3 v0.5.1/go.mod h1:nbxzsyS95gbFJ28Job9vFFB+byRFINSv70/13Yi4mKQ=
github.com/conduitio/conduit-connector-sdk v0.8.0 h1:gvchqoj5d3AQsBoIosx4i32L8Ex9+5BuAyHi/IM9VD4=
github.com/conduitio/conduit-connector-sdk v0.8.0/go.mod h1:nOz4K3X6fD8YMe5CPbULwSEE18Eu02ZrpT6o6KwQfxs=
github.com/conduitio/conduit-processor-sdk v0.0.0-20240118151737-a75ea9e86bb8 h1:H6Px/c38KiId1XDsb4agp25wOlMsZM2rp4p2kxlHDKM=
github.com/conduitio/conduit-processor-sdk v0.0.0-20240118151737-a75ea9e86bb8/go.mod h1:k0rpE3kOAyDcIsBbS5vMO035XzDGW9FJsC4sgEXCH8Y=
github.com/conduitio/yaml/v3 v3.3.0 h1:kbbaOSHcuH39gP4+rgbJGl6DSbLZcJgEaBvkEXJlCsI=
github.com/conduitio/yaml/v3 v3.3.0/go.mod h1:JNgFMOX1t8W4YJuRZOh6GggVtSMsgP9XgTw+7dIenpc=
github.com/containerd/stargz-snapshotter/estargz v0.15.1 h1:eXJjw9RbkLFgioVaTG+G/ZW/0kEe2oEKCdS/ZxIyoCU=
Expand Down Expand Up @@ -1957,6 +1959,8 @@ github.com/ssgreg/nlreturn/v2 v2.2.1 h1:X4XDI7jstt3ySqGU86YGAURbxw3oTDPK9sPEi6YE
github.com/ssgreg/nlreturn/v2 v2.2.1/go.mod h1:E/iiPB78hV7Szg2YfRgyIrk1AD6JVMTRkkxBiELzh2I=
github.com/stbenjam/no-sprintf-host-port v0.1.1 h1:tYugd/yrm1O0dV+ThCbaKZh195Dfm07ysF0U6JQXczc=
github.com/stbenjam/no-sprintf-host-port v0.1.1/go.mod h1:TLhvtIvONRzdmkFiio4O8LHsN9N74I+PhRquPsxpL0I=
github.com/stealthrocket/wazergo v0.19.1 h1:BPrITETPgSFwiytwmToO0MbUC/+RGC39JScz1JmmG6c=
github.com/stealthrocket/wazergo v0.19.1/go.mod h1:riI0hxw4ndZA5e6z7PesHg2BtTftcZaMxRcoiGGipTs=
github.com/stoewer/go-strcase v1.3.0 h1:g0eASXYtp+yvN9fK8sH94oCIk0fau9uV1/ZdJ0AVEzs=
github.com/stoewer/go-strcase v1.3.0/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8wodgtPmh1xo=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
1 change: 1 addition & 0 deletions pkg/foundation/cerrors/cerrors.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var (
Is = errors.Is
As = errors.As
Unwrap = errors.Unwrap
Join = errors.Join
)

type Frame struct {
Expand Down
22 changes: 22 additions & 0 deletions pkg/foundation/log/ctxlogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ package log

import (
"context"
"reflect"
"strings"
"testing"

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -46,6 +49,11 @@ func Nop() CtxLogger {
return CtxLogger{Logger: zerolog.Nop()}
}

// Test returns a test logger that writes to the supplied testing.TB.
func Test(t testing.TB) CtxLogger {
return CtxLogger{Logger: zerolog.New(zerolog.NewTestWriter(t))}
}

// InitLogger returns a logger initialized with the wanted level and format
func InitLogger(level zerolog.Level, f Format) CtxLogger {
var w = GetWriter(f)
Expand All @@ -67,6 +75,20 @@ func (l CtxLogger) WithComponent(component string) CtxLogger {
return l
}

func (l CtxLogger) WithComponentFromType(c any) CtxLogger {
cType := reflect.TypeOf(c)
for cType.Kind() == reflect.Ptr || cType.Kind() == reflect.Interface {
cType = cType.Elem()
}

pkgPath := cType.PkgPath()
pkgPath = strings.TrimPrefix(pkgPath, "github.com/conduitio/conduit/pkg/")
pkgPath = strings.ReplaceAll(pkgPath, "/", ".")
typeName := cType.Name()
l.component = pkgPath + "." + typeName
return l
}

func (l CtxLogger) Component() string {
return l.component
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/foundation/log/ctxlogger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,20 @@ func TestCtxLoggerComponent(t *testing.T) {
is.Equal(`{"level":"info","component":"test","message":"testing component"}`+"\n", got)
}

type testComponent struct{}

func TestCtxLoggerComponentFromType(t *testing.T) {
is := is.New(t)

logger := New(zerolog.New(zerolog.NewTestWriter(t)))

logger = logger.WithComponentFromType(testComponent{})
is.Equal("foundation.log.testComponent", logger.Component())

logger = logger.WithComponentFromType(&testComponent{})
is.Equal("foundation.log.testComponent", logger.Component())
}

func TestCtxLoggerWithoutHooks(t *testing.T) {
ctx := context.Background()

Expand Down
1 change: 1 addition & 0 deletions pkg/foundation/log/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
NodeIDField = "node_id"
ParallelWorkerIDField = "parallel_worker_id"
PipelineIDField = "pipeline_id"
ProcessorIDField = "processor_id"
RecordPositionField = "record_position"
RequestIDField = "request_id"
ServerAddressField = "address"
Expand Down
2 changes: 1 addition & 1 deletion pkg/plugin/connector/builtin/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func NewDispenserFactory(conn sdk.Connector) DispenserFactory {
}

func NewRegistry(logger log.CtxLogger, factories map[string]DispenserFactory) *Registry {
logger = logger.WithComponent("builtin.Registry")
logger = logger.WithComponentFromType(Registry{})
buildInfo, ok := debug.ReadBuildInfo()
if !ok {
// we are using modules, build info should always be available, we are staying on the safe side
Expand Down
64 changes: 32 additions & 32 deletions pkg/plugin/connector/standalone/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ type blueprint struct {

func NewRegistry(logger log.CtxLogger, pluginDir string) *Registry {
r := &Registry{
logger: logger.WithComponent("standalone.Registry"),
logger: logger.WithComponentFromType(Registry{}),
}

if pluginDir != "" {
// extract absolute path to make it clearer in the logs what directory is used
absPluginDir, err := filepath.Abs(pluginDir)
if err != nil {
r.logger.Warn(context.Background()).Err(err).Msg("could not extract absolute plugins path")
r.logger.Warn(context.Background()).Err(err).Msg("could not extract absolute connector plugins path")
} else {
r.pluginDir = absPluginDir // store plugin dir for hot reloads
r.reloadPlugins()
Expand All @@ -67,15 +67,11 @@ func NewRegistry(logger log.CtxLogger, pluginDir string) *Registry {
r.logger.Info(context.Background()).
Str(log.PluginPathField, r.pluginDir).
Int("count", len(r.List())).
Msg("standalone plugins initialized")
Msg("standalone connector plugins initialized")

return r
}

func newFullName(pluginName, pluginVersion string) plugin.FullName {
return plugin.NewFullName(plugin.PluginTypeStandalone, pluginName, pluginVersion)
}

func (r *Registry) reloadPlugins() {
plugins := r.loadPlugins(context.Background(), r.pluginDir)
r.m.Lock()
Expand All @@ -84,19 +80,19 @@ func (r *Registry) reloadPlugins() {
}

func (r *Registry) loadPlugins(ctx context.Context, pluginDir string) map[string]map[string]blueprint {
r.logger.Debug(ctx).Msgf("loading plugins from directory %v", pluginDir)
r.logger.Debug(ctx).Msgf("loading connector plugins from directory %v", pluginDir)
plugins := make(map[string]map[string]blueprint)

dirEntries, err := os.ReadDir(pluginDir)
if err != nil {
r.logger.Warn(ctx).Err(err).Msg("could not read plugin directory")
r.logger.Warn(ctx).Err(err).Msg("could not read connector plugin directory")
return plugins // return empty map
}
warn := func(ctx context.Context, err error, pluginPath string) {
r.logger.Warn(ctx).
Err(err).
Str(log.PluginPathField, pluginPath).
Msgf("could not load standalone plugin")
Msgf("could not load standalone connector plugin")
}

for _, dirEntry := range dirEntries {
Expand All @@ -107,24 +103,8 @@ func (r *Registry) loadPlugins(ctx context.Context, pluginDir string) map[string

pluginPath := path.Join(pluginDir, dirEntry.Name())

// create dispenser without a logger to not spam logs on refresh
dispenser, err := standalonev1.NewDispenser(zerolog.Nop(), pluginPath)
if err != nil {
err = cerrors.Errorf("failed to create dispenser: %w", err)
warn(ctx, err, pluginPath)
continue
}

specPlugin, err := dispenser.DispenseSpecifier()
if err != nil {
err = cerrors.Errorf("failed to dispense specifier (tip: check if the file is a valid plugin binary and if you have permissions for running it): %w", err)
warn(ctx, err, pluginPath)
continue
}

specs, err := specPlugin.Specify()
specs, err := r.loadSpecifications(pluginPath)
if err != nil {
err = cerrors.Errorf("failed to get specs: %w", err)
warn(ctx, err, pluginPath)
continue
}
Expand All @@ -135,9 +115,9 @@ func (r *Registry) loadPlugins(ctx context.Context, pluginDir string) map[string
plugins[specs.Name] = versionMap
}

fullName := newFullName(specs.Name, specs.Version)
fullName := plugin.NewFullName(plugin.PluginTypeStandalone, specs.Name, specs.Version)
if conflict, ok := versionMap[specs.Version]; ok {
err = cerrors.Errorf("conflict detected, plugin %v already registered, please remove either %v or %v, these plugins won't be usable until that happens", fullName, conflict.path, pluginPath)
err = cerrors.Errorf("conflict detected, connector plugin %v already registered, please remove either %v or %v, these plugins won't be usable until that happens", fullName, conflict.path, pluginPath)
warn(ctx, err, pluginPath)
// delete plugin from map at the end so that further duplicates can
// still be found
Expand All @@ -163,18 +143,38 @@ func (r *Registry) loadPlugins(ctx context.Context, pluginDir string) map[string
r.logger.Debug(ctx).
Str(log.PluginPathField, pluginPath).
Str(log.PluginNameField, string(bp.fullName)).
Msg("set plugin as latest")
Msg("set connector plugin as latest")
}

r.logger.Debug(ctx).
Str(log.PluginPathField, pluginPath).
Str(log.PluginNameField, string(bp.fullName)).
Msg("loaded standalone plugin")
Msg("loaded standalone connector plugin")
}

return plugins
}

func (r *Registry) loadSpecifications(pluginPath string) (connector.Specification, error) {
// create dispenser without a logger to not spam logs on refresh
dispenser, err := standalonev1.NewDispenser(zerolog.Nop(), pluginPath)
if err != nil {
return connector.Specification{}, cerrors.Errorf("failed to create connector dispenser: %w", err)
}

specPlugin, err := dispenser.DispenseSpecifier()
if err != nil {
return connector.Specification{}, cerrors.Errorf("failed to dispense connector specifier (tip: check if the file is a valid connector plugin binary and if you have permissions for running it): %w", err)
}

specs, err := specPlugin.Specify()
if err != nil {
return connector.Specification{}, cerrors.Errorf("failed to get connector specs: %w", err)
}

return specs, nil
}

func (r *Registry) NewDispenser(logger log.CtxLogger, fullName plugin.FullName) (connector.Dispenser, error) {
r.m.RLock()
defer r.m.RUnlock()
Expand All @@ -189,7 +189,7 @@ func (r *Registry) NewDispenser(logger log.CtxLogger, fullName plugin.FullName)
for k := range versionMap {
availableVersions = append(availableVersions, k)
}
return nil, cerrors.Errorf("could not find standalone plugin, only found versions %v: %w", availableVersions, plugin.ErrPluginNotFound)
return nil, cerrors.Errorf("could not find standalone connector plugin, only found versions %v: %w", availableVersions, plugin.ErrPluginNotFound)
}

return standalonev1.NewDispenser(logger.ZerologWithComponent(), bp.path)
Expand Down
Loading

0 comments on commit 7b3087e

Please sign in to comment.