Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BP: WASM: Load available WASM processors #1322

Merged
merged 19 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions .github/workflows/validate-generated-files.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,5 @@ jobs:
- name: Check generated files
run: |
export PATH=$PATH:$(go env GOPATH)/bin
make install-tools
make generate
make proto-generate
make install-tools generate proto-generate
git diff --exit-code --numstat
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,6 @@ escape_analysis.txt

# Profiles
*.prof

# Compiled test wasm processors
pkg/plugin/processor/standalone/test/wasm_processors/*/processor.wasm
lovromazgon marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ install-tools: download
@go mod tidy

generate:
go generate ./...
go generate -x ./...

pkg/web/ui/dist:
make ui-dist
Expand Down
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/conduitio/conduit

go 1.21.1
go 1.21.5

require (
buf.build/gen/go/grpc-ecosystem/grpc-gateway/protocolbuffers/go v1.32.0-20231027202514-3f42134f4c56.1
Expand All @@ -17,6 +17,7 @@ require (
github.com/conduitio/conduit-connector-protocol v0.5.1-0.20240104160905-e9e61586fb8d
github.com/conduitio/conduit-connector-s3 v0.5.1
github.com/conduitio/conduit-connector-sdk v0.8.0
github.com/conduitio/conduit-processor-sdk v0.0.0-20240118151737-a75ea9e86bb8
github.com/conduitio/yaml/v3 v3.3.0
github.com/dgraph-io/badger/v4 v4.2.0
github.com/dop251/goja v0.0.0-20230531210528-d7324b2d74f7
Expand All @@ -42,6 +43,8 @@ require (
github.com/prometheus/client_model v0.5.0
github.com/prometheus/common v0.45.0
github.com/rs/zerolog v1.31.0
github.com/stealthrocket/wazergo v0.19.1
github.com/tetratelabs/wazero v1.5.0
github.com/twmb/go-cache v1.2.0
go.uber.org/goleak v1.3.0
go.uber.org/mock v0.4.0
Expand Down Expand Up @@ -296,7 +299,6 @@ require (
github.com/t-yuki/gocover-cobertura v0.0.0-20180217150009-aaee18c8195c // indirect
github.com/tdakkota/asciicheck v0.2.0 // indirect
github.com/tetafro/godot v1.4.15 // indirect
github.com/tetratelabs/wazero v1.5.0 // indirect
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 // indirect
github.com/timonwong/loggercheck v0.9.4 // indirect
github.com/tomarrell/wrapcheck/v2 v2.8.1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,8 @@ github.com/conduitio/conduit-connector-s3 v0.5.1 h1:yRo8004ryCIZc/S3iWQ1rN6pm6bj
github.com/conduitio/conduit-connector-s3 v0.5.1/go.mod h1:nbxzsyS95gbFJ28Job9vFFB+byRFINSv70/13Yi4mKQ=
github.com/conduitio/conduit-connector-sdk v0.8.0 h1:gvchqoj5d3AQsBoIosx4i32L8Ex9+5BuAyHi/IM9VD4=
github.com/conduitio/conduit-connector-sdk v0.8.0/go.mod h1:nOz4K3X6fD8YMe5CPbULwSEE18Eu02ZrpT6o6KwQfxs=
github.com/conduitio/conduit-processor-sdk v0.0.0-20240118151737-a75ea9e86bb8 h1:H6Px/c38KiId1XDsb4agp25wOlMsZM2rp4p2kxlHDKM=
github.com/conduitio/conduit-processor-sdk v0.0.0-20240118151737-a75ea9e86bb8/go.mod h1:k0rpE3kOAyDcIsBbS5vMO035XzDGW9FJsC4sgEXCH8Y=
github.com/conduitio/yaml/v3 v3.3.0 h1:kbbaOSHcuH39gP4+rgbJGl6DSbLZcJgEaBvkEXJlCsI=
github.com/conduitio/yaml/v3 v3.3.0/go.mod h1:JNgFMOX1t8W4YJuRZOh6GggVtSMsgP9XgTw+7dIenpc=
github.com/containerd/stargz-snapshotter/estargz v0.15.1 h1:eXJjw9RbkLFgioVaTG+G/ZW/0kEe2oEKCdS/ZxIyoCU=
Expand Down Expand Up @@ -1957,6 +1959,8 @@ github.com/ssgreg/nlreturn/v2 v2.2.1 h1:X4XDI7jstt3ySqGU86YGAURbxw3oTDPK9sPEi6YE
github.com/ssgreg/nlreturn/v2 v2.2.1/go.mod h1:E/iiPB78hV7Szg2YfRgyIrk1AD6JVMTRkkxBiELzh2I=
github.com/stbenjam/no-sprintf-host-port v0.1.1 h1:tYugd/yrm1O0dV+ThCbaKZh195Dfm07ysF0U6JQXczc=
github.com/stbenjam/no-sprintf-host-port v0.1.1/go.mod h1:TLhvtIvONRzdmkFiio4O8LHsN9N74I+PhRquPsxpL0I=
github.com/stealthrocket/wazergo v0.19.1 h1:BPrITETPgSFwiytwmToO0MbUC/+RGC39JScz1JmmG6c=
github.com/stealthrocket/wazergo v0.19.1/go.mod h1:riI0hxw4ndZA5e6z7PesHg2BtTftcZaMxRcoiGGipTs=
github.com/stoewer/go-strcase v1.3.0 h1:g0eASXYtp+yvN9fK8sH94oCIk0fau9uV1/ZdJ0AVEzs=
github.com/stoewer/go-strcase v1.3.0/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8wodgtPmh1xo=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
1 change: 1 addition & 0 deletions pkg/foundation/cerrors/cerrors.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var (
Is = errors.Is
As = errors.As
Unwrap = errors.Unwrap
Join = errors.Join
)

type Frame struct {
Expand Down
22 changes: 22 additions & 0 deletions pkg/foundation/log/ctxlogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ package log

import (
"context"
"reflect"
"strings"
"testing"

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -46,6 +49,11 @@ func Nop() CtxLogger {
return CtxLogger{Logger: zerolog.Nop()}
}

// Test returns a test logger that writes to the supplied testing.TB.
func Test(t testing.TB) CtxLogger {
return CtxLogger{Logger: zerolog.New(zerolog.NewTestWriter(t))}
}

// InitLogger returns a logger initialized with the wanted level and format
func InitLogger(level zerolog.Level, f Format) CtxLogger {
var w = GetWriter(f)
Expand All @@ -67,6 +75,20 @@ func (l CtxLogger) WithComponent(component string) CtxLogger {
return l
}

func (l CtxLogger) WithComponentFromType(c any) CtxLogger {
cType := reflect.TypeOf(c)
for cType.Kind() == reflect.Ptr || cType.Kind() == reflect.Interface {
cType = cType.Elem()
}

pkgPath := cType.PkgPath()
pkgPath = strings.TrimPrefix(pkgPath, "github.com/conduitio/conduit/pkg/")
pkgPath = strings.ReplaceAll(pkgPath, "/", ".")
typeName := cType.Name()
l.component = pkgPath + "." + typeName
return l
}

func (l CtxLogger) Component() string {
return l.component
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/foundation/log/ctxlogger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,20 @@ func TestCtxLoggerComponent(t *testing.T) {
is.Equal(`{"level":"info","component":"test","message":"testing component"}`+"\n", got)
}

type testComponent struct{}

func TestCtxLoggerComponentFromType(t *testing.T) {
is := is.New(t)

logger := New(zerolog.New(zerolog.NewTestWriter(t)))

logger = logger.WithComponentFromType(testComponent{})
is.Equal("foundation.log.testComponent", logger.Component())

logger = logger.WithComponentFromType(&testComponent{})
is.Equal("foundation.log.testComponent", logger.Component())
}

func TestCtxLoggerWithoutHooks(t *testing.T) {
ctx := context.Background()

Expand Down
1 change: 1 addition & 0 deletions pkg/foundation/log/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
NodeIDField = "node_id"
ParallelWorkerIDField = "parallel_worker_id"
PipelineIDField = "pipeline_id"
ProcessorIDField = "processor_id"
RecordPositionField = "record_position"
RequestIDField = "request_id"
ServerAddressField = "address"
Expand Down
2 changes: 1 addition & 1 deletion pkg/plugin/connector/builtin/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func NewDispenserFactory(conn sdk.Connector) DispenserFactory {
}

func NewRegistry(logger log.CtxLogger, factories map[string]DispenserFactory) *Registry {
logger = logger.WithComponent("builtin.Registry")
logger = logger.WithComponentFromType(Registry{})
buildInfo, ok := debug.ReadBuildInfo()
if !ok {
// we are using modules, build info should always be available, we are staying on the safe side
Expand Down
64 changes: 32 additions & 32 deletions pkg/plugin/connector/standalone/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ type blueprint struct {

func NewRegistry(logger log.CtxLogger, pluginDir string) *Registry {
r := &Registry{
logger: logger.WithComponent("standalone.Registry"),
logger: logger.WithComponentFromType(Registry{}),
}

if pluginDir != "" {
// extract absolute path to make it clearer in the logs what directory is used
absPluginDir, err := filepath.Abs(pluginDir)
if err != nil {
r.logger.Warn(context.Background()).Err(err).Msg("could not extract absolute plugins path")
r.logger.Warn(context.Background()).Err(err).Msg("could not extract absolute connector plugins path")
} else {
r.pluginDir = absPluginDir // store plugin dir for hot reloads
r.reloadPlugins()
Expand All @@ -67,15 +67,11 @@ func NewRegistry(logger log.CtxLogger, pluginDir string) *Registry {
r.logger.Info(context.Background()).
Str(log.PluginPathField, r.pluginDir).
Int("count", len(r.List())).
Msg("standalone plugins initialized")
Msg("standalone connector plugins initialized")

return r
}

func newFullName(pluginName, pluginVersion string) plugin.FullName {
return plugin.NewFullName(plugin.PluginTypeStandalone, pluginName, pluginVersion)
}

func (r *Registry) reloadPlugins() {
plugins := r.loadPlugins(context.Background(), r.pluginDir)
r.m.Lock()
Expand All @@ -84,19 +80,19 @@ func (r *Registry) reloadPlugins() {
}

func (r *Registry) loadPlugins(ctx context.Context, pluginDir string) map[string]map[string]blueprint {
r.logger.Debug(ctx).Msgf("loading plugins from directory %v", pluginDir)
r.logger.Debug(ctx).Msgf("loading connector plugins from directory %v", pluginDir)
plugins := make(map[string]map[string]blueprint)

dirEntries, err := os.ReadDir(pluginDir)
if err != nil {
r.logger.Warn(ctx).Err(err).Msg("could not read plugin directory")
r.logger.Warn(ctx).Err(err).Msg("could not read connector plugin directory")
return plugins // return empty map
}
warn := func(ctx context.Context, err error, pluginPath string) {
r.logger.Warn(ctx).
Err(err).
Str(log.PluginPathField, pluginPath).
Msgf("could not load standalone plugin")
Msgf("could not load standalone connector plugin")
}

for _, dirEntry := range dirEntries {
Expand All @@ -107,24 +103,8 @@ func (r *Registry) loadPlugins(ctx context.Context, pluginDir string) map[string

pluginPath := path.Join(pluginDir, dirEntry.Name())

// create dispenser without a logger to not spam logs on refresh
dispenser, err := standalonev1.NewDispenser(zerolog.Nop(), pluginPath)
if err != nil {
err = cerrors.Errorf("failed to create dispenser: %w", err)
warn(ctx, err, pluginPath)
continue
}

specPlugin, err := dispenser.DispenseSpecifier()
if err != nil {
err = cerrors.Errorf("failed to dispense specifier (tip: check if the file is a valid plugin binary and if you have permissions for running it): %w", err)
warn(ctx, err, pluginPath)
continue
}

specs, err := specPlugin.Specify()
specs, err := r.loadSpecifications(pluginPath)
if err != nil {
err = cerrors.Errorf("failed to get specs: %w", err)
warn(ctx, err, pluginPath)
continue
}
Expand All @@ -135,9 +115,9 @@ func (r *Registry) loadPlugins(ctx context.Context, pluginDir string) map[string
plugins[specs.Name] = versionMap
}

fullName := newFullName(specs.Name, specs.Version)
fullName := plugin.NewFullName(plugin.PluginTypeStandalone, specs.Name, specs.Version)
if conflict, ok := versionMap[specs.Version]; ok {
err = cerrors.Errorf("conflict detected, plugin %v already registered, please remove either %v or %v, these plugins won't be usable until that happens", fullName, conflict.path, pluginPath)
err = cerrors.Errorf("conflict detected, connector plugin %v already registered, please remove either %v or %v, these plugins won't be usable until that happens", fullName, conflict.path, pluginPath)
warn(ctx, err, pluginPath)
// delete plugin from map at the end so that further duplicates can
// still be found
Expand All @@ -163,18 +143,38 @@ func (r *Registry) loadPlugins(ctx context.Context, pluginDir string) map[string
r.logger.Debug(ctx).
Str(log.PluginPathField, pluginPath).
Str(log.PluginNameField, string(bp.fullName)).
Msg("set plugin as latest")
Msg("set connector plugin as latest")
}

r.logger.Debug(ctx).
Str(log.PluginPathField, pluginPath).
Str(log.PluginNameField, string(bp.fullName)).
Msg("loaded standalone plugin")
Msg("loaded standalone connector plugin")
}

return plugins
}

func (r *Registry) loadSpecifications(pluginPath string) (connector.Specification, error) {
// create dispenser without a logger to not spam logs on refresh
dispenser, err := standalonev1.NewDispenser(zerolog.Nop(), pluginPath)
if err != nil {
return connector.Specification{}, cerrors.Errorf("failed to create connector dispenser: %w", err)
}

specPlugin, err := dispenser.DispenseSpecifier()
if err != nil {
return connector.Specification{}, cerrors.Errorf("failed to dispense connector specifier (tip: check if the file is a valid connector plugin binary and if you have permissions for running it): %w", err)
}

specs, err := specPlugin.Specify()
if err != nil {
return connector.Specification{}, cerrors.Errorf("failed to get connector specs: %w", err)
}

return specs, nil
}

func (r *Registry) NewDispenser(logger log.CtxLogger, fullName plugin.FullName) (connector.Dispenser, error) {
r.m.RLock()
defer r.m.RUnlock()
Expand All @@ -189,7 +189,7 @@ func (r *Registry) NewDispenser(logger log.CtxLogger, fullName plugin.FullName)
for k := range versionMap {
availableVersions = append(availableVersions, k)
}
return nil, cerrors.Errorf("could not find standalone plugin, only found versions %v: %w", availableVersions, plugin.ErrPluginNotFound)
return nil, cerrors.Errorf("could not find standalone connector plugin, only found versions %v: %w", availableVersions, plugin.ErrPluginNotFound)
}

return standalonev1.NewDispenser(logger.ZerologWithComponent(), bp.path)
Expand Down
Loading
Loading