From fb962f32a38bc57f8946f12d9b7523be500a76b0 Mon Sep 17 00:00:00 2001 From: Mark G Date: Thu, 7 Dec 2023 13:29:44 -0600 Subject: [PATCH 1/7] Update process to return new response format --- go.mod | 2 +- go.sum | 4 +- go_sdk.go | 173 ++++++++++++++++++++++++++++++++---------- go_sdk_test.go | 201 +++++++++++++++++++++++++++---------------------- wasm_test.go | 39 ++++++++++ 5 files changed, 288 insertions(+), 131 deletions(-) diff --git a/go.mod b/go.mod index af24b03..3489233 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/onsi/gomega v1.28.0 github.com/pkg/errors v0.9.1 github.com/relistan/go-director v0.0.0-20200406104025-dbbf5d95248d - github.com/streamdal/protos v0.0.124 + github.com/streamdal/protos v0.0.125 github.com/tetratelabs/wazero v1.5.0 golang.org/x/time v0.5.0 google.golang.org/grpc v1.59.0 diff --git a/go.sum b/go.sum index 138b301..6ac413f 100644 --- a/go.sum +++ b/go.sum @@ -37,8 +37,8 @@ github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykE github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= -github.com/streamdal/protos v0.0.124 h1:vI6Qj/ySJRRaG2IzfvKhrqA0NcUDGtqinCZYiatNf6k= -github.com/streamdal/protos v0.0.124/go.mod h1:ciPOA0/x5PG4yxOdwBtLFPwyAKwecMkuEitO3csbB7A= +github.com/streamdal/protos v0.0.125 h1:S2EFSaNdst03Uc57dl+PI/t9j3umwsILWKSTDb2GgJc= +github.com/streamdal/protos v0.0.125/go.mod h1:ciPOA0/x5PG4yxOdwBtLFPwyAKwecMkuEitO3csbB7A= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= diff --git a/go_sdk.go b/go_sdk.go index ae1297a..5bd7fd7 100644 --- a/go_sdk.go +++ b/go_sdk.go @@ -43,6 +43,9 @@ type OperationType int // ClientType is used to indicate if this library is being used by a shim or directly (as an SDK) type ClientType int +// ProcessResponse is the response struct from a Process() call +type ProcessResponse protos.SDKResponse + const ( // DefaultPipelineTimeoutDurationStr is the default timeout for a pipeline execution DefaultPipelineTimeoutDurationStr = "100ms" @@ -86,7 +89,7 @@ var ( type IStreamdal interface { // Process is used to run data pipelines against data - Process(ctx context.Context, req *ProcessRequest) (*ProcessResponse, error) + Process(ctx context.Context, req *ProcessRequest) *ProcessResponse } // Streamdal is the main struct for this library @@ -184,12 +187,6 @@ type ProcessRequest struct { Data []byte } -// ProcessResponse is used to maintain a consistent API for the return of a Process() call -// We will introduce additional fields in this struct in the future -type ProcessResponse struct { - Data []byte -} - func New(cfg *Config) (*Streamdal, error) { if err := validateConfig(cfg); err != nil { return nil, errors.Wrap(err, "unable to validate config") @@ -538,9 +535,26 @@ func (s *Streamdal) getCounterLabels(req *ProcessRequest, pipeline *protos.Pipel return l } -func (s *Streamdal) Process(ctx context.Context, req *ProcessRequest) (*ProcessResponse, error) { +func (s *Streamdal) Process(ctx context.Context, req *ProcessRequest) *ProcessResponse { if err := validateProcessRequest(req); err != nil { - return nil, errors.Wrap(err, "invalid process request") + resp := &ProcessResponse{ + Error: true, + ErrorMessage: err.Error(), + PipelineStatus: make([]*protos.PipelineStatus, 0), + } + + if req != nil { + resp.Data = req.Data + } + + return resp + } + + resp := &ProcessResponse{ + Data: req.Data, + Error: false, + ErrorMessage: "", + PipelineStatus: make([]*protos.PipelineStatus, 0), } data := req.Data @@ -577,76 +591,151 @@ func (s *Streamdal) Process(ctx context.Context, req *ProcessRequest) (*ProcessR s.sendTail(aud, "", data, data) // No pipelines for this mode, nothing to do - return &ProcessResponse{Data: data}, nil + return resp } if payloadSize > MaxWASMPayloadSize { _ = s.metrics.Incr(ctx, &types.CounterEntry{Name: counterError, Labels: s.getCounterLabels(req, nil), Value: 1, Audience: aud}) - s.config.Logger.Warn(ErrMaxPayloadSizeExceeded) - return nil, ErrMaxPayloadSizeExceeded + resp.Error = true + resp.ErrorMessage = ErrMaxPayloadSizeExceeded.Error() + + return resp } originalData := data // Used for tail request PIPELINE: for _, p := range pipelines { + pipelineTimeoutCtx, pipelineTimeoutCxl := context.WithTimeout(ctx, s.config.PipelineTimeout) + + pipelineStatus := &protos.PipelineStatus{ + Id: p.GetAttachPipeline().GetPipeline().Id, + Name: p.GetAttachPipeline().GetPipeline().Name, + StepStatus: make([]*protos.StepStatus, 0), + } pipeline := p.GetAttachPipeline().GetPipeline() _ = s.metrics.Incr(ctx, &types.CounterEntry{Name: counterProcessed, Labels: s.getCounterLabels(req, pipeline), Value: 1, Audience: aud}) _ = s.metrics.Incr(ctx, &types.CounterEntry{Name: counterBytes, Labels: s.getCounterLabels(req, pipeline), Value: payloadSize, Audience: aud}) - // If a step - timeoutCtx, timeoutCxl := context.WithTimeout(ctx, s.config.PipelineTimeout) - for _, step := range pipeline.Steps { + stepTimeoutCtx, stepTimeoutCxl := context.WithTimeout(ctx, s.config.StepTimeout) + + stepStatus := &protos.StepStatus{ + Name: step.Name, + Error: false, + ErrorMessage: "", + AbortStatus: protos.AbortStatus_ABORT_STATUS_UNSET, + } select { - case <-timeoutCtx.Done(): - timeoutCxl() + case <-pipelineTimeoutCtx.Done(): + pipelineTimeoutCxl() + stepTimeoutCxl() + s.config.Logger.Errorf("pipeline '%s' timeout exceeded", pipeline.Name) + stepStatus.Error = true + stepStatus.ErrorMessage = "Step timeout exceeded" + pipelineStatus.StepStatus = append(pipelineStatus.StepStatus, stepStatus) + resp.PipelineStatus = append(resp.PipelineStatus, pipelineStatus) continue PIPELINE default: // NOOP } - wasmResp, err := s.runStep(timeoutCtx, aud, step, data) + wasmResp, err := s.runStep(stepTimeoutCtx, aud, step, data) if err != nil { + stepTimeoutCxl() + err = fmt.Errorf("failed to run step '%s': %s", step.Name, err) s.config.Logger.Error(err) - shouldContinue := s.handleConditions(ctx, step.OnFailure, pipeline, step, aud, req) - if !shouldContinue { - timeoutCxl() + + stepStatus.Error = true + stepStatus.ErrorMessage = err.Error() + + continuePipeline, continueProcess := s.handleConditions(ctx, step.OnFailure, pipeline, step, aud, req) + if !continueProcess { + pipelineTimeoutCxl() + s.config.Logger.Debugf("Step '%s' failed to run, aborting pipeline", step.Name) + + stepStatus.AbortStatus = protos.AbortStatus_ABORT_STATUS_ALL + pipelineStatus.StepStatus = append(pipelineStatus.StepStatus, stepStatus) + resp.PipelineStatus = append(resp.PipelineStatus, pipelineStatus) + return resp + } else if !continuePipeline { + pipelineTimeoutCxl() + + stepStatus.AbortStatus = protos.AbortStatus_ABORT_STATUS_CURRENT + pipelineStatus.StepStatus = append(pipelineStatus.StepStatus, stepStatus) + resp.PipelineStatus = append(resp.PipelineStatus, pipelineStatus) continue PIPELINE } // wasmResp will be nil, so don't allow code below to execute - continue + pipelineStatus.StepStatus = append(pipelineStatus.StepStatus, stepStatus) + continue // Step } // Check on success and on-failures switch wasmResp.ExitCode { case protos.WASMExitCode_WASM_EXIT_CODE_SUCCESS: + stepTimeoutCxl() s.config.Logger.Debugf("Step '%s' returned exit code success", step.Name) - shouldContinue := s.handleConditions(ctx, step.OnSuccess, pipeline, step, aud, req) - if !shouldContinue { - timeoutCxl() + continuePipeline, continueProcess := s.handleConditions(ctx, step.OnSuccess, pipeline, step, aud, req) + if !continueProcess { + pipelineTimeoutCxl() + s.config.Logger.Debugf("Step '%s' returned exit code success but step "+ - "condition failed, aborting pipeline", step.Name) + "condition failed, aborting further pipelines", step.Name) + + stepStatus.AbortStatus = protos.AbortStatus_ABORT_STATUS_ALL + pipelineStatus.StepStatus = append(pipelineStatus.StepStatus, stepStatus) + resp.PipelineStatus = append(resp.PipelineStatus, pipelineStatus) + return resp + } else if !continuePipeline { + pipelineTimeoutCxl() + + s.config.Logger.Debugf("Step '%s' returned exit code success but step "+ + "condition failed, aborting step and continuing pipelines", step.Name) + + stepStatus.AbortStatus = protos.AbortStatus_ABORT_STATUS_CURRENT + pipelineStatus.StepStatus = append(pipelineStatus.StepStatus, stepStatus) + resp.PipelineStatus = append(resp.PipelineStatus, pipelineStatus) continue PIPELINE } case protos.WASMExitCode_WASM_EXIT_CODE_FAILURE: fallthrough case protos.WASMExitCode_WASM_EXIT_CODE_INTERNAL_ERROR: + stepTimeoutCxl() + + stepStatus.Error = true + stepStatus.ErrorMessage = "Step failed: " + wasmResp.ExitMsg + s.config.Logger.Errorf("Step '%s' returned exit code '%s'", step.Name, wasmResp.ExitCode.String()) _ = s.metrics.Incr(ctx, &types.CounterEntry{Name: counterError, Labels: s.getCounterLabels(req, pipeline), Value: 1, Audience: aud}) - shouldContinue := s.handleConditions(ctx, step.OnFailure, pipeline, step, aud, req) - if !shouldContinue { - timeoutCxl() + continuePipeline, continueProcess := s.handleConditions(ctx, step.OnFailure, pipeline, step, aud, req) + if !continueProcess { + pipelineTimeoutCxl() + s.config.Logger.Debugf("Step '%s' returned exit code success but step "+ + "condition failed, aborting pipeline", step.Name) + + stepStatus.AbortStatus = protos.AbortStatus_ABORT_STATUS_ALL + pipelineStatus.StepStatus = append(pipelineStatus.StepStatus, stepStatus) + resp.PipelineStatus = append(resp.PipelineStatus, pipelineStatus) + resp.Error = true + resp.ErrorMessage = stepStatus.ErrorMessage + return resp + } else if !continuePipeline { + pipelineTimeoutCxl() + s.config.Logger.Debugf("Step '%s' returned exit code failure, aborting pipeline", step.Name) + stepStatus.AbortStatus = protos.AbortStatus_ABORT_STATUS_CURRENT + pipelineStatus.StepStatus = append(pipelineStatus.StepStatus, stepStatus) + resp.PipelineStatus = append(resp.PipelineStatus, pipelineStatus) continue PIPELINE } default: @@ -658,10 +747,13 @@ PIPELINE: if len(wasmResp.OutputPayload) > 0 { data = wasmResp.OutputPayload } - } - timeoutCxl() + stepTimeoutCxl() + pipelineStatus.StepStatus = append(pipelineStatus.StepStatus, stepStatus) + } + pipelineTimeoutCxl() + resp.PipelineStatus = append(resp.PipelineStatus, pipelineStatus) } // Perform tail if necessary @@ -673,9 +765,7 @@ PIPELINE: data = req.Data } - return &ProcessResponse{ - Data: data, - }, nil + return resp } func (s *Streamdal) handleConditions( @@ -685,8 +775,9 @@ func (s *Streamdal) handleConditions( step *protos.PipelineStep, aud *protos.Audience, req *ProcessRequest, -) bool { - shouldContinue := true +) (bool, bool) { + shouldContinuePipeline := true + shouldContinueProcess := true for _, condition := range conditions { switch condition { case protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_NOTIFY: @@ -705,16 +796,20 @@ func (s *Streamdal) handleConditions( } _ = s.metrics.Incr(ctx, &types.CounterEntry{Name: types.NotifyCount, Labels: labels, Value: 1, Audience: aud}) } - case protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_ABORT: + case protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_ABORT_CURRENT: s.config.Logger.Debugf("Step '%s' failed, aborting further pipeline steps", step.Name) - shouldContinue = false + shouldContinuePipeline = false + case protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_ABORT_ALL: + s.config.Logger.Debugf("Step '%s' failed, aborting all pipelines", step.Name) + shouldContinuePipeline = false + shouldContinueProcess = false default: // Assume continue s.config.Logger.Debugf("Step '%s' failed, continuing to next step", step.Name) } } - return shouldContinue + return shouldContinuePipeline, shouldContinueProcess } func (a *Audience) toProto(serviceName string) *protos.Audience { diff --git a/go_sdk_test.go b/go_sdk_test.go index 0d4bb4c9..d3c24f5 100644 --- a/go_sdk_test.go +++ b/go_sdk_test.go @@ -214,25 +214,26 @@ var _ = Describe("Streamdal", func() { It("handles notify condition", func() { conditions := []protos.PipelineStepCondition{protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_NOTIFY} - got := s.handleConditions(context.Background(), conditions, pipeline, step, aud, req) - Expect(got).To(BeTrue()) + continuePipeline, _ := s.handleConditions(context.Background(), conditions, pipeline, step, aud, req) + Expect(continuePipeline).To(BeTrue()) Expect(fakeClient.NotifyCallCount()).To(Equal(1)) }) It("handles abort condition", func() { - conditions := []protos.PipelineStepCondition{protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_ABORT} + conditions := []protos.PipelineStepCondition{protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_ABORT_CURRENT} - got := s.handleConditions(context.Background(), conditions, pipeline, step, aud, req) - Expect(got).To(BeFalse()) + continuePipeline, _ := s.handleConditions(context.Background(), conditions, pipeline, step, aud, req) + Expect(continuePipeline).To(BeFalse()) }) }) Context("Process", func() { It("return error when process request is nil", func() { s := &Streamdal{} - _, err := s.Process(context.Background(), nil) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring(ErrEmptyProcessRequest.Error())) + resp := s.Process(context.Background(), nil) + Expect(resp.Error).To(BeTrue()) + Expect(resp.ErrorMessage).To(ContainSubstring(ErrEmptyProcessRequest.Error())) + Expect(len(resp.PipelineStatus)).To(Equal(0)) }) It("processes successfully", func() { @@ -256,7 +257,7 @@ var _ = Describe("Streamdal", func() { XWasmBytes: wasmData, XWasmFunction: stringPtr("f"), OnSuccess: make([]protos.PipelineStepCondition, 0), - OnFailure: []protos.PipelineStepCondition{protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_ABORT}, + OnFailure: []protos.PipelineStepCondition{protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_ABORT_CURRENT}, Step: &protos.PipelineStep_Detective{ Detective: &steps.DetectiveStep{ Path: stringPtr("object.payload"), @@ -299,90 +300,110 @@ var _ = Describe("Streamdal", func() { }, } - _, err = s.Process(context.Background(), &ProcessRequest{ + resp := s.Process(context.Background(), &ProcessRequest{ ComponentName: aud.ComponentName, OperationType: OperationType(aud.OperationType), OperationName: aud.OperationName, Data: []byte(`{"object":{"payload":"streamdal@gmail.com"}`), }) - Expect(err).ToNot(HaveOccurred()) + Expect(resp.Error).To(BeFalse()) + Expect(resp.ErrorMessage).To(Equal("")) + Expect(len(resp.PipelineStatus)).To(Equal(1)) + Expect(len(resp.PipelineStatus[0].StepStatus)).To(Equal(1)) }) - // TODO: how to test with multipipeline - //It("fails on a detective match and aborts", func() { - // aud := &protos.Audience{ - // ServiceName: "mysvc1", - // ComponentName: "kafka", - // OperationType: protos.OperationType_OPERATION_TYPE_PRODUCER, - // OperationName: "mytopic", - // } - // - // wasmData, err := os.ReadFile("test-assets/wasm/detective.wasm") - // Expect(err).ToNot(HaveOccurred()) - // - // pipeline := &protos.Pipeline{ - // Id: uuid.New().String(), - // Name: "Test Pipeline", - // Steps: []*protos.PipelineStep{ - // { - // Name: "Step 1", - // XWasmId: stringPtr(uuid.New().String()), - // XWasmBytes: wasmData, - // XWasmFunction: stringPtr("f"), - // OnSuccess: make([]protos.PipelineStepCondition, 0), - // OnFailure: []protos.PipelineStepCondition{protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_ABORT}, - // Step: &protos.PipelineStep_Detective{ - // Detective: &steps.DetectiveStep{ - // Path: stringPtr("object.payload"), - // Args: []string{"gmail.com"}, - // Negate: boolPtr(false), - // Type: steps.DetectiveType_DETECTIVE_TYPE_STRING_CONTAINS_ANY, - // }, - // }, - // }, - // }, - // } - // - // s := &Streamdal{ - // serverClient: &serverfakes.FakeIServerClient{}, - // functionsMtx: &sync.RWMutex{}, - // functions: map[string]*function{}, - // audiencesMtx: &sync.RWMutex{}, - // audiences: map[string]struct{}{}, - // config: &Config{ - // ServiceName: "mysvc1", - // Logger: &logger.TinyLogger{}, - // StepTimeout: time.Millisecond * 10, - // PipelineTimeout: time.Millisecond * 100, - // }, - // metrics: &metricsfakes.FakeIMetrics{}, - // tails: map[string]map[string]*Tail{}, - // tailsMtx: &sync.RWMutex{}, - // pipelinesMtx: &sync.RWMutex{}, - // pipelines: map[string]map[string]*protos.Command{ - // audToStr(aud): { - // pipeline.Id: { - // Audience: aud, - // Command: &protos.Command_AttachPipeline{ - // AttachPipeline: &protos.AttachPipelineCommand{ - // Pipeline: pipeline, - // }, - // }, - // }, - // }, - // }, - // } - // - // resp, err := s.Process(context.Background(), &ProcessRequest{ - // ComponentName: aud.ComponentName, - // OperationType: OperationType(aud.OperationType), - // OperationName: aud.OperationName, - // Data: []byte(`{"object":{"payload":"streamdal@hotmail.com"}`), - // }) - // Expect(err).ToNot(HaveOccurred()) - // Expect(resp.Error).To(BeTrue()) - // Expect(resp.Message).To(ContainSubstring("step failed")) - //}) + It("fails on a detective match and aborts entire pipeline", func() { + aud := &protos.Audience{ + ServiceName: "mysvc1", + ComponentName: "kafka", + OperationType: protos.OperationType_OPERATION_TYPE_PRODUCER, + OperationName: "mytopic", + } + + wasmData, err := os.ReadFile("test-assets/wasm/detective.wasm") + Expect(err).ToNot(HaveOccurred()) + + pipeline := &protos.Pipeline{ + Id: uuid.New().String(), + Name: "Test Pipeline", + Steps: []*protos.PipelineStep{ + { + Name: "Step 1", + XWasmId: stringPtr(uuid.New().String()), + XWasmBytes: wasmData, + XWasmFunction: stringPtr("f"), + OnSuccess: make([]protos.PipelineStepCondition, 0), + OnFailure: []protos.PipelineStepCondition{protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_ABORT_ALL}, + Step: &protos.PipelineStep_Detective{ + Detective: &steps.DetectiveStep{ + Path: stringPtr("object.payload"), + Args: []string{"gmail.com"}, + Negate: boolPtr(false), + Type: steps.DetectiveType_DETECTIVE_TYPE_STRING_CONTAINS_ANY, + }, + }, + }, + { + Name: "Step 2", + XWasmId: stringPtr(uuid.New().String()), + XWasmBytes: wasmData, + XWasmFunction: stringPtr("f"), + OnSuccess: make([]protos.PipelineStepCondition, 0), + OnFailure: []protos.PipelineStepCondition{protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_ABORT_ALL}, + Step: &protos.PipelineStep_Detective{ + Detective: &steps.DetectiveStep{ + Path: stringPtr("object.payload"), + Args: []string{"gmail.com"}, + Negate: boolPtr(false), + Type: steps.DetectiveType_DETECTIVE_TYPE_STRING_CONTAINS_ANY, + }, + }, + }, + }, + } + + s := &Streamdal{ + serverClient: &serverfakes.FakeIServerClient{}, + functionsMtx: &sync.RWMutex{}, + functions: map[string]*function{}, + audiencesMtx: &sync.RWMutex{}, + audiences: map[string]struct{}{}, + config: &Config{ + ServiceName: "mysvc1", + Logger: &logger.TinyLogger{}, + StepTimeout: time.Millisecond * 10, + PipelineTimeout: time.Millisecond * 100, + }, + metrics: &metricsfakes.FakeIMetrics{}, + tails: map[string]map[string]*Tail{}, + tailsMtx: &sync.RWMutex{}, + pipelinesMtx: &sync.RWMutex{}, + pipelines: map[string]map[string]*protos.Command{ + audToStr(aud): { + pipeline.Id: { + Audience: aud, + Command: &protos.Command_AttachPipeline{ + AttachPipeline: &protos.AttachPipelineCommand{ + Pipeline: pipeline, + }, + }, + }, + }, + }, + } + + resp := s.Process(context.Background(), &ProcessRequest{ + ComponentName: aud.ComponentName, + OperationType: OperationType(aud.OperationType), + OperationName: aud.OperationName, + Data: []byte(`{"object":{"payload":"streamdal@hotmail.com"}`), + }) + Expect(resp.Error).To(BeTrue()) + Expect(resp.ErrorMessage).To(ContainSubstring("Step failed")) + Expect(len(resp.PipelineStatus)).To(Equal(1)) + Expect(len(resp.PipelineStatus[0].StepStatus)).To(Equal(1)) + Expect(resp.PipelineStatus[0].StepStatus[0].AbortStatus).To(Equal(protos.AbortStatus_ABORT_STATUS_ALL)) + }) }) Context("Multithreaded test", func() { @@ -408,7 +429,7 @@ var _ = Describe("Streamdal", func() { XWasmBytes: wasmData, XWasmFunction: stringPtr("f"), OnSuccess: make([]protos.PipelineStepCondition, 0), - OnFailure: []protos.PipelineStepCondition{protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_ABORT}, + OnFailure: []protos.PipelineStepCondition{protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_ABORT_CURRENT}, Step: &protos.PipelineStep_Detective{ Detective: &steps.DetectiveStep{ Path: stringPtr("object.payload"), @@ -460,15 +481,17 @@ var _ = Describe("Streamdal", func() { go func() { defer GinkgoRecover() defer wg.Done() - resp, err := s.Process(context.Background(), &ProcessRequest{ + resp := s.Process(context.Background(), &ProcessRequest{ ComponentName: aud.ComponentName, OperationType: OperationType(aud.OperationType), OperationName: aud.OperationName, Data: payload, }) - Expect(err).ToNot(HaveOccurred()) + Expect(resp.Error).To(BeFalse()) Expect(resp).To(BeAssignableToTypeOf(&ProcessResponse{})) Expect(string(resp.Data)).To(Equal(string(payload))) + Expect(len(resp.PipelineStatus)).To(Equal(1)) + Expect(len(resp.PipelineStatus[0].StepStatus)).To(Equal(1)) }() } diff --git a/wasm_test.go b/wasm_test.go index ed3dacd..b6b2b2b 100644 --- a/wasm_test.go +++ b/wasm_test.go @@ -214,5 +214,44 @@ var _ = Describe("WASM Modules", func() { Expect(wasmResp).ToNot(BeNil()) Expect(wasmResp.ExitCode).To(Equal(protos.WASMExitCode_WASM_EXIT_CODE_FAILURE)) }) + + It("can scan the whole payload", func() { + req.InputPayload = []byte(`{"object": {"type": "streamdal", "cc_num": "4111111111111111"}}`) + + det := req.Step.GetDetective() + det.Path = stringPtr("") + det.Args = []string{""} + det.Type = steps.DetectiveType_DETECTIVE_TYPE_PII_CREDIT_CARD + + data, err := proto.Marshal(req) + Expect(err).ToNot(HaveOccurred()) + + res, err := f.Exec(context.Background(), data) + Expect(err).ToNot(HaveOccurred()) + + wasmResp := &protos.WASMResponse{} + + err = proto.Unmarshal(res, wasmResp) + Expect(err).ToNot(HaveOccurred()) + Expect(wasmResp).ToNot(BeNil()) + Expect(wasmResp.ExitCode).To(Equal(protos.WASMExitCode_WASM_EXIT_CODE_SUCCESS)) + + // Check that we don' find it + req.InputPayload = []byte(`{"object": {"type": "streamdal", "cc_num": "1234"}}`) + + data, err = proto.Marshal(req) + Expect(err).ToNot(HaveOccurred()) + + res, err = f.Exec(context.Background(), data) + Expect(err).ToNot(HaveOccurred()) + + wasmResp = &protos.WASMResponse{} + + err = proto.Unmarshal(res, wasmResp) + Expect(err).ToNot(HaveOccurred()) + Expect(wasmResp).ToNot(BeNil()) + Expect(wasmResp.ExitCode).To(Equal(protos.WASMExitCode_WASM_EXIT_CODE_FAILURE)) + }) + }) }) From 2a04333e66bf79b6e5562a1133ae1d4aeb187635 Mon Sep 17 00:00:00 2001 From: Mark G Date: Thu, 7 Dec 2023 13:34:09 -0600 Subject: [PATCH 2/7] Update wasm_test.go --- wasm_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/wasm_test.go b/wasm_test.go index b6b2b2b..21c2ba4 100644 --- a/wasm_test.go +++ b/wasm_test.go @@ -147,10 +147,10 @@ var _ = Describe("WASM Modules", func() { var req *protos.WASMRequest var f *function - wasmData, err := os.ReadFile("test-assets/wasm/detective.wasm") - Expect(err).ToNot(HaveOccurred()) - BeforeEach(func() { + wasmData, err := os.ReadFile("test-assets/wasm/detective.wasm") + Expect(err).ToNot(HaveOccurred()) + req = &protos.WASMRequest{ Step: &protos.PipelineStep{ Step: &protos.PipelineStep_Detective{ From b29887b39fa4ebfee78f320bd11297ad7669ee73 Mon Sep 17 00:00:00 2001 From: Mark G Date: Fri, 8 Dec 2023 09:21:02 -0600 Subject: [PATCH 3/7] output payload should only be updated on success --- go_sdk.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/go_sdk.go b/go_sdk.go index 5bd7fd7..be818a9 100644 --- a/go_sdk.go +++ b/go_sdk.go @@ -677,6 +677,11 @@ PIPELINE: continue // Step } + // Only update working payload if one is returned + if len(wasmResp.OutputPayload) > 0 { + data = wasmResp.OutputPayload + } + // Check on success and on-failures switch wasmResp.ExitCode { case protos.WASMExitCode_WASM_EXIT_CODE_SUCCESS: @@ -743,11 +748,6 @@ PIPELINE: s.config.Logger.Debugf("Step '%s' returned unknown exit code %d", step.Name, wasmResp.ExitCode) } - // Only update working payload if one is returned - if len(wasmResp.OutputPayload) > 0 { - data = wasmResp.OutputPayload - } - stepTimeoutCxl() pipelineStatus.StepStatus = append(pipelineStatus.StepStatus, stepStatus) } From 5a508e39465eba6e1a05d9b36422138a2097ed77 Mon Sep 17 00:00:00 2001 From: Mark G Date: Fri, 8 Dec 2023 15:04:34 -0600 Subject: [PATCH 4/7] Update README.md --- README.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index afc4155..99b67ee 100644 --- a/README.md +++ b/README.md @@ -37,13 +37,18 @@ func main() { ShutdownCtx: context.Background(), }) - resp, _ := sc.Process(context.Background(), &streamdal.ProcessRequest{ + resp := sc.Process(context.Background(), &streamdal.ProcessRequest{ OperationType: streamdal.OperationTypeConsumer, OperationName: "new-order-topic", ComponentName: "kafka", Data: []byte(`{"object": {"field": true}}`), }) + if resp.Error != nil { + fmt.Println(resp.ErrorMessage) + return + } + println(string(resp.Data)) } From f99fcef7cef800b46b285231dc3f14890f2174c7 Mon Sep 17 00:00:00 2001 From: Mark G Date: Fri, 8 Dec 2023 15:04:48 -0600 Subject: [PATCH 5/7] Update README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 99b67ee..aaeb83d 100644 --- a/README.md +++ b/README.md @@ -45,8 +45,8 @@ func main() { }) if resp.Error != nil { - fmt.Println(resp.ErrorMessage) - return + fmt.Println(resp.ErrorMessage) + return } println(string(resp.Data)) From 3634c260241a67e4fd282cc515111c0ee9ff2600 Mon Sep 17 00:00:00 2001 From: Mark G Date: Tue, 12 Dec 2023 15:37:11 -0600 Subject: [PATCH 6/7] ENG-1342 - Fix shared memory access * Function execution mutex needed to be moved up the call stack * Memory needed to be bigger. Instantiating with 1000 pages (64MB per wasm module) * Added additional dealloc() call when readMemory fails, just in case * Updated multi pipeline test to replicate data seen in original issue --- function.go | 12 ++++++++---- go_sdk.go | 23 ++++++++++++----------- go_sdk_test.go | 50 ++++++++++++++++++++++++++++++++++++-------------- 3 files changed, 56 insertions(+), 29 deletions(-) diff --git a/function.go b/function.go index 1ac9be4..92e5be7 100644 --- a/function.go +++ b/function.go @@ -24,9 +24,6 @@ type function struct { } func (f *function) Exec(ctx context.Context, req []byte) ([]byte, error) { - f.mtx.Lock() - defer f.mtx.Unlock() - ptrLen := uint64(len(req)) inputPtr, err := f.alloc.Call(ctx, ptrLen) @@ -65,6 +62,10 @@ func (f *function) Exec(ctx context.Context, req []byte) ([]byte, error) { // Read memory starting from result ptr resBytes, err := f.readMemory(resultPtr, resultSize) if err != nil { + // Dealloc response memory + if _, err := f.dealloc.Call(ctx, uint64(resultPtr), uint64(resultSize)); err != nil { + return nil, errors.Wrap(err, "unable to deallocate memory") + } return nil, errors.Wrap(err, "unable to read memory") } @@ -153,8 +154,11 @@ func (s *Streamdal) createWASMInstance(wasmBytes []byte) (api.Module, error) { "httpRequest": s.hf.HTTPRequest, } + rCfg := wazero.NewRuntimeConfig(). + WithMemoryLimitPages(1000) // 64MB (default is 1MB) + ctx := context.Background() - r := wazero.NewRuntime(ctx) + r := wazero.NewRuntimeWithConfig(ctx, rCfg) wasi_snapshot_preview1.MustInstantiate(ctx, r) diff --git a/go_sdk.go b/go_sdk.go index be818a9..2869925 100644 --- a/go_sdk.go +++ b/go_sdk.go @@ -192,7 +192,7 @@ func New(cfg *Config) (*Streamdal, error) { return nil, errors.Wrap(err, "unable to validate config") } - // We instantiate this library based on whether or not we have a Client URL+token + // We instantiate this library based on whether we have a Client URL+token or not. // If these are not provided, the wrapper library will not perform rule checks and // will act as normal if cfg.ServerURL == "" || cfg.ServerToken == "" { @@ -470,6 +470,9 @@ func (s *Streamdal) runStep(ctx context.Context, aud *protos.Audience, step *pro return nil, errors.Wrap(err, "failed to get wasm data") } + f.mtx.Lock() + defer f.mtx.Unlock() + // Don't need this anymore, and don't want to send it to the wasm function step.XWasmBytes = nil @@ -557,8 +560,7 @@ func (s *Streamdal) Process(ctx context.Context, req *ProcessRequest) *ProcessRe PipelineStatus: make([]*protos.PipelineStatus, 0), } - data := req.Data - payloadSize := int64(len(data)) + payloadSize := int64(len(resp.Data)) aud := &protos.Audience{ ServiceName: s.config.ServiceName, @@ -588,7 +590,7 @@ func (s *Streamdal) Process(ctx context.Context, req *ProcessRequest) *ProcessRe pipelines := s.getPipelines(ctx, aud) if len(pipelines) == 0 { // Send tail if there is any. Tails do not require a pipeline to operate - s.sendTail(aud, "", data, data) + s.sendTail(aud, "", resp.Data, resp.Data) // No pipelines for this mode, nothing to do return resp @@ -603,8 +605,6 @@ func (s *Streamdal) Process(ctx context.Context, req *ProcessRequest) *ProcessRe return resp } - originalData := data // Used for tail request - PIPELINE: for _, p := range pipelines { pipelineTimeoutCtx, pipelineTimeoutCxl := context.WithTimeout(ctx, s.config.PipelineTimeout) @@ -644,7 +644,7 @@ PIPELINE: // NOOP } - wasmResp, err := s.runStep(stepTimeoutCtx, aud, step, data) + wasmResp, err := s.runStep(stepTimeoutCtx, aud, step, resp.Data) if err != nil { stepTimeoutCxl() @@ -679,7 +679,7 @@ PIPELINE: // Only update working payload if one is returned if len(wasmResp.OutputPayload) > 0 { - data = wasmResp.OutputPayload + resp.Data = wasmResp.OutputPayload } // Check on success and on-failures @@ -718,7 +718,8 @@ PIPELINE: stepStatus.Error = true stepStatus.ErrorMessage = "Step failed: " + wasmResp.ExitMsg - s.config.Logger.Errorf("Step '%s' returned exit code '%s'", step.Name, wasmResp.ExitCode.String()) + s.config.Logger.Errorf("Step '%s' returned exit code '%s': %s", step.Name, wasmResp.ExitCode.String(), wasmResp.ExitMsg) + s.config.Logger.Errorf("Data: %s", string(resp.Data)) _ = s.metrics.Incr(ctx, &types.CounterEntry{Name: counterError, Labels: s.getCounterLabels(req, pipeline), Value: 1, Audience: aud}) @@ -757,12 +758,12 @@ PIPELINE: } // Perform tail if necessary - s.sendTail(aud, "", originalData, data) + s.sendTail(aud, "", req.Data, resp.Data) // Dry run should not modify anything, but we must allow pipeline to // mutate internal state in order to function properly if s.config.DryRun { - data = req.Data + resp.Data = req.Data } return resp diff --git a/go_sdk_test.go b/go_sdk_test.go index d3c24f5..1d60154 100644 --- a/go_sdk_test.go +++ b/go_sdk_test.go @@ -10,12 +10,11 @@ import ( "testing" "time" - "google.golang.org/grpc" - "github.com/google/uuid" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/pkg/errors" + "google.golang.org/grpc" "google.golang.org/protobuf/proto" "github.com/streamdal/protos/build/go/protos" @@ -396,7 +395,7 @@ var _ = Describe("Streamdal", func() { ComponentName: aud.ComponentName, OperationType: OperationType(aud.OperationType), OperationName: aud.OperationName, - Data: []byte(`{"object":{"payload":"streamdal@hotmail.com"}`), + Data: []byte(`{"object":{"payload":"streamdal@hotmail.com"}}`), }) Expect(resp.Error).To(BeTrue()) Expect(resp.ErrorMessage).To(ContainSubstring("Step failed")) @@ -416,7 +415,10 @@ var _ = Describe("Streamdal", func() { OperationName: "mytopic", } - wasmData, err := os.ReadFile("test-assets/wasm/detective.wasm") + wasmDetective, err := os.ReadFile("test-assets/wasm/detective.wasm") + Expect(err).ToNot(HaveOccurred()) + + transformDetective, err := os.ReadFile("test-assets/wasm/transform.wasm") Expect(err).ToNot(HaveOccurred()) pipeline := &protos.Pipeline{ @@ -424,21 +426,35 @@ var _ = Describe("Streamdal", func() { Name: "Test Pipeline", Steps: []*protos.PipelineStep{ { - Name: "Step 1", + Name: "Detective Step", XWasmId: stringPtr(uuid.New().String()), - XWasmBytes: wasmData, + XWasmBytes: wasmDetective, XWasmFunction: stringPtr("f"), OnSuccess: make([]protos.PipelineStepCondition, 0), - OnFailure: []protos.PipelineStepCondition{protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_ABORT_CURRENT}, + OnFailure: []protos.PipelineStepCondition{protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_ABORT_ALL}, Step: &protos.PipelineStep_Detective{ Detective: &steps.DetectiveStep{ Path: stringPtr("object.payload"), - Args: []string{"gmail.com"}, + Args: []string{".com"}, Negate: boolPtr(false), Type: steps.DetectiveType_DETECTIVE_TYPE_STRING_CONTAINS_ANY, }, }, }, + { + Name: "Transform Step", + XWasmId: stringPtr(uuid.New().String()), + XWasmBytes: transformDetective, + XWasmFunction: stringPtr("f"), + OnSuccess: make([]protos.PipelineStepCondition, 0), + OnFailure: []protos.PipelineStepCondition{protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_ABORT_ALL}, + Step: &protos.PipelineStep_Transform{ + Transform: &steps.TransformStep{ + Path: "object.payload", + Type: steps.TransformType_TRANSFORM_TYPE_MASK_VALUE, + }, + }, + }, }, } @@ -453,8 +469,9 @@ var _ = Describe("Streamdal", func() { config: &Config{ ServiceName: "mysvc1", Logger: &logger.TinyLogger{}, - StepTimeout: time.Millisecond * 10, - PipelineTimeout: time.Millisecond * 100, + StepTimeout: time.Millisecond * 100, + PipelineTimeout: time.Second * 10, + DryRun: false, }, metrics: &metricsfakes.FakeIMetrics{}, pipelinesMtx: &sync.RWMutex{}, @@ -472,9 +489,9 @@ var _ = Describe("Streamdal", func() { }, } - payload := []byte(`{"object":{"payload":"streamdal@gmail.com"}`) + payload := []byte(`{"object":{"payload":"streamdal@gmail.com"}}`) - // Run 1000 requests in parallel + // Run 100 requests in parallel wg := &sync.WaitGroup{} for i := 0; i < 100; i++ { wg.Add(1) @@ -487,11 +504,16 @@ var _ = Describe("Streamdal", func() { OperationName: aud.OperationName, Data: payload, }) + Expect(resp.Error).To(BeFalse()) Expect(resp).To(BeAssignableToTypeOf(&ProcessResponse{})) - Expect(string(resp.Data)).To(Equal(string(payload))) Expect(len(resp.PipelineStatus)).To(Equal(1)) - Expect(len(resp.PipelineStatus[0].StepStatus)).To(Equal(1)) + Expect(len(resp.PipelineStatus[0].StepStatus)).To(Equal(2)) + Expect(resp.PipelineStatus[0].StepStatus[0].Error).To(BeFalse()) + Expect(resp.PipelineStatus[0].StepStatus[1].Error).To(BeFalse()) + Expect(resp.PipelineStatus[0].StepStatus[0].AbortStatus).To(Equal(protos.AbortStatus_ABORT_STATUS_UNSET)) + Expect(resp.PipelineStatus[0].StepStatus[1].AbortStatus).To(Equal(protos.AbortStatus_ABORT_STATUS_UNSET)) + Expect(string(resp.Data)).To(Equal(`{"object":{"payload":"stre***************"}}`)) }() } From dbce4c47ee332c6b7ba4b82554f73ff411811517 Mon Sep 17 00:00:00 2001 From: Mark G Date: Tue, 12 Dec 2023 15:45:18 -0600 Subject: [PATCH 7/7] Update go_sdk_test.go --- go_sdk_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/go_sdk_test.go b/go_sdk_test.go index 1d60154..a8ad270 100644 --- a/go_sdk_test.go +++ b/go_sdk_test.go @@ -423,10 +423,10 @@ var _ = Describe("Streamdal", func() { pipeline := &protos.Pipeline{ Id: uuid.New().String(), - Name: "Test Pipeline", + Name: "Multithreaded Test Pipeline", Steps: []*protos.PipelineStep{ { - Name: "Detective Step", + Name: "Multithreaded - Detective Step", XWasmId: stringPtr(uuid.New().String()), XWasmBytes: wasmDetective, XWasmFunction: stringPtr("f"), @@ -442,7 +442,7 @@ var _ = Describe("Streamdal", func() { }, }, { - Name: "Transform Step", + Name: "Multithreaded - Transform Step", XWasmId: stringPtr(uuid.New().String()), XWasmBytes: transformDetective, XWasmFunction: stringPtr("f"), @@ -470,7 +470,7 @@ var _ = Describe("Streamdal", func() { ServiceName: "mysvc1", Logger: &logger.TinyLogger{}, StepTimeout: time.Millisecond * 100, - PipelineTimeout: time.Second * 10, + PipelineTimeout: time.Minute, // Due to mutex, this should be longer than the entire test will take under CI DryRun: false, }, metrics: &metricsfakes.FakeIMetrics{},