Skip to content
This repository has been archived by the owner on Feb 9, 2024. It is now read-only.

Commit

Permalink
Merge pull request #56 from streamdal/blinktag/sdk_response
Browse files Browse the repository at this point in the history
ENG-1358 - Update process to return new response format
  • Loading branch information
blinktag authored Dec 14, 2023
2 parents ba9428c + b009822 commit 77806a8
Show file tree
Hide file tree
Showing 7 changed files with 353 additions and 164 deletions.
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,18 @@ func main() {
ShutdownCtx: context.Background(),
})

resp, _ := sc.Process(context.Background(), &streamdal.ProcessRequest{
resp := sc.Process(context.Background(), &streamdal.ProcessRequest{
OperationType: streamdal.OperationTypeConsumer,
OperationName: "new-order-topic",
ComponentName: "kafka",
Data: []byte(`{"object": {"field": true}}`),
})

if resp.Error != nil {
fmt.Println(resp.ErrorMessage)
return
}

println(string(resp.Data))
}

Expand Down
12 changes: 8 additions & 4 deletions function.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ type function struct {
}

func (f *function) Exec(ctx context.Context, req []byte) ([]byte, error) {
f.mtx.Lock()
defer f.mtx.Unlock()

ptrLen := uint64(len(req))

inputPtr, err := f.alloc.Call(ctx, ptrLen)
Expand Down Expand Up @@ -65,6 +62,10 @@ func (f *function) Exec(ctx context.Context, req []byte) ([]byte, error) {
// Read memory starting from result ptr
resBytes, err := f.readMemory(resultPtr, resultSize)
if err != nil {
// Dealloc response memory
if _, err := f.dealloc.Call(ctx, uint64(resultPtr), uint64(resultSize)); err != nil {
return nil, errors.Wrap(err, "unable to deallocate memory")
}
return nil, errors.Wrap(err, "unable to read memory")
}

Expand Down Expand Up @@ -153,8 +154,11 @@ func (s *Streamdal) createWASMInstance(wasmBytes []byte) (api.Module, error) {
"httpRequest": s.hf.HTTPRequest,
}

rCfg := wazero.NewRuntimeConfig().
WithMemoryLimitPages(1000) // 64MB (default is 1MB)

ctx := context.Background()
r := wazero.NewRuntime(ctx)
r := wazero.NewRuntimeWithConfig(ctx, rCfg)

wasi_snapshot_preview1.MustInstantiate(ctx, r)

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/onsi/gomega v1.28.0
github.com/pkg/errors v0.9.1
github.com/relistan/go-director v0.0.0-20200406104025-dbbf5d95248d
github.com/streamdal/protos v0.0.124
github.com/streamdal/protos v0.0.125
github.com/tetratelabs/wazero v1.5.0
golang.org/x/time v0.5.0
google.golang.org/grpc v1.59.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykE
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/streamdal/protos v0.0.124 h1:vI6Qj/ySJRRaG2IzfvKhrqA0NcUDGtqinCZYiatNf6k=
github.com/streamdal/protos v0.0.124/go.mod h1:ciPOA0/x5PG4yxOdwBtLFPwyAKwecMkuEitO3csbB7A=
github.com/streamdal/protos v0.0.125 h1:S2EFSaNdst03Uc57dl+PI/t9j3umwsILWKSTDb2GgJc=
github.com/streamdal/protos v0.0.125/go.mod h1:ciPOA0/x5PG4yxOdwBtLFPwyAKwecMkuEitO3csbB7A=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand Down
Loading

0 comments on commit 77806a8

Please sign in to comment.