From 98f09a42c9a2d7e005680ff7a2aef35fbfdcded0 Mon Sep 17 00:00:00 2001 From: Mark G Date: Mon, 20 Nov 2023 09:29:53 -0500 Subject: [PATCH 1/3] Fix mem leak --- function.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/function.go b/function.go index 07b31fc..e557aed 100644 --- a/function.go +++ b/function.go @@ -62,6 +62,11 @@ func (f *function) Exec(ctx context.Context, req []byte) ([]byte, error) { return nil, errors.Wrap(err, "unable to deallocate memory") } + // Dealloc response memory + if _, err := f.dealloc.Call(ctx, uint64(resultPtr), uint64(resultSize)); err != nil { + return nil, errors.Wrap(err, "unable to deallocate memory") + } + // Read memory starting from result ptr resBytes, err := f.readMemory(resultPtr, resultSize) if err != nil { From 82463bf5a8027e42bac13d276b677b81b17c09df Mon Sep 17 00:00:00 2001 From: Mark G Date: Mon, 20 Nov 2023 10:27:32 -0500 Subject: [PATCH 2/3] Move dealloc --- function.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/function.go b/function.go index e557aed..1ac9be4 100644 --- a/function.go +++ b/function.go @@ -62,17 +62,17 @@ func (f *function) Exec(ctx context.Context, req []byte) ([]byte, error) { return nil, errors.Wrap(err, "unable to deallocate memory") } - // Dealloc response memory - if _, err := f.dealloc.Call(ctx, uint64(resultPtr), uint64(resultSize)); err != nil { - return nil, errors.Wrap(err, "unable to deallocate memory") - } - // Read memory starting from result ptr resBytes, err := f.readMemory(resultPtr, resultSize) if err != nil { return nil, errors.Wrap(err, "unable to read memory") } + // Dealloc response memory + if _, err := f.dealloc.Call(ctx, uint64(resultPtr), uint64(resultSize)); err != nil { + return nil, errors.Wrap(err, "unable to deallocate memory") + } + return resBytes, nil } From c2dcaf292a9692366b1f398e011bb875c0af1b4e Mon Sep 17 00:00:00 2001 From: Mark G Date: Mon, 20 Nov 2023 10:44:39 -0500 Subject: [PATCH 3/3] Adding more explicit wasm tests for detective --- wasm_test.go | 74 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/wasm_test.go b/wasm_test.go index 198a512..ed3dacd 100644 --- a/wasm_test.go +++ b/wasm_test.go @@ -141,4 +141,78 @@ var _ = Describe("WASM Modules", func() { Expect(wasmResp.ExitMsg).To(ContainSubstring("inferred fresh schema")) }) }) + + Context("Detective", func() { + var s *Streamdal + var req *protos.WASMRequest + var f *function + + wasmData, err := os.ReadFile("test-assets/wasm/detective.wasm") + Expect(err).ToNot(HaveOccurred()) + + BeforeEach(func() { + req = &protos.WASMRequest{ + Step: &protos.PipelineStep{ + Step: &protos.PipelineStep_Detective{ + Detective: &steps.DetectiveStep{ + Path: stringPtr("object.type"), + Args: []string{"streamdal"}, + Type: steps.DetectiveType_DETECTIVE_TYPE_STRING_CONTAINS_ANY, + Negate: boolPtr(false), + }, + }, + XWasmId: stringPtr(uuid.New().String()), + XWasmFunction: stringPtr("f"), + XWasmBytes: wasmData, + }, + InputPayload: nil, + } + + s = &Streamdal{ + pipelinesMtx: &sync.RWMutex{}, + pipelines: map[string]map[string]*protos.Command{}, + audiencesMtx: &sync.RWMutex{}, + audiences: map[string]struct{}{}, + } + + f, err = s.createFunction(req.Step) + Expect(err).ToNot(HaveOccurred()) + + req.Step.XWasmBytes = nil + }) + + It("returns success on string contains any", func() { + req.InputPayload = []byte(`{"object": {"type": "streamdal"}}`) + + data, err := proto.Marshal(req) + Expect(err).ToNot(HaveOccurred()) + + res, err := f.Exec(context.Background(), data) + Expect(err).ToNot(HaveOccurred()) + + wasmResp := &protos.WASMResponse{} + + err = proto.Unmarshal(res, wasmResp) + Expect(err).ToNot(HaveOccurred()) + Expect(wasmResp).ToNot(BeNil()) + Expect(wasmResp.ExitCode).To(Equal(protos.WASMExitCode_WASM_EXIT_CODE_SUCCESS)) + }) + + It("returns failure on string contains any", func() { + req.InputPayload = []byte(`{"object": {"type": "microsoft"}}`) + + data, err := proto.Marshal(req) + Expect(err).ToNot(HaveOccurred()) + + res, err := f.Exec(context.Background(), data) + Expect(err).ToNot(HaveOccurred()) + + wasmResp := &protos.WASMResponse{} + + err = proto.Unmarshal(res, wasmResp) + Expect(err).ToNot(HaveOccurred()) + Expect(wasmResp).ToNot(BeNil()) + Expect(wasmResp.ExitCode).To(Equal(protos.WASMExitCode_WASM_EXIT_CODE_FAILURE)) + }) + }) })