Skip to content

Commit

Permalink
refactoring, logging, utilities, and much more
Browse files Browse the repository at this point in the history
  • Loading branch information
lovromazgon committed Jan 17, 2024
1 parent cb407e7 commit 843df71
Show file tree
Hide file tree
Showing 19 changed files with 1,054 additions and 324 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/conduitio/conduit-connector-protocol v0.5.0
github.com/conduitio/conduit-connector-s3 v0.5.0
github.com/conduitio/conduit-connector-sdk v0.8.0
github.com/conduitio/conduit-processor-sdk v0.0.0-20240104184508-4f61ece4b1c9
github.com/conduitio/conduit-processor-sdk v0.0.0-20240116154859-538779da04d6
github.com/conduitio/yaml/v3 v3.3.0
github.com/dgraph-io/badger/v4 v4.2.0
github.com/dop251/goja v0.0.0-20230531210528-d7324b2d74f7
Expand All @@ -44,6 +44,7 @@ require (
github.com/prometheus/client_model v0.5.0
github.com/prometheus/common v0.45.0
github.com/rs/zerolog v1.31.0
github.com/stealthrocket/wazergo v0.19.1
github.com/tetratelabs/wazero v1.5.0
github.com/twmb/go-cache v1.2.0
go.uber.org/goleak v1.3.0
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1038,8 +1038,8 @@ github.com/conduitio/conduit-connector-s3 v0.5.0 h1:nqLcf/foYnDLkXWYcWJX/5UHzTjW
github.com/conduitio/conduit-connector-s3 v0.5.0/go.mod h1:I6oE37zz25RTjnQiUBz2rOASwXNqfaMW7gSlsKX6z8E=
github.com/conduitio/conduit-connector-sdk v0.8.0 h1:gvchqoj5d3AQsBoIosx4i32L8Ex9+5BuAyHi/IM9VD4=
github.com/conduitio/conduit-connector-sdk v0.8.0/go.mod h1:nOz4K3X6fD8YMe5CPbULwSEE18Eu02ZrpT6o6KwQfxs=
github.com/conduitio/conduit-processor-sdk v0.0.0-20240104184508-4f61ece4b1c9 h1:iDtdHWgk7U99f7wVkmNEEciEeAOcy1niFjQcmu0FX1Y=
github.com/conduitio/conduit-processor-sdk v0.0.0-20240104184508-4f61ece4b1c9/go.mod h1:1ckqfqVvCYVF4rt6n2ymIzsmPYfzBYQS5kuTIFORuKg=
github.com/conduitio/conduit-processor-sdk v0.0.0-20240116154859-538779da04d6 h1:AeDSV6ScmbUByQfaLR8+SkoleQAeydDJyVf1C/OICwU=
github.com/conduitio/conduit-processor-sdk v0.0.0-20240116154859-538779da04d6/go.mod h1:k0rpE3kOAyDcIsBbS5vMO035XzDGW9FJsC4sgEXCH8Y=
github.com/conduitio/yaml/v3 v3.3.0 h1:kbbaOSHcuH39gP4+rgbJGl6DSbLZcJgEaBvkEXJlCsI=
github.com/conduitio/yaml/v3 v3.3.0/go.mod h1:JNgFMOX1t8W4YJuRZOh6GggVtSMsgP9XgTw+7dIenpc=
github.com/containerd/stargz-snapshotter/estargz v0.15.1 h1:eXJjw9RbkLFgioVaTG+G/ZW/0kEe2oEKCdS/ZxIyoCU=
Expand Down Expand Up @@ -1830,6 +1830,8 @@ github.com/ssgreg/nlreturn/v2 v2.2.1 h1:X4XDI7jstt3ySqGU86YGAURbxw3oTDPK9sPEi6YE
github.com/ssgreg/nlreturn/v2 v2.2.1/go.mod h1:E/iiPB78hV7Szg2YfRgyIrk1AD6JVMTRkkxBiELzh2I=
github.com/stbenjam/no-sprintf-host-port v0.1.1 h1:tYugd/yrm1O0dV+ThCbaKZh195Dfm07ysF0U6JQXczc=
github.com/stbenjam/no-sprintf-host-port v0.1.1/go.mod h1:TLhvtIvONRzdmkFiio4O8LHsN9N74I+PhRquPsxpL0I=
github.com/stealthrocket/wazergo v0.19.1 h1:BPrITETPgSFwiytwmToO0MbUC/+RGC39JScz1JmmG6c=
github.com/stealthrocket/wazergo v0.19.1/go.mod h1:riI0hxw4ndZA5e6z7PesHg2BtTftcZaMxRcoiGGipTs=
github.com/stoewer/go-strcase v1.3.0 h1:g0eASXYtp+yvN9fK8sH94oCIk0fau9uV1/ZdJ0AVEzs=
github.com/stoewer/go-strcase v1.3.0/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8wodgtPmh1xo=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
1 change: 1 addition & 0 deletions pkg/foundation/cerrors/cerrors.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var (
Is = errors.Is
As = errors.As
Unwrap = errors.Unwrap
Join = errors.Join
)

type Frame struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/foundation/log/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
NodeIDField = "node_id"
ParallelWorkerIDField = "parallel_worker_id"
PipelineIDField = "pipeline_id"
ProcessorIDField = "processor_id"
RecordPositionField = "record_position"
RequestIDField = "request_id"
ServerAddressField = "address"
Expand Down
2 changes: 1 addition & 1 deletion pkg/plugin/processor/builtin/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import (
"testing"

sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit-processor-sdk/mock"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin"
"github.com/conduitio/conduit/pkg/plugin/processor/mock"
"github.com/matryer/is"
"go.uber.org/mock/gomock"
)
Expand Down
126 changes: 126 additions & 0 deletions pkg/plugin/processor/mock/processor.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions pkg/plugin/processor/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This generates a mock processor and adds UnimplementedProcessor to it to
// satisfy the sdk.Processor interface.
//go:generate mockgen -destination=mock/processor.go -package=mock -mock_names=Processor=Processor github.com/conduitio/conduit-processor-sdk Processor
//go:generate sed -i "" -e "/type Processor struct {/a\\\n sdk.UnimplementedProcessor" mock/processor.go

package processor
145 changes: 145 additions & 0 deletions pkg/plugin/processor/standalone/host.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package standalone

import (
"context"

processorv1 "github.com/conduitio/conduit-processor-sdk/proto/processor/v1"
"github.com/conduitio/conduit-processor-sdk/wasm"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/stealthrocket/wazergo"
"github.com/stealthrocket/wazergo/types"
"google.golang.org/protobuf/proto"
)

// hostModule declares the host module that is exported to the WASM module. The
// host module is used to communicate between the WASM module (processor) and Conduit.
var hostModule wazergo.HostModule[*hostModuleInstance] = hostModuleFunctions{
"command_request": wazergo.F1((*hostModuleInstance).commandRequest),
"command_response": wazergo.F1((*hostModuleInstance).commandResponse),
}

// hostModuleFunctions type implements HostModule, providing the module name,
// map of exported functions, and the ability to create instances of the module
// type.
type hostModuleFunctions wazergo.Functions[*hostModuleInstance]

// Name returns the name of the module.
func (f hostModuleFunctions) Name() string {
return "conduit"
}

// Functions is a helper that returns the exported functions of the module.
func (f hostModuleFunctions) Functions() wazergo.Functions[*hostModuleInstance] {
return (wazergo.Functions[*hostModuleInstance])(f)
}

// Instantiate creates a new instance of the module. This is called by the
// runtime when a new instance of the module is created.
func (f hostModuleFunctions) Instantiate(_ context.Context, opts ...hostModuleOption) (*hostModuleInstance, error) {
mod := &hostModuleInstance{}
wazergo.Configure(mod, opts...)
if mod.commandRequests == nil {
return nil, cerrors.New("missing command requests channel")
}
if mod.commandResponses == nil {
return nil, cerrors.New("missing command responses channel")
}
return mod, nil
}

type hostModuleOption = wazergo.Option[*hostModuleInstance]

func hostModuleOptions(
logger log.CtxLogger,
requests <-chan *processorv1.CommandRequest,
responses chan<- tuple[*processorv1.CommandResponse, error],
) hostModuleOption {
return wazergo.OptionFunc(func(m *hostModuleInstance) {
m.logger = logger
m.commandRequests = requests
m.commandResponses = responses
})
}

// hostModuleInstance is used to maintain the state of our module instance.
type hostModuleInstance struct {
logger log.CtxLogger
commandRequests <-chan *processorv1.CommandRequest
commandResponses chan<- tuple[*processorv1.CommandResponse, error]

parkedCommandRequest *processorv1.CommandRequest
}

func (*hostModuleInstance) Close(context.Context) error { return nil }

// commandRequest is the exported function that is called by the WASM module to
// get the next command request. It returns the size of the command request
// message. If the buffer is too small, it returns the size of the command
// request message and parks the command request. The next call to this function
// will return the same command request.
func (m *hostModuleInstance) commandRequest(ctx context.Context, buf types.Bytes) types.Uint32 {
m.logger.Trace(ctx).Msg("getting next command")

if m.parkedCommandRequest == nil {
// No parked command, so we need to wait for the next one. If the command
// channel is closed, then we return an error.
var ok bool
m.parkedCommandRequest, ok = <-m.commandRequests
if !ok {
return wasm.ErrorCodeNoMoreCommands
}
}

// If the buffer is too small, we park the command and return the size of the
// command. The next call to nextCommand will return the same command.
if size := proto.Size(m.parkedCommandRequest); len(buf) < size {
m.logger.Warn(ctx).
Int("command_bytes", size).
Int("allocated_bytes", len(buf)).
Msgf("insufficient memory, command will be parked until next call to nextCommand")
return types.Uint32(size)
}

// If the buffer is large enough, we marshal the command into the buffer and
// return the size of the command. The next call to nextCommand will return
// the next command.
out, err := proto.MarshalOptions{}.MarshalAppend(buf[:0], m.parkedCommandRequest)
if err != nil {
m.logger.Err(ctx, err).Msg("failed marshalling protobuf command request")
return wasm.ErrorCodeUnknownCommandRequest
}
m.parkedCommandRequest = nil

m.logger.Trace(ctx).Msg("returning next command")
return types.Uint32(len(out))
}

// commandResponse is the exported function that is called by the WASM module to
// send a command response. It returns 0 on success, or an error code on error.
func (m *hostModuleInstance) commandResponse(ctx context.Context, buf types.Bytes) types.Uint32 {
var resp processorv1.CommandResponse
err := proto.Unmarshal(buf, &resp)
if err != nil {
m.logger.Err(ctx, err).Msg("failed unmarshalling protobuf command response")
m.commandResponses <- tuple[*processorv1.CommandResponse, error]{nil, err}
return wasm.ErrorCodeUnknownCommandResponse
}

m.commandResponses <- tuple[*processorv1.CommandResponse, error]{&resp, nil}
return 0
}
Loading

0 comments on commit 843df71

Please sign in to comment.