Skip to content
This repository has been archived by the owner on Feb 9, 2024. It is now read-only.

Commit

Permalink
Merge pull request #53 from streamdal/blinktag/fix_mem_leak
Browse files Browse the repository at this point in the history
Fix mem leak
  • Loading branch information
blinktag authored Nov 28, 2023
2 parents 0f0c565 + c2dcaf2 commit 3530a65
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 0 deletions.
5 changes: 5 additions & 0 deletions function.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ func (f *function) Exec(ctx context.Context, req []byte) ([]byte, error) {
return nil, errors.Wrap(err, "unable to read memory")
}

// Dealloc response memory
if _, err := f.dealloc.Call(ctx, uint64(resultPtr), uint64(resultSize)); err != nil {
return nil, errors.Wrap(err, "unable to deallocate memory")
}

return resBytes, nil
}

Expand Down
74 changes: 74 additions & 0 deletions wasm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,78 @@ var _ = Describe("WASM Modules", func() {
Expect(wasmResp.ExitMsg).To(ContainSubstring("inferred fresh schema"))
})
})

Context("Detective", func() {
var s *Streamdal
var req *protos.WASMRequest
var f *function

wasmData, err := os.ReadFile("test-assets/wasm/detective.wasm")
Expect(err).ToNot(HaveOccurred())

BeforeEach(func() {
req = &protos.WASMRequest{
Step: &protos.PipelineStep{
Step: &protos.PipelineStep_Detective{
Detective: &steps.DetectiveStep{
Path: stringPtr("object.type"),
Args: []string{"streamdal"},
Type: steps.DetectiveType_DETECTIVE_TYPE_STRING_CONTAINS_ANY,
Negate: boolPtr(false),
},
},
XWasmId: stringPtr(uuid.New().String()),
XWasmFunction: stringPtr("f"),
XWasmBytes: wasmData,
},
InputPayload: nil,
}

s = &Streamdal{
pipelinesMtx: &sync.RWMutex{},
pipelines: map[string]map[string]*protos.Command{},
audiencesMtx: &sync.RWMutex{},
audiences: map[string]struct{}{},
}

f, err = s.createFunction(req.Step)
Expect(err).ToNot(HaveOccurred())

req.Step.XWasmBytes = nil
})

It("returns success on string contains any", func() {
req.InputPayload = []byte(`{"object": {"type": "streamdal"}}`)

data, err := proto.Marshal(req)
Expect(err).ToNot(HaveOccurred())

res, err := f.Exec(context.Background(), data)
Expect(err).ToNot(HaveOccurred())

wasmResp := &protos.WASMResponse{}

err = proto.Unmarshal(res, wasmResp)
Expect(err).ToNot(HaveOccurred())
Expect(wasmResp).ToNot(BeNil())
Expect(wasmResp.ExitCode).To(Equal(protos.WASMExitCode_WASM_EXIT_CODE_SUCCESS))
})

It("returns failure on string contains any", func() {
req.InputPayload = []byte(`{"object": {"type": "microsoft"}}`)

data, err := proto.Marshal(req)
Expect(err).ToNot(HaveOccurred())

res, err := f.Exec(context.Background(), data)
Expect(err).ToNot(HaveOccurred())

wasmResp := &protos.WASMResponse{}

err = proto.Unmarshal(res, wasmResp)
Expect(err).ToNot(HaveOccurred())
Expect(wasmResp).ToNot(BeNil())
Expect(wasmResp.ExitCode).To(Equal(protos.WASMExitCode_WASM_EXIT_CODE_FAILURE))
})
})
})

0 comments on commit 3530a65

Please sign in to comment.