Skip to content
This repository has been archived by the owner on Feb 9, 2024. It is now read-only.

Commit

Permalink
Merge pull request #60 from streamdal/blinktag/drop_message
Browse files Browse the repository at this point in the history
Support for drop message flag
  • Loading branch information
blinktag authored Jan 10, 2024
2 parents 44d0adc + 9576764 commit 7251305
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 40 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/onsi/gomega v1.30.0
github.com/pkg/errors v0.9.1
github.com/relistan/go-director v0.0.0-20200406104025-dbbf5d95248d
github.com/streamdal/streamdal/libs/protos v0.1.10
github.com/streamdal/streamdal/libs/protos v0.1.12
github.com/tetratelabs/wazero v1.6.0
golang.org/x/time v0.5.0
google.golang.org/grpc v1.60.1
Expand All @@ -21,10 +21,10 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/pprof v0.0.0-20231212022811-ec68065c825e // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.16.1 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240108191215-35c7eff3a6b1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
10 changes: 8 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykE
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/streamdal/streamdal/libs/protos v0.1.10 h1:gInzPnD0r08bf5Q4XcLMXMSLvUdcUGKnuuJTiwGQSF8=
github.com/streamdal/streamdal/libs/protos v0.1.10/go.mod h1:WZ6qCqzJu/9Vn+P5EUMN1hcOWlxN2EHgcsxehylhgJQ=
github.com/streamdal/streamdal/libs/protos v0.1.11 h1:PgGS2YHI3X7sHeazQKWjh1VMENW3klMF5NFcKtpsL3M=
github.com/streamdal/streamdal/libs/protos v0.1.11/go.mod h1:WZ6qCqzJu/9Vn+P5EUMN1hcOWlxN2EHgcsxehylhgJQ=
github.com/streamdal/streamdal/libs/protos v0.1.12 h1:U0301m8/EYRHW6tCIk7lIJM86iMQVcoXAqIWdOayKkQ=
github.com/streamdal/streamdal/libs/protos v0.1.12/go.mod h1:WZ6qCqzJu/9Vn+P5EUMN1hcOWlxN2EHgcsxehylhgJQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand All @@ -44,6 +46,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
Expand All @@ -58,6 +62,8 @@ golang.org/x/tools v0.16.1/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 h1:6G8oQ016D88m1xAKljMlBOOGWDZkes4kMhgGFlf8WcQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917/go.mod h1:xtjpI3tXFPP051KaWnhvxkiubL/6dJ18vLVf7q2pTOU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240108191215-35c7eff3a6b1 h1:gphdwh0npgs8elJ4T6J+DQJHPVF7RsuJHCfwztUb4J4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240108191215-35c7eff3a6b1/go.mod h1:daQN87bsDqDoe316QbbvX60nMoJQa4r6Ds0ZuoAe5yA=
google.golang.org/grpc v1.60.1 h1:26+wFr+cNqSGFcOXcabYC0lUVJVRa2Sb2ortSK7VrEU=
google.golang.org/grpc v1.60.1/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz4eGM=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
Expand Down
98 changes: 68 additions & 30 deletions go_sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,16 +654,18 @@ PIPELINE:
stepStatus.Error = true
stepStatus.ErrorMessage = err.Error()

continuePipeline, continueProcess := s.handleConditions(ctx, step.OnFailure, pipeline, step, aud, req)
if !continueProcess {
cond := s.handleConditions(ctx, step.OnFailure, pipeline, step, aud, req)
// Do not drop message on a WASM execution error, excluding that logic from here

if !cond.continueProcess {
pipelineTimeoutCxl()
s.config.Logger.Debugf("Step '%s' failed to run, aborting pipeline", step.Name)

stepStatus.AbortStatus = protos.AbortStatus_ABORT_STATUS_ALL
pipelineStatus.StepStatus = append(pipelineStatus.StepStatus, stepStatus)
resp.PipelineStatus = append(resp.PipelineStatus, pipelineStatus)
return resp
} else if !continuePipeline {
} else if !cond.continuePipeline {
pipelineTimeoutCxl()

stepStatus.AbortStatus = protos.AbortStatus_ABORT_STATUS_CURRENT
Expand All @@ -688,22 +690,30 @@ PIPELINE:
stepTimeoutCxl()
s.config.Logger.Debugf("Step '%s' returned exit code success", step.Name)

continuePipeline, continueProcess := s.handleConditions(ctx, step.OnSuccess, pipeline, step, aud, req)
if !continueProcess {
cond := s.handleConditions(ctx, step.OnSuccess, pipeline, step, aud, req)
if cond.dropMessage {
pipelineTimeoutCxl()

s.config.Logger.Debugf("Step '%s' returned exit code success but step "+
"condition failed, aborting further pipelines", step.Name)

stepStatus.AbortStatus = protos.AbortStatus_ABORT_STATUS_ALL
stepStatus.AbortStatus = protos.AbortStatus_ABORT_STATUS_DROP_MESSAGE
pipelineStatus.StepStatus = append(pipelineStatus.StepStatus, stepStatus)
resp.PipelineStatus = append(resp.PipelineStatus, pipelineStatus)
resp.DropMessage = true
return resp
} else if !continuePipeline {
}

if !cond.continueProcess {
pipelineTimeoutCxl()

s.config.Logger.Debugf("Step '%s' returned exit code success but step "+
"condition failed, aborting step and continuing pipelines", step.Name)
stepStatus.AbortStatus = protos.AbortStatus_ABORT_STATUS_ALL
pipelineStatus.StepStatus = append(pipelineStatus.StepStatus, stepStatus)
resp.PipelineStatus = append(resp.PipelineStatus, pipelineStatus)
return resp
} else if !cond.continuePipeline {
pipelineTimeoutCxl()

stepStatus.AbortStatus = protos.AbortStatus_ABORT_STATUS_CURRENT
pipelineStatus.StepStatus = append(pipelineStatus.StepStatus, stepStatus)
Expand All @@ -723,8 +733,18 @@ PIPELINE:

_ = s.metrics.Incr(ctx, &types.CounterEntry{Name: counterError, Labels: s.getCounterLabels(req, pipeline), Value: 1, Audience: aud})

continuePipeline, continueProcess := s.handleConditions(ctx, step.OnFailure, pipeline, step, aud, req)
if !continueProcess {
cond := s.handleConditions(ctx, step.OnFailure, pipeline, step, aud, req)
if cond.dropMessage {
pipelineTimeoutCxl()

stepStatus.AbortStatus = protos.AbortStatus_ABORT_STATUS_DROP_MESSAGE
pipelineStatus.StepStatus = append(pipelineStatus.StepStatus, stepStatus)
resp.PipelineStatus = append(resp.PipelineStatus, pipelineStatus)
resp.DropMessage = true
return resp
}

if !cond.continueProcess {
pipelineTimeoutCxl()
s.config.Logger.Debugf("Step '%s' returned exit code success but step "+
"condition failed, aborting pipeline", step.Name)
Expand All @@ -735,7 +755,7 @@ PIPELINE:
resp.Error = true
resp.ErrorMessage = stepStatus.ErrorMessage
return resp
} else if !continuePipeline {
} else if !cond.continuePipeline {
pipelineTimeoutCxl()

s.config.Logger.Debugf("Step '%s' returned exit code failure, aborting pipeline", step.Name)
Expand Down Expand Up @@ -769,48 +789,66 @@ PIPELINE:
return resp
}

type condition struct {
continuePipeline bool
continueProcess bool
dropMessage bool
}

// TODO: clean up the logic in this function and surrounding logic
func (s *Streamdal) handleConditions(
ctx context.Context,
conditions []protos.PipelineStepCondition,
pipeline *protos.Pipeline,
step *protos.PipelineStep,
aud *protos.Audience,
req *ProcessRequest,
) (bool, bool) {
shouldContinuePipeline := true
shouldContinueProcess := true
) *condition {
ret := &condition{
continuePipeline: true,
continueProcess: true,
dropMessage: false,
}

for _, condition := range conditions {
switch condition {
case protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_NOTIFY:
s.config.Logger.Debugf("Step '%s' condition triggered, notifying", step.Name)
if !s.config.DryRun {
if err := s.serverClient.Notify(ctx, pipeline, step, aud); err != nil {
s.config.Logger.Errorf("failed to notify condition: %v", err)
}

labels := map[string]string{
"service": s.config.ServiceName,
"component": req.ComponentName,
"operation": req.OperationName,
"pipeline_name": pipeline.Name,
"pipeline_id": pipeline.Id,
}
_ = s.metrics.Incr(ctx, &types.CounterEntry{Name: types.NotifyCount, Labels: labels, Value: 1, Audience: aud})
if s.config.DryRun {
continue
}

if err := s.serverClient.Notify(ctx, pipeline, step, aud); err != nil {
s.config.Logger.Errorf("failed to notify condition: %s", err)
}

labels := map[string]string{
"service": s.config.ServiceName,
"component": req.ComponentName,
"operation": req.OperationName,
"pipeline_name": pipeline.Name,
"pipeline_id": pipeline.Id,
}
_ = s.metrics.Incr(ctx, &types.CounterEntry{Name: types.NotifyCount, Labels: labels, Value: 1, Audience: aud})

case protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_ABORT_CURRENT:
s.config.Logger.Debugf("Step '%s' failed, aborting further pipeline steps", step.Name)
shouldContinuePipeline = false
ret.continuePipeline = false
case protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_ABORT_ALL:
s.config.Logger.Debugf("Step '%s' failed, aborting all pipelines", step.Name)
shouldContinuePipeline = false
shouldContinueProcess = false
ret.continuePipeline = false
ret.continueProcess = false
case protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_DISCARD_MESSAGE:
s.config.Logger.Debugf("Step '%s' failed, discarding message", step.Name)
ret.dropMessage = true
default:
// Assume continue
s.config.Logger.Debugf("Step '%s' failed, continuing to next step", step.Name)
}
}

return shouldContinuePipeline, shouldContinueProcess
return ret
}

func (a *Audience) toProto(serviceName string) *protos.Audience {
Expand Down
8 changes: 4 additions & 4 deletions go_sdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,16 +213,16 @@ var _ = Describe("Streamdal", func() {
It("handles notify condition", func() {
conditions := []protos.PipelineStepCondition{protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_NOTIFY}

continuePipeline, _ := s.handleConditions(context.Background(), conditions, pipeline, step, aud, req)
Expect(continuePipeline).To(BeTrue())
cond := s.handleConditions(context.Background(), conditions, pipeline, step, aud, req)
Expect(cond.continuePipeline).To(BeTrue())
Expect(fakeClient.NotifyCallCount()).To(Equal(1))
})

It("handles abort condition", func() {
conditions := []protos.PipelineStepCondition{protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_ABORT_CURRENT}

continuePipeline, _ := s.handleConditions(context.Background(), conditions, pipeline, step, aud, req)
Expect(continuePipeline).To(BeFalse())
cond := s.handleConditions(context.Background(), conditions, pipeline, step, aud, req)
Expect(cond.continuePipeline).To(BeFalse())
})
})

Expand Down
2 changes: 1 addition & 1 deletion wasm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ var _ = Describe("WASM Modules", func() {
Options: &steps.SchemaValidationStep_JsonSchema{
JsonSchema: &steps.SchemaValidationJSONSchema{
JsonSchema: schema,
Draft: steps.JSONSchemaDraft_JSON_SCHEMA_DRAFT_07,
Draft: steps.JSONSchemaDraft_JSONSCHEMA_DRAFT_07,
},
},
},
Expand Down

0 comments on commit 7251305

Please sign in to comment.