diff --git a/go.mod b/go.mod index d9bfa0b..0a37579 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/onsi/gomega v1.30.0 github.com/pkg/errors v0.9.1 github.com/relistan/go-director v0.0.0-20200406104025-dbbf5d95248d - github.com/streamdal/streamdal/libs/protos v0.1.10 + github.com/streamdal/streamdal/libs/protos v0.1.12 github.com/tetratelabs/wazero v1.6.0 golang.org/x/time v0.5.0 google.golang.org/grpc v1.60.1 @@ -21,10 +21,10 @@ require ( github.com/golang/protobuf v1.5.3 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/pprof v0.0.0-20231212022811-ec68065c825e // indirect - golang.org/x/net v0.19.0 // indirect + golang.org/x/net v0.20.0 // indirect golang.org/x/sys v0.16.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/tools v0.16.1 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240108191215-35c7eff3a6b1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 3618f98..c102e87 100644 --- a/go.sum +++ b/go.sum @@ -33,8 +33,10 @@ github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykE github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= -github.com/streamdal/streamdal/libs/protos v0.1.10 h1:gInzPnD0r08bf5Q4XcLMXMSLvUdcUGKnuuJTiwGQSF8= -github.com/streamdal/streamdal/libs/protos v0.1.10/go.mod h1:WZ6qCqzJu/9Vn+P5EUMN1hcOWlxN2EHgcsxehylhgJQ= +github.com/streamdal/streamdal/libs/protos v0.1.11 h1:PgGS2YHI3X7sHeazQKWjh1VMENW3klMF5NFcKtpsL3M= +github.com/streamdal/streamdal/libs/protos v0.1.11/go.mod h1:WZ6qCqzJu/9Vn+P5EUMN1hcOWlxN2EHgcsxehylhgJQ= +github.com/streamdal/streamdal/libs/protos v0.1.12 h1:U0301m8/EYRHW6tCIk7lIJM86iMQVcoXAqIWdOayKkQ= +github.com/streamdal/streamdal/libs/protos v0.1.12/go.mod h1:WZ6qCqzJu/9Vn+P5EUMN1hcOWlxN2EHgcsxehylhgJQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -44,6 +46,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= +golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= +golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= @@ -58,6 +62,8 @@ golang.org/x/tools v0.16.1/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 h1:6G8oQ016D88m1xAKljMlBOOGWDZkes4kMhgGFlf8WcQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917/go.mod h1:xtjpI3tXFPP051KaWnhvxkiubL/6dJ18vLVf7q2pTOU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240108191215-35c7eff3a6b1 h1:gphdwh0npgs8elJ4T6J+DQJHPVF7RsuJHCfwztUb4J4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240108191215-35c7eff3a6b1/go.mod h1:daQN87bsDqDoe316QbbvX60nMoJQa4r6Ds0ZuoAe5yA= google.golang.org/grpc v1.60.1 h1:26+wFr+cNqSGFcOXcabYC0lUVJVRa2Sb2ortSK7VrEU= google.golang.org/grpc v1.60.1/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz4eGM= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= diff --git a/go_sdk.go b/go_sdk.go index 235780a..6668ea5 100644 --- a/go_sdk.go +++ b/go_sdk.go @@ -654,8 +654,10 @@ PIPELINE: stepStatus.Error = true stepStatus.ErrorMessage = err.Error() - continuePipeline, continueProcess := s.handleConditions(ctx, step.OnFailure, pipeline, step, aud, req) - if !continueProcess { + cond := s.handleConditions(ctx, step.OnFailure, pipeline, step, aud, req) + // Do not drop message on a WASM execution error, excluding that logic from here + + if !cond.continueProcess { pipelineTimeoutCxl() s.config.Logger.Debugf("Step '%s' failed to run, aborting pipeline", step.Name) @@ -663,7 +665,7 @@ PIPELINE: pipelineStatus.StepStatus = append(pipelineStatus.StepStatus, stepStatus) resp.PipelineStatus = append(resp.PipelineStatus, pipelineStatus) return resp - } else if !continuePipeline { + } else if !cond.continuePipeline { pipelineTimeoutCxl() stepStatus.AbortStatus = protos.AbortStatus_ABORT_STATUS_CURRENT @@ -688,22 +690,30 @@ PIPELINE: stepTimeoutCxl() s.config.Logger.Debugf("Step '%s' returned exit code success", step.Name) - continuePipeline, continueProcess := s.handleConditions(ctx, step.OnSuccess, pipeline, step, aud, req) - if !continueProcess { + cond := s.handleConditions(ctx, step.OnSuccess, pipeline, step, aud, req) + if cond.dropMessage { pipelineTimeoutCxl() s.config.Logger.Debugf("Step '%s' returned exit code success but step "+ "condition failed, aborting further pipelines", step.Name) - - stepStatus.AbortStatus = protos.AbortStatus_ABORT_STATUS_ALL + stepStatus.AbortStatus = protos.AbortStatus_ABORT_STATUS_DROP_MESSAGE pipelineStatus.StepStatus = append(pipelineStatus.StepStatus, stepStatus) resp.PipelineStatus = append(resp.PipelineStatus, pipelineStatus) + resp.DropMessage = true return resp - } else if !continuePipeline { + } + + if !cond.continueProcess { pipelineTimeoutCxl() s.config.Logger.Debugf("Step '%s' returned exit code success but step "+ "condition failed, aborting step and continuing pipelines", step.Name) + stepStatus.AbortStatus = protos.AbortStatus_ABORT_STATUS_ALL + pipelineStatus.StepStatus = append(pipelineStatus.StepStatus, stepStatus) + resp.PipelineStatus = append(resp.PipelineStatus, pipelineStatus) + return resp + } else if !cond.continuePipeline { + pipelineTimeoutCxl() stepStatus.AbortStatus = protos.AbortStatus_ABORT_STATUS_CURRENT pipelineStatus.StepStatus = append(pipelineStatus.StepStatus, stepStatus) @@ -723,8 +733,18 @@ PIPELINE: _ = s.metrics.Incr(ctx, &types.CounterEntry{Name: counterError, Labels: s.getCounterLabels(req, pipeline), Value: 1, Audience: aud}) - continuePipeline, continueProcess := s.handleConditions(ctx, step.OnFailure, pipeline, step, aud, req) - if !continueProcess { + cond := s.handleConditions(ctx, step.OnFailure, pipeline, step, aud, req) + if cond.dropMessage { + pipelineTimeoutCxl() + + stepStatus.AbortStatus = protos.AbortStatus_ABORT_STATUS_DROP_MESSAGE + pipelineStatus.StepStatus = append(pipelineStatus.StepStatus, stepStatus) + resp.PipelineStatus = append(resp.PipelineStatus, pipelineStatus) + resp.DropMessage = true + return resp + } + + if !cond.continueProcess { pipelineTimeoutCxl() s.config.Logger.Debugf("Step '%s' returned exit code success but step "+ "condition failed, aborting pipeline", step.Name) @@ -735,7 +755,7 @@ PIPELINE: resp.Error = true resp.ErrorMessage = stepStatus.ErrorMessage return resp - } else if !continuePipeline { + } else if !cond.continuePipeline { pipelineTimeoutCxl() s.config.Logger.Debugf("Step '%s' returned exit code failure, aborting pipeline", step.Name) @@ -769,6 +789,13 @@ PIPELINE: return resp } +type condition struct { + continuePipeline bool + continueProcess bool + dropMessage bool +} + +// TODO: clean up the logic in this function and surrounding logic func (s *Streamdal) handleConditions( ctx context.Context, conditions []protos.PipelineStepCondition, @@ -776,41 +803,52 @@ func (s *Streamdal) handleConditions( step *protos.PipelineStep, aud *protos.Audience, req *ProcessRequest, -) (bool, bool) { - shouldContinuePipeline := true - shouldContinueProcess := true +) *condition { + ret := &condition{ + continuePipeline: true, + continueProcess: true, + dropMessage: false, + } + for _, condition := range conditions { switch condition { case protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_NOTIFY: s.config.Logger.Debugf("Step '%s' condition triggered, notifying", step.Name) - if !s.config.DryRun { - if err := s.serverClient.Notify(ctx, pipeline, step, aud); err != nil { - s.config.Logger.Errorf("failed to notify condition: %v", err) - } - labels := map[string]string{ - "service": s.config.ServiceName, - "component": req.ComponentName, - "operation": req.OperationName, - "pipeline_name": pipeline.Name, - "pipeline_id": pipeline.Id, - } - _ = s.metrics.Incr(ctx, &types.CounterEntry{Name: types.NotifyCount, Labels: labels, Value: 1, Audience: aud}) + if s.config.DryRun { + continue + } + + if err := s.serverClient.Notify(ctx, pipeline, step, aud); err != nil { + s.config.Logger.Errorf("failed to notify condition: %s", err) } + + labels := map[string]string{ + "service": s.config.ServiceName, + "component": req.ComponentName, + "operation": req.OperationName, + "pipeline_name": pipeline.Name, + "pipeline_id": pipeline.Id, + } + _ = s.metrics.Incr(ctx, &types.CounterEntry{Name: types.NotifyCount, Labels: labels, Value: 1, Audience: aud}) + case protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_ABORT_CURRENT: s.config.Logger.Debugf("Step '%s' failed, aborting further pipeline steps", step.Name) - shouldContinuePipeline = false + ret.continuePipeline = false case protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_ABORT_ALL: s.config.Logger.Debugf("Step '%s' failed, aborting all pipelines", step.Name) - shouldContinuePipeline = false - shouldContinueProcess = false + ret.continuePipeline = false + ret.continueProcess = false + case protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_DISCARD_MESSAGE: + s.config.Logger.Debugf("Step '%s' failed, discarding message", step.Name) + ret.dropMessage = true default: // Assume continue s.config.Logger.Debugf("Step '%s' failed, continuing to next step", step.Name) } } - return shouldContinuePipeline, shouldContinueProcess + return ret } func (a *Audience) toProto(serviceName string) *protos.Audience { diff --git a/go_sdk_test.go b/go_sdk_test.go index 691f7cf..0f733cd 100644 --- a/go_sdk_test.go +++ b/go_sdk_test.go @@ -213,16 +213,16 @@ var _ = Describe("Streamdal", func() { It("handles notify condition", func() { conditions := []protos.PipelineStepCondition{protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_NOTIFY} - continuePipeline, _ := s.handleConditions(context.Background(), conditions, pipeline, step, aud, req) - Expect(continuePipeline).To(BeTrue()) + cond := s.handleConditions(context.Background(), conditions, pipeline, step, aud, req) + Expect(cond.continuePipeline).To(BeTrue()) Expect(fakeClient.NotifyCallCount()).To(Equal(1)) }) It("handles abort condition", func() { conditions := []protos.PipelineStepCondition{protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_ABORT_CURRENT} - continuePipeline, _ := s.handleConditions(context.Background(), conditions, pipeline, step, aud, req) - Expect(continuePipeline).To(BeFalse()) + cond := s.handleConditions(context.Background(), conditions, pipeline, step, aud, req) + Expect(cond.continuePipeline).To(BeFalse()) }) }) diff --git a/wasm_test.go b/wasm_test.go index 6c4ea2e..2a58a26 100644 --- a/wasm_test.go +++ b/wasm_test.go @@ -482,7 +482,7 @@ var _ = Describe("WASM Modules", func() { Options: &steps.SchemaValidationStep_JsonSchema{ JsonSchema: &steps.SchemaValidationJSONSchema{ JsonSchema: schema, - Draft: steps.JSONSchemaDraft_JSON_SCHEMA_DRAFT_07, + Draft: steps.JSONSchemaDraft_JSONSCHEMA_DRAFT_07, }, }, },