From 030acbd27f8e23fb7a9315aafe4c1f45b21f8b12 Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Wed, 27 Dec 2023 12:34:11 +0100 Subject: [PATCH] refactor --- command.go | 35 ++++--------------- .../{command_from_proto.go => from_proto.go} | 0 .../{command_to_proto.go => to_proto.go} | 16 ++++++--- .../command_actions.go} | 2 +- internal/{wasm_imports.go => wasm/imports.go} | 2 +- .../imports_stub.go} | 4 +-- internal/{wasm_memory.go => wasm/memory.go} | 2 +- run/run.go | 6 ++-- 8 files changed, 26 insertions(+), 41 deletions(-) rename internal/proto/{command_from_proto.go => from_proto.go} (100%) rename internal/proto/{command_to_proto.go => to_proto.go} (97%) rename internal/{wasm_commands.go => wasm/command_actions.go} (99%) rename internal/{wasm_imports.go => wasm/imports.go} (98%) rename internal/{wasm_imports_stub.go => wasm/imports_stub.go} (95%) rename internal/{wasm_memory.go => wasm/memory.go} (98%) diff --git a/command.go b/command.go index e6c4f31..d1a8389 100644 --- a/command.go +++ b/command.go @@ -25,8 +25,7 @@ type Command interface { } type CommandResponse interface { - // ErrStr returns an error string associated with this response. - ErrStr() string + isCommandResponse() } type SpecifyCmd struct{} @@ -41,12 +40,7 @@ type SpecifyResponse struct { Err error } -func (r *SpecifyResponse) ErrStr() string { - if r.Err == nil { - return "" - } - return r.Err.Error() -} +func (r *SpecifyResponse) isCommandResponse() {} type ConfigureCmd struct { ConfigMap map[string]string @@ -62,12 +56,7 @@ type ConfigureResponse struct { Err error } -func (r *ConfigureResponse) ErrStr() string { - if r.Err == nil { - return "" - } - return r.Err.Error() -} +func (r *ConfigureResponse) isCommandResponse() {} type OpenCmd struct{} @@ -79,12 +68,7 @@ type OpenResponse struct { Err error } -func (r *OpenResponse) ErrStr() string { - if r.Err == nil { - return "" - } - return r.Err.Error() -} +func (r *OpenResponse) isCommandResponse() {} type ProcessCmd struct { Records []opencdc.Record @@ -98,9 +82,7 @@ type ProcessResponse struct { Records []ProcessedRecord } -func (r *ProcessResponse) ErrStr() string { - return "" -} +func (r *ProcessResponse) isCommandResponse() {} type TeardownCmd struct{} @@ -114,9 +96,4 @@ type TeardownResponse struct { Err error } -func (r *TeardownResponse) ErrStr() string { - if r.Err == nil { - return "" - } - return r.Err.Error() -} +func (r *TeardownResponse) isCommandResponse() {} diff --git a/internal/proto/command_from_proto.go b/internal/proto/from_proto.go similarity index 100% rename from internal/proto/command_from_proto.go rename to internal/proto/from_proto.go diff --git a/internal/proto/command_to_proto.go b/internal/proto/to_proto.go similarity index 97% rename from internal/proto/command_to_proto.go rename to internal/proto/to_proto.go index 47caa45..4497ecd 100644 --- a/internal/proto/command_to_proto.go +++ b/internal/proto/to_proto.go @@ -122,19 +122,19 @@ func MarshalCommandResponse(resp sdk.CommandResponse) ([]byte, error) { Version: v.Specification.Version, Author: v.Specification.Author, Parameters: protoSpecificationParams(v.Specification.Parameters), - Err: v.ErrStr(), + Err: errorToString(v.Err), }, } case *sdk.ConfigureResponse: protoResp.Response = &procproto.CommandResponse_ConfigureResp{ ConfigureResp: &procproto.Configure_Response{ - Err: v.ErrStr(), + Err: errorToString(v.Err), }, } case *sdk.OpenResponse: protoResp.Response = &procproto.CommandResponse_OpenResp{ OpenResp: &procproto.Open_Response{ - Err: v.ErrStr(), + Err: errorToString(v.Err), }, } case *sdk.ProcessResponse: @@ -150,7 +150,7 @@ func MarshalCommandResponse(resp sdk.CommandResponse) ([]byte, error) { case *sdk.TeardownResponse: protoResp.Response = &procproto.CommandResponse_TeardownResp{ TeardownResp: &procproto.Teardown_Response{ - Err: v.ErrStr(), + Err: errorToString(v.Err), }, } default: @@ -165,6 +165,14 @@ func MarshalCommandResponse(resp sdk.CommandResponse) ([]byte, error) { return bytes, nil } +func errorToString(err error) string { + if err == nil { + return "" + } + + return err.Error() +} + func protoSpecificationParams(in map[string]sdk.Parameter) map[string]*procproto.Specify_Parameter { out := make(map[string]*procproto.Specify_Parameter, len(in)) for name, param := range in { diff --git a/internal/wasm_commands.go b/internal/wasm/command_actions.go similarity index 99% rename from internal/wasm_commands.go rename to internal/wasm/command_actions.go index 33db568..3d174e5 100644 --- a/internal/wasm_commands.go +++ b/internal/wasm/command_actions.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package internal +package wasm import ( "fmt" diff --git a/internal/wasm_imports.go b/internal/wasm/imports.go similarity index 98% rename from internal/wasm_imports.go rename to internal/wasm/imports.go index bf557ae..b339fcb 100644 --- a/internal/wasm_imports.go +++ b/internal/wasm/imports.go @@ -14,7 +14,7 @@ //go:build wasm -package internal +package wasm // Imports `nextCommand` from the host, which retrieves // the next command for a processor. diff --git a/internal/wasm_imports_stub.go b/internal/wasm/imports_stub.go similarity index 95% rename from internal/wasm_imports_stub.go rename to internal/wasm/imports_stub.go index 8fa1cd4..7152a84 100644 --- a/internal/wasm_imports_stub.go +++ b/internal/wasm/imports_stub.go @@ -13,13 +13,13 @@ // limitations under the License. // The functions in this file are stubs of the functions defined -// in wasm_imports.go. +// in imports.go. // They exist to make it possible to test, lint // or generally run the code in a non-WASM environment. //go:build !wasm -package internal +package wasm func _nextCommand(_, _ uint32) uint32 { panic("stub") diff --git a/internal/wasm_memory.go b/internal/wasm/memory.go similarity index 98% rename from internal/wasm_memory.go rename to internal/wasm/memory.go index 8a2d8e4..6722636 100644 --- a/internal/wasm_memory.go +++ b/internal/wasm/memory.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package internal +package wasm import ( "fmt" diff --git a/run/run.go b/run/run.go index bcb6ef1..6f79832 100644 --- a/run/run.go +++ b/run/run.go @@ -20,7 +20,7 @@ import ( "os" sdk "github.com/conduitio/conduit-processor-sdk" - "github.com/conduitio/conduit-processor-sdk/internal" + "github.com/conduitio/conduit-processor-sdk/internal/wasm" ) // Run is the 'entry point' for a processor. It runs a @@ -30,14 +30,14 @@ import ( // A processor plugin needs to call this function in its main() function. func Run(p sdk.Processor) { for { - cmd, err := internal.NextCommand() + cmd, err := wasm.NextCommand() if err != nil { _, _ = fmt.Fprintf(os.Stderr, "failed retrieving next command: %v", err) os.Exit(1) } resp := cmd.Execute(context.Background(), p) - err = internal.Reply(resp) + err = wasm.Reply(resp) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "failed writing reply: %v\n", err) os.Exit(1)