From 4d5d8c0d6aa1dfbd408150242deccb4876f30530 Mon Sep 17 00:00:00 2001 From: Mark G Date: Fri, 12 Jan 2024 14:31:44 -0500 Subject: [PATCH 1/5] Dynamic Transforms --- go.mod | 2 +- go.sum | 10 +---- go_sdk.go | 13 ++++-- wasm_test.go | 121 +++++++++++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 130 insertions(+), 16 deletions(-) diff --git a/go.mod b/go.mod index 0a37579..9766014 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/onsi/gomega v1.30.0 github.com/pkg/errors v0.9.1 github.com/relistan/go-director v0.0.0-20200406104025-dbbf5d95248d - github.com/streamdal/streamdal/libs/protos v0.1.12 + github.com/streamdal/streamdal/libs/protos v0.1.13 github.com/tetratelabs/wazero v1.6.0 golang.org/x/time v0.5.0 google.golang.org/grpc v1.60.1 diff --git a/go.sum b/go.sum index c102e87..6253892 100644 --- a/go.sum +++ b/go.sum @@ -33,10 +33,8 @@ github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykE github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= -github.com/streamdal/streamdal/libs/protos v0.1.11 h1:PgGS2YHI3X7sHeazQKWjh1VMENW3klMF5NFcKtpsL3M= -github.com/streamdal/streamdal/libs/protos v0.1.11/go.mod h1:WZ6qCqzJu/9Vn+P5EUMN1hcOWlxN2EHgcsxehylhgJQ= -github.com/streamdal/streamdal/libs/protos v0.1.12 h1:U0301m8/EYRHW6tCIk7lIJM86iMQVcoXAqIWdOayKkQ= -github.com/streamdal/streamdal/libs/protos v0.1.12/go.mod h1:WZ6qCqzJu/9Vn+P5EUMN1hcOWlxN2EHgcsxehylhgJQ= +github.com/streamdal/streamdal/libs/protos v0.1.13 h1:VQdAFemjTxa4ddguk050hoTKtw6gSqPqt9KThs7MKS4= +github.com/streamdal/streamdal/libs/protos v0.1.13/go.mod h1:WZ6qCqzJu/9Vn+P5EUMN1hcOWlxN2EHgcsxehylhgJQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -44,8 +42,6 @@ github.com/tetratelabs/wazero v1.6.0 h1:z0H1iikCdP8t+q341xqepY4EWvHEw8Es7tlqiVzl github.com/tetratelabs/wazero v1.6.0/go.mod h1:0U0G41+ochRKoPKCJlh0jMg1CHkyfK8kDqiirMmKY8A= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= -golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -60,8 +56,6 @@ golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.16.1 h1:TLyB3WofjdOEepBHAU20JdNC1Zbg87elYofWYAY5oZA= golang.org/x/tools v0.16.1/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 h1:6G8oQ016D88m1xAKljMlBOOGWDZkes4kMhgGFlf8WcQ= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917/go.mod h1:xtjpI3tXFPP051KaWnhvxkiubL/6dJ18vLVf7q2pTOU= google.golang.org/genproto/googleapis/rpc v0.0.0-20240108191215-35c7eff3a6b1 h1:gphdwh0npgs8elJ4T6J+DQJHPVF7RsuJHCfwztUb4J4= google.golang.org/genproto/googleapis/rpc v0.0.0-20240108191215-35c7eff3a6b1/go.mod h1:daQN87bsDqDoe316QbbvX60nMoJQa4r6Ds0ZuoAe5yA= google.golang.org/grpc v1.60.1 h1:26+wFr+cNqSGFcOXcabYC0lUVJVRa2Sb2ortSK7VrEU= diff --git a/go_sdk.go b/go_sdk.go index 6668ea5..f509f98 100644 --- a/go_sdk.go +++ b/go_sdk.go @@ -461,7 +461,7 @@ func (s *Streamdal) heartbeat(loop *director.TimedLooper) { }) } -func (s *Streamdal) runStep(ctx context.Context, aud *protos.Audience, step *protos.PipelineStep, data []byte) (*protos.WASMResponse, error) { +func (s *Streamdal) runStep(ctx context.Context, aud *protos.Audience, step *protos.PipelineStep, data []byte, isr *protos.InterStepResult) (*protos.WASMResponse, error) { s.config.Logger.Debugf("Running step '%s'", step.Name) // Get WASM module @@ -477,8 +477,9 @@ func (s *Streamdal) runStep(ctx context.Context, aud *protos.Audience, step *pro step.XWasmBytes = nil req := &protos.WASMRequest{ - InputPayload: data, - Step: step, + InputPayload: data, + Step: step, + InterStepResult: isr, } reqBytes, err := proto.Marshal(req) @@ -607,6 +608,8 @@ func (s *Streamdal) Process(ctx context.Context, req *ProcessRequest) *ProcessRe PIPELINE: for _, p := range pipelines { + var isr *protos.InterStepResult + pipelineTimeoutCtx, pipelineTimeoutCxl := context.WithTimeout(ctx, s.config.PipelineTimeout) pipelineStatus := &protos.PipelineStatus{ @@ -644,7 +647,7 @@ PIPELINE: // NOOP } - wasmResp, err := s.runStep(stepTimeoutCtx, aud, step, resp.Data) + wasmResp, err := s.runStep(stepTimeoutCtx, aud, step, resp.Data, isr) if err != nil { stepTimeoutCxl() @@ -684,6 +687,8 @@ PIPELINE: resp.Data = wasmResp.OutputPayload } + isr = wasmResp.InterStepResult // Pass inter-step result to next step + // Check on success and on-failures switch wasmResp.ExitCode { case protos.WASMExitCode_WASM_EXIT_CODE_SUCCESS: diff --git a/wasm_test.go b/wasm_test.go index 2a58a26..f599d61 100644 --- a/wasm_test.go +++ b/wasm_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "os" "sync" + "time" "github.com/google/uuid" . "github.com/onsi/ginkgo/v2" @@ -13,6 +14,10 @@ import ( "github.com/streamdal/streamdal/libs/protos/build/go/protos" "github.com/streamdal/streamdal/libs/protos/build/go/protos/steps" + + "github.com/streamdal/go-sdk/logger" + "github.com/streamdal/go-sdk/metrics/metricsfakes" + "github.com/streamdal/go-sdk/server/serverfakes" ) var _ = Describe("WASM Modules", func() { @@ -392,11 +397,12 @@ var _ = Describe("WASM Modules", func() { err = proto.Unmarshal(res, wasmResp) Expect(err).ToNot(HaveOccurred()) Expect(wasmResp).ToNot(BeNil()) + Expect(wasmResp.ExitMsg).To(Equal("")) Expect(wasmResp.ExitCode).To(Equal(protos.WASMExitCode_WASM_EXIT_CODE_SUCCESS)) Expect(wasmResp.OutputPayload).Should(MatchJSON(`{"object": {"type": "str", "cc_num": "1234"}}`)) }) - It("truncates a field by total length", func() { + It("truncates a field by percent length", func() { wasmData, err := os.ReadFile("test-assets/wasm/transform.wasm") Expect(err).ToNot(HaveOccurred()) @@ -408,8 +414,8 @@ var _ = Describe("WASM Modules", func() { Options: &steps.TransformStep_TruncateOptions{ TruncateOptions: &steps.TransformTruncateOptions{ Path: "object.type", - Type: steps.TransformTruncateType_TRANSFORM_TRUNCATE_TYPE_LENGTH, - Value: 3, + Type: steps.TransformTruncateType_TRANSFORM_TRUNCATE_TYPE_PERCENTAGE, + Value: 50, }, }, }, @@ -619,4 +625,113 @@ var _ = Describe("WASM Modules", func() { // TODO: additional tests for each transform type }) + + Context("inter step result", func() { + FIt("finds and transforms PII in a payload without a path", func() { + // TODO: update this test when we can support multiple transforms in a single step + payload := []byte(`{ + "users": [ + { + "name": "Bob", + "email": "bob@streamdal.com" + } + ] +}`) + detectiveWASM, err := os.ReadFile("test-assets/wasm/detective.wasm") + Expect(err).ToNot(HaveOccurred()) + transformWASM, err := os.ReadFile("test-assets/wasm/transform.wasm") + Expect(err).ToNot(HaveOccurred()) + + pipeline := &protos.Pipeline{ + Id: uuid.New().String(), + Name: "Test Pipeline", + Steps: []*protos.PipelineStep{ + { + Name: "Find Email", + XWasmId: stringPtr(uuid.New().String()), + XWasmBytes: detectiveWASM, + XWasmFunction: stringPtr("f"), + OnSuccess: make([]protos.PipelineStepCondition, 0), + OnFailure: []protos.PipelineStepCondition{protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_ABORT_ALL}, + Step: &protos.PipelineStep_Detective{ + Detective: &steps.DetectiveStep{ + Path: stringPtr(""), // No path, we're searching the entire payload + Negate: boolPtr(false), + Type: steps.DetectiveType_DETECTIVE_TYPE_PII_EMAIL, + }, + }, + }, + { + Dynamic: true, + Name: "Transform Email", + XWasmId: stringPtr(uuid.New().String()), + XWasmBytes: transformWASM, + XWasmFunction: stringPtr("f"), + OnSuccess: make([]protos.PipelineStepCondition, 0), + OnFailure: []protos.PipelineStepCondition{protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_ABORT_ALL}, + Step: &protos.PipelineStep_Transform{ + Transform: &steps.TransformStep{ + Type: steps.TransformType_TRANSFORM_TYPE_REPLACE_VALUE, + Options: &steps.TransformStep_ReplaceValueOptions{ + ReplaceValueOptions: &steps.TransformReplaceValueOptions{ + Path: "", // No path, we're getting the result from the detective step + Value: `"REDACTED"`, + }, + }, + }, + }, + }, + }, + } + + aud := &protos.Audience{ + ServiceName: "mysvc1", + ComponentName: "kafka", + OperationType: protos.OperationType_OPERATION_TYPE_PRODUCER, + OperationName: "mytopic", + } + + s := &Streamdal{ + serverClient: &serverfakes.FakeIServerClient{}, + functionsMtx: &sync.RWMutex{}, + functions: map[string]*function{}, + audiencesMtx: &sync.RWMutex{}, + audiences: map[string]struct{}{}, + tails: map[string]map[string]*Tail{}, + tailsMtx: &sync.RWMutex{}, + config: &Config{ + ServiceName: "mysvc1", + Logger: &logger.TinyLogger{}, + StepTimeout: time.Millisecond * 1000, + PipelineTimeout: time.Millisecond * 1000, + }, + metrics: &metricsfakes.FakeIMetrics{}, + pipelinesMtx: &sync.RWMutex{}, + pipelines: map[string]map[string]*protos.Command{ + audToStr(aud): { + pipeline.Id: { + Audience: aud, + Command: &protos.Command_AttachPipeline{ + AttachPipeline: &protos.AttachPipelineCommand{ + Pipeline: pipeline, + }, + }, + }, + }, + }, + } + + resp := s.Process(context.Background(), &ProcessRequest{ + ComponentName: aud.ComponentName, + OperationType: OperationType(aud.OperationType), + OperationName: aud.OperationName, + Data: payload, + }) + + Expect(resp.Error).To(BeFalse()) + Expect(resp.ErrorMessage).To(Equal("")) + Expect(resp.Data).To(MatchJSON(`{"users": [{"name":"Bob","email":"REDACTED"}]}`)) + }) + + }) }) From ece09aa0006a85081f6c6a2905f4c2ce5d808670 Mon Sep 17 00:00:00 2001 From: Mark G Date: Fri, 12 Jan 2024 14:35:07 -0500 Subject: [PATCH 2/5] Remove focus on test --- wasm_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wasm_test.go b/wasm_test.go index f599d61..ec99450 100644 --- a/wasm_test.go +++ b/wasm_test.go @@ -627,7 +627,7 @@ var _ = Describe("WASM Modules", func() { }) Context("inter step result", func() { - FIt("finds and transforms PII in a payload without a path", func() { + It("finds and transforms PII in a payload without a path", func() { // TODO: update this test when we can support multiple transforms in a single step payload := []byte(`{ "users": [ From 24b5207951156793788afddebf04b58415122770 Mon Sep 17 00:00:00 2001 From: Mark G Date: Fri, 12 Jan 2024 14:42:24 -0500 Subject: [PATCH 3/5] Update wasm_test.go --- wasm_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/wasm_test.go b/wasm_test.go index ec99450..8283851 100644 --- a/wasm_test.go +++ b/wasm_test.go @@ -397,7 +397,6 @@ var _ = Describe("WASM Modules", func() { err = proto.Unmarshal(res, wasmResp) Expect(err).ToNot(HaveOccurred()) Expect(wasmResp).ToNot(BeNil()) - Expect(wasmResp.ExitMsg).To(Equal("")) Expect(wasmResp.ExitCode).To(Equal(protos.WASMExitCode_WASM_EXIT_CODE_SUCCESS)) Expect(wasmResp.OutputPayload).Should(MatchJSON(`{"object": {"type": "str", "cc_num": "1234"}}`)) }) @@ -438,7 +437,7 @@ var _ = Describe("WASM Modules", func() { Expect(err).ToNot(HaveOccurred()) Expect(wasmResp).ToNot(BeNil()) Expect(wasmResp.ExitCode).To(Equal(protos.WASMExitCode_WASM_EXIT_CODE_SUCCESS)) - Expect(wasmResp.OutputPayload).Should(MatchJSON(`{"object": {"type": "str", "cc_num": "1234"}}`)) + Expect(wasmResp.OutputPayload).Should(MatchJSON(`{"object": {"type": "stre", "cc_num": "1234"}}`)) }) }) From 36a6abb528b2ac6676aa0f1454ca767f24e5e67f Mon Sep 17 00:00:00 2001 From: Mark G Date: Wed, 17 Jan 2024 13:00:39 -0500 Subject: [PATCH 4/5] Test for multiple detective results/transforms --- wasm_test.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/wasm_test.go b/wasm_test.go index 8283851..b764862 100644 --- a/wasm_test.go +++ b/wasm_test.go @@ -627,12 +627,15 @@ var _ = Describe("WASM Modules", func() { Context("inter step result", func() { It("finds and transforms PII in a payload without a path", func() { - // TODO: update this test when we can support multiple transforms in a single step payload := []byte(`{ "users": [ { "name": "Bob", "email": "bob@streamdal.com" + }, + { + "name": "Mary", + "email": "mary@streamdal.com" } ] }`) @@ -729,7 +732,7 @@ var _ = Describe("WASM Modules", func() { Expect(resp.Error).To(BeFalse()) Expect(resp.ErrorMessage).To(Equal("")) - Expect(resp.Data).To(MatchJSON(`{"users": [{"name":"Bob","email":"REDACTED"}]}`)) + Expect(resp.Data).To(MatchJSON(`{"users": [{"name":"Bob","email":"REDACTED"},{"name":"Mary","email":"REDACTED"}]}`)) }) }) From 71852ec46238e1c973311c4d81de2f900a98f8f3 Mon Sep 17 00:00:00 2001 From: Mark G Date: Wed, 17 Jan 2024 15:57:15 -0500 Subject: [PATCH 5/5] Bump protos --- go.mod | 4 ++-- go.sum | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 9766014..b1c12cd 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/onsi/gomega v1.30.0 github.com/pkg/errors v0.9.1 github.com/relistan/go-director v0.0.0-20200406104025-dbbf5d95248d - github.com/streamdal/streamdal/libs/protos v0.1.13 + github.com/streamdal/streamdal/libs/protos v0.1.14 github.com/tetratelabs/wazero v1.6.0 golang.org/x/time v0.5.0 google.golang.org/grpc v1.60.1 @@ -25,6 +25,6 @@ require ( golang.org/x/sys v0.16.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/tools v0.16.1 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240108191215-35c7eff3a6b1 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 6253892..28ac465 100644 --- a/go.sum +++ b/go.sum @@ -33,8 +33,8 @@ github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykE github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= -github.com/streamdal/streamdal/libs/protos v0.1.13 h1:VQdAFemjTxa4ddguk050hoTKtw6gSqPqt9KThs7MKS4= -github.com/streamdal/streamdal/libs/protos v0.1.13/go.mod h1:WZ6qCqzJu/9Vn+P5EUMN1hcOWlxN2EHgcsxehylhgJQ= +github.com/streamdal/streamdal/libs/protos v0.1.14 h1:DK1vPmBrX6fLy6uCmNJsonZdqp3zs3zXDCOaRwo3s6o= +github.com/streamdal/streamdal/libs/protos v0.1.14/go.mod h1:WZ6qCqzJu/9Vn+P5EUMN1hcOWlxN2EHgcsxehylhgJQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -56,8 +56,8 @@ golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.16.1 h1:TLyB3WofjdOEepBHAU20JdNC1Zbg87elYofWYAY5oZA= golang.org/x/tools v0.16.1/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240108191215-35c7eff3a6b1 h1:gphdwh0npgs8elJ4T6J+DQJHPVF7RsuJHCfwztUb4J4= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240108191215-35c7eff3a6b1/go.mod h1:daQN87bsDqDoe316QbbvX60nMoJQa4r6Ds0ZuoAe5yA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac h1:nUQEQmH/csSvFECKYRv6HWEyypysidKl2I6Qpsglq/0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac/go.mod h1:daQN87bsDqDoe316QbbvX60nMoJQa4r6Ds0ZuoAe5yA= google.golang.org/grpc v1.60.1 h1:26+wFr+cNqSGFcOXcabYC0lUVJVRa2Sb2ortSK7VrEU= google.golang.org/grpc v1.60.1/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz4eGM= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=