diff --git a/function.go b/function.go index 07b31fc..1ac9be4 100644 --- a/function.go +++ b/function.go @@ -68,6 +68,11 @@ func (f *function) Exec(ctx context.Context, req []byte) ([]byte, error) { return nil, errors.Wrap(err, "unable to read memory") } + // Dealloc response memory + if _, err := f.dealloc.Call(ctx, uint64(resultPtr), uint64(resultSize)); err != nil { + return nil, errors.Wrap(err, "unable to deallocate memory") + } + return resBytes, nil } diff --git a/wasm_test.go b/wasm_test.go index 198a512..ed3dacd 100644 --- a/wasm_test.go +++ b/wasm_test.go @@ -141,4 +141,78 @@ var _ = Describe("WASM Modules", func() { Expect(wasmResp.ExitMsg).To(ContainSubstring("inferred fresh schema")) }) }) + + Context("Detective", func() { + var s *Streamdal + var req *protos.WASMRequest + var f *function + + wasmData, err := os.ReadFile("test-assets/wasm/detective.wasm") + Expect(err).ToNot(HaveOccurred()) + + BeforeEach(func() { + req = &protos.WASMRequest{ + Step: &protos.PipelineStep{ + Step: &protos.PipelineStep_Detective{ + Detective: &steps.DetectiveStep{ + Path: stringPtr("object.type"), + Args: []string{"streamdal"}, + Type: steps.DetectiveType_DETECTIVE_TYPE_STRING_CONTAINS_ANY, + Negate: boolPtr(false), + }, + }, + XWasmId: stringPtr(uuid.New().String()), + XWasmFunction: stringPtr("f"), + XWasmBytes: wasmData, + }, + InputPayload: nil, + } + + s = &Streamdal{ + pipelinesMtx: &sync.RWMutex{}, + pipelines: map[string]map[string]*protos.Command{}, + audiencesMtx: &sync.RWMutex{}, + audiences: map[string]struct{}{}, + } + + f, err = s.createFunction(req.Step) + Expect(err).ToNot(HaveOccurred()) + + req.Step.XWasmBytes = nil + }) + + It("returns success on string contains any", func() { + req.InputPayload = []byte(`{"object": {"type": "streamdal"}}`) + + data, err := proto.Marshal(req) + Expect(err).ToNot(HaveOccurred()) + + res, err := f.Exec(context.Background(), data) + Expect(err).ToNot(HaveOccurred()) + + wasmResp := &protos.WASMResponse{} + + err = proto.Unmarshal(res, wasmResp) + Expect(err).ToNot(HaveOccurred()) + Expect(wasmResp).ToNot(BeNil()) + Expect(wasmResp.ExitCode).To(Equal(protos.WASMExitCode_WASM_EXIT_CODE_SUCCESS)) + }) + + It("returns failure on string contains any", func() { + req.InputPayload = []byte(`{"object": {"type": "microsoft"}}`) + + data, err := proto.Marshal(req) + Expect(err).ToNot(HaveOccurred()) + + res, err := f.Exec(context.Background(), data) + Expect(err).ToNot(HaveOccurred()) + + wasmResp := &protos.WASMResponse{} + + err = proto.Unmarshal(res, wasmResp) + Expect(err).ToNot(HaveOccurred()) + Expect(wasmResp).ToNot(BeNil()) + Expect(wasmResp.ExitCode).To(Equal(protos.WASMExitCode_WASM_EXIT_CODE_FAILURE)) + }) + }) })