Skip to content

Commit

Permalink
Implement command and reply (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
hariso authored Jan 9, 2024
1 parent cd358fd commit 791ebd7
Show file tree
Hide file tree
Showing 11 changed files with 394 additions and 3 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ on:
pull_request:

jobs:
build:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
Expand Down
13 changes: 13 additions & 0 deletions .golangci.goheader.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Copyright © {{ copyright-year }} Meroxa, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
125 changes: 125 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
run:
timeout: 5m

linters-settings:
nolintlint:
allow-unused: false # report any unused nolint directives
require-explanation: true # require an explanation for nolint directives
require-specific: true # require nolint directives to mention the specific linter being suppressed
gocyclo:
min-complexity: 20
goconst:
ignore-tests: true
goheader:
template-path: '.golangci.goheader.template'
values:
regexp:
copyright-year: 20[2-9]\d
wrapcheck:
ignoreSigs:
- .Errorf(
- errors.New(
- errors.Unwrap(
- .Wrap(
- .Wrapf(
- .WithMessage(
- .WithMessagef(
- .WithStack(
- (context.Context).Err()

issues:
exclude-rules:
- path: _test\.go
linters:
- dogsled
- gosec
- gocognit
- errcheck
- forcetypeassert
- funlen
- goerr113

linters:
# please, do not use `enable-all`: it's deprecated and will be removed soon.
# inverted configuration with `enable-all` and `disable` is not scalable during updates of golangci-lint
disable-all: true
enable:
- asasalint
- asciicheck
- bidichk
- bodyclose
- containedctx
- contextcheck
- decorder
# - depguard
- dogsled
- dupl
- dupword
- durationcheck
- errcheck
- errchkjson
- errname
- errorlint
- execinquery
- exhaustive
- exportloopref
# - forbidigo
- forcetypeassert
- funlen
- gci
- ginkgolinter
- gocheckcompilerdirectives
- gochecknoinits
- gocognit
- goconst
- gocritic
- godot
- goerr113
- gofmt
- gofumpt
- goheader
- goimports
- gomoddirectives
- goprintffuncname
- gosec
- gosimple
- gosmopolitan
- govet
- grouper
- importas
- ineffassign
- interfacebloat
# - ireturn # Doesn't have correct support for generic types https://github.com/butuzov/ireturn/issues/37
- loggercheck
- maintidx
- makezero
- mirror
- misspell
- musttag
- nakedret
- nestif
- nilerr
- nilnil
- noctx
- nolintlint
- nosprintfhostport
- prealloc
- predeclared
- promlinter
- reassign
- revive
- rowserrcheck
- sqlclosecheck
- staticcheck
- stylecheck
- tenv
- testableexamples
- thelper
- unconvert
- unparam
- unused
- usestdlibvars
- wastedassign
- whitespace
- wrapcheck
- zerologlint
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
.PHONY: test
test:
go test $(GOTEST_FLAGS) -race ./...
go test $(GOTEST_FLAGS) -race ./... -tags !wasm

.PHONY: lint
lint:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ toolchain go1.21.5

require (
github.com/conduitio/conduit-commons v0.0.0-20231205181721-bef91d55116c
github.com/goccy/go-json v0.10.2
github.com/golangci/golangci-lint v1.55.2
go.uber.org/mock v0.4.0
mvdan.cc/gofumpt v0.5.0
Expand Down Expand Up @@ -65,7 +66,6 @@ require (
github.com/go-toolsmith/typep v1.1.0 // indirect
github.com/go-xmlfmt/xmlfmt v1.1.2 // indirect
github.com/gobwas/glob v0.2.3 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/gofrs/flock v0.8.1 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golangci/check v0.0.0-20180506172741-cfe4005ccda2 // indirect
Expand Down
77 changes: 77 additions & 0 deletions internal/wasm_commands.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright © 2023 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"errors"
"fmt"
"math"

"github.com/goccy/go-json"
)

var (
defaultCommandSize = uint32(1024)

// ErrorCodeStart is the smallest error code which the host (i.e. Conduit) can send.
// The imported function _nextCommand returns an uint32 value
// that is either the number of bytes actually written or an error code.
// Because of that, we're reserving a range of error codes.
ErrorCodeStart = math.MaxUint32 - uint32(100)
)

var (
ErrCannotUnmarshalCommand = errors.New("cannot unmarshal command")
ErrNextCommand = errors.New("failed getting next command")
)

type Command struct {
Name string `json:"name"`
}

// NextCommand retrieves the next command from Conduit.
func NextCommand() (Command, error) {
// allocate some memory for Conduit to write the command
// we're allocating some memory in advance, so that
// we don't need to introduce another call just to
// get the amount of memory which is needed.
ptr, cleanup := allocate(defaultCommandSize)
defer cleanup()

// request Conduit to write the command to the given allocation
fmt.Println("getting next command")
resp := _nextCommand(ptr, defaultCommandSize)
if resp > ErrorCodeStart { // error codes
// todo if more memory is needed, allocate it
// https://github.com/ConduitIO/conduit-processor-sdk/issues/6
fmt.Printf("got error code: %v\n", resp)
return Command{}, fmt.Errorf("error code %v: %w", resp, ErrNextCommand)
}

// parse the command
var cmd Command
err := json.Unmarshal(ptrToByteArray(ptr, resp), &cmd)
if err != nil {
return Command{}, ErrCannotUnmarshalCommand
}

return cmd, nil
}

func Reply(bytes []byte) {
ptr, cleanup := Write(bytes)
defer cleanup()
_reply(ptr, uint32(len(bytes)))
}
41 changes: 41 additions & 0 deletions internal/wasm_imports.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright © 2023 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build wasm

package internal

// Imports `nextCommand` from the host, which retrieves
// the next command for a processor.
//
// The arguments are:
// (1) a pointer to the address where the command should be written
// (2) the size of allocated memory.
//
// The return value can be 0 (for a successful reply) or an error code.
//
//go:wasmimport env nextCommand
func _nextCommand(ptr, size uint32) uint32

// Imports `reply` from the host, which informs
// the host about the reply for the previous command.
//
// The arguments are:
// (1) a pointer to the address where the reply should be written
// (2) the size of allocated memory.
//
// The return value can be 0 (for a successful reply) or an error code.
//
//go:wasmimport env reply
func _reply(ptr, size uint32)
30 changes: 30 additions & 0 deletions internal/wasm_imports_stub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright © 2023 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// The functions in this file are stubs of the functions defined
// in wasm_imports.go.
// They exist to make it possible to test, lint
// or generally run the code in a non-WASM environment.

//go:build !wasm

package internal

func _nextCommand(_, _ uint32) uint32 {
panic("stub")
}

func _reply(_, _ uint32) {
panic("stub")
}
54 changes: 54 additions & 0 deletions internal/wasm_memory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright © 2023 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"fmt"
"unsafe"
)

var allocations = make(map[uintptr][]byte)

func allocate(size uint32) (uint32, func()) {
fmt.Printf("allocating %v bytes", size)

return Write(make([]byte, size))
}

func free(ptr unsafe.Pointer) {
if ptr == nil {
return
}

if _, ok := allocations[uintptr(ptr)]; ok {
delete(allocations, uintptr(ptr))
} else {
panic("free: invalid pointer")
}
}

func ptrToByteArray(ptr uint32, size uint32) []byte {
return unsafe.Slice((*byte)(unsafe.Pointer(uintptr(ptr))), size)
}

func Write(bytes []byte) (uint32, func()) {
fmt.Printf("writing %v bytes to memory\n", len(bytes))
ptr := unsafe.Pointer(&bytes[0])
allocations[uintptr(ptr)] = bytes

return uint32(uintptr(ptr)), func() {
free(ptr)
}
}
1 change: 1 addition & 0 deletions mock/processor.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 791ebd7

Please sign in to comment.