Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
hariso committed Dec 27, 2023
1 parent 64784c1 commit 030acbd
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 41 deletions.
35 changes: 6 additions & 29 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ type Command interface {
}

type CommandResponse interface {
// ErrStr returns an error string associated with this response.
ErrStr() string
isCommandResponse()
}

type SpecifyCmd struct{}
Expand All @@ -41,12 +40,7 @@ type SpecifyResponse struct {
Err error
}

func (r *SpecifyResponse) ErrStr() string {
if r.Err == nil {
return ""
}
return r.Err.Error()
}
func (r *SpecifyResponse) isCommandResponse() {}

type ConfigureCmd struct {
ConfigMap map[string]string
Expand All @@ -62,12 +56,7 @@ type ConfigureResponse struct {
Err error
}

func (r *ConfigureResponse) ErrStr() string {
if r.Err == nil {
return ""
}
return r.Err.Error()
}
func (r *ConfigureResponse) isCommandResponse() {}

type OpenCmd struct{}

Expand All @@ -79,12 +68,7 @@ type OpenResponse struct {
Err error
}

func (r *OpenResponse) ErrStr() string {
if r.Err == nil {
return ""
}
return r.Err.Error()
}
func (r *OpenResponse) isCommandResponse() {}

type ProcessCmd struct {
Records []opencdc.Record
Expand All @@ -98,9 +82,7 @@ type ProcessResponse struct {
Records []ProcessedRecord
}

func (r *ProcessResponse) ErrStr() string {
return ""
}
func (r *ProcessResponse) isCommandResponse() {}

type TeardownCmd struct{}

Expand All @@ -114,9 +96,4 @@ type TeardownResponse struct {
Err error
}

func (r *TeardownResponse) ErrStr() string {
if r.Err == nil {
return ""
}
return r.Err.Error()
}
func (r *TeardownResponse) isCommandResponse() {}
File renamed without changes.
16 changes: 12 additions & 4 deletions internal/proto/command_to_proto.go → internal/proto/to_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,19 +122,19 @@ func MarshalCommandResponse(resp sdk.CommandResponse) ([]byte, error) {
Version: v.Specification.Version,
Author: v.Specification.Author,
Parameters: protoSpecificationParams(v.Specification.Parameters),
Err: v.ErrStr(),
Err: errorToString(v.Err),
},
}
case *sdk.ConfigureResponse:
protoResp.Response = &procproto.CommandResponse_ConfigureResp{
ConfigureResp: &procproto.Configure_Response{
Err: v.ErrStr(),
Err: errorToString(v.Err),
},
}
case *sdk.OpenResponse:
protoResp.Response = &procproto.CommandResponse_OpenResp{
OpenResp: &procproto.Open_Response{
Err: v.ErrStr(),
Err: errorToString(v.Err),
},
}
case *sdk.ProcessResponse:
Expand All @@ -150,7 +150,7 @@ func MarshalCommandResponse(resp sdk.CommandResponse) ([]byte, error) {
case *sdk.TeardownResponse:
protoResp.Response = &procproto.CommandResponse_TeardownResp{
TeardownResp: &procproto.Teardown_Response{
Err: v.ErrStr(),
Err: errorToString(v.Err),
},
}
default:
Expand All @@ -165,6 +165,14 @@ func MarshalCommandResponse(resp sdk.CommandResponse) ([]byte, error) {
return bytes, nil
}

func errorToString(err error) string {
if err == nil {
return ""
}

return err.Error()
}

func protoSpecificationParams(in map[string]sdk.Parameter) map[string]*procproto.Specify_Parameter {
out := make(map[string]*procproto.Specify_Parameter, len(in))
for name, param := range in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal
package wasm

import (
"fmt"
Expand Down
2 changes: 1 addition & 1 deletion internal/wasm_imports.go → internal/wasm/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

//go:build wasm

package internal
package wasm

// Imports `nextCommand` from the host, which retrieves
// the next command for a processor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
// limitations under the License.

// The functions in this file are stubs of the functions defined
// in wasm_imports.go.
// in imports.go.
// They exist to make it possible to test, lint
// or generally run the code in a non-WASM environment.

//go:build !wasm

package internal
package wasm

func _nextCommand(_, _ uint32) uint32 {
panic("stub")
Expand Down
2 changes: 1 addition & 1 deletion internal/wasm_memory.go → internal/wasm/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal
package wasm

import (
"fmt"
Expand Down
6 changes: 3 additions & 3 deletions run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"os"

sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit-processor-sdk/internal"
"github.com/conduitio/conduit-processor-sdk/internal/wasm"
)

// Run is the 'entry point' for a processor. It runs a
Expand All @@ -30,14 +30,14 @@ import (
// A processor plugin needs to call this function in its main() function.
func Run(p sdk.Processor) {
for {
cmd, err := internal.NextCommand()
cmd, err := wasm.NextCommand()
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "failed retrieving next command: %v", err)
os.Exit(1)
}

resp := cmd.Execute(context.Background(), p)
err = internal.Reply(resp)
err = wasm.Reply(resp)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "failed writing reply: %v\n", err)
os.Exit(1)
Expand Down

0 comments on commit 030acbd

Please sign in to comment.