Skip to content
This repository has been archived by the owner on Feb 9, 2024. It is now read-only.

Commit

Permalink
Merge pull request #51 from streamdal/blinktag/multipipeline_fix
Browse files Browse the repository at this point in the history
Multi-pipeline Fix
  • Loading branch information
blinktag authored Nov 9, 2023
2 parents 9fe5591 + e7b9a57 commit 5945780
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 35 deletions.
32 changes: 9 additions & 23 deletions go_sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,7 @@ func (s *Streamdal) Process(ctx context.Context, req *ProcessRequest) (*ProcessR

originalData := data // Used for tail request

PIPELINE:
for _, p := range pipelines {
pipeline := p.GetAttachPipeline().GetPipeline()

Expand All @@ -588,11 +589,8 @@ func (s *Streamdal) Process(ctx context.Context, req *ProcessRequest) (*ProcessR
select {
case <-timeoutCtx.Done():
timeoutCxl()
return &ProcessResponse{
Data: req.Data,
Error: true,
Message: "pipeline timeout exceeded",
}, nil
s.config.Logger.Errorf("pipeline '%s' timeout exceeded", pipeline.Name)
continue PIPELINE
default:
// NOOP
}
Expand All @@ -603,12 +601,7 @@ func (s *Streamdal) Process(ctx context.Context, req *ProcessRequest) (*ProcessR
shouldContinue := s.handleConditions(ctx, step.OnFailure, pipeline, step, aud, req)
if !shouldContinue {
timeoutCxl()
s.sendTail(aud, pipeline.Id, originalData, wasmResp.OutputPayload)
return &ProcessResponse{
Data: req.Data,
Error: true,
Message: err.Error(),
}, nil
continue PIPELINE
}

// wasmResp will be nil, so don't allow code below to execute
Expand All @@ -623,12 +616,9 @@ func (s *Streamdal) Process(ctx context.Context, req *ProcessRequest) (*ProcessR
shouldContinue := s.handleConditions(ctx, step.OnSuccess, pipeline, step, aud, req)
if !shouldContinue {
timeoutCxl()
s.sendTail(aud, pipeline.Id, originalData, wasmResp.OutputPayload)
return &ProcessResponse{
Data: wasmResp.OutputPayload,
Error: false,
Message: "",
}, nil
s.config.Logger.Debugf("Step '%s' returned exit code success but step "+
"condition failed, aborting pipeline", step.Name)
continue PIPELINE
}
case protos.WASMExitCode_WASM_EXIT_CODE_FAILURE:
fallthrough
Expand All @@ -640,12 +630,8 @@ func (s *Streamdal) Process(ctx context.Context, req *ProcessRequest) (*ProcessR
shouldContinue := s.handleConditions(ctx, step.OnFailure, pipeline, step, aud, req)
if !shouldContinue {
timeoutCxl()
s.sendTail(aud, pipeline.Id, originalData, wasmResp.OutputPayload)
return &ProcessResponse{
Data: wasmResp.OutputPayload,
Error: true,
Message: "step failed: " + wasmResp.ExitMsg,
}, nil
s.config.Logger.Debugf("Step '%s' returned exit code failure, aborting pipeline", step.Name)
continue PIPELINE
}
default:
_ = s.metrics.Incr(ctx, &types.CounterEntry{Name: counterError, Labels: s.getCounterLabels(req, pipeline), Value: 1, Audience: aud})
Expand Down
116 changes: 104 additions & 12 deletions go_sdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,86 @@ var _ = Describe("Streamdal", func() {
Expect(resp.Message).ToNot(Equal("No pipelines, message ignored"))
})

It("fails on a detective match and aborts", func() {
// TODO: how to test with multipipeline
//It("fails on a detective match and aborts", func() {
// aud := &protos.Audience{
// ServiceName: "mysvc1",
// ComponentName: "kafka",
// OperationType: protos.OperationType_OPERATION_TYPE_PRODUCER,
// OperationName: "mytopic",
// }
//
// wasmData, err := os.ReadFile("test-assets/wasm/detective.wasm")
// Expect(err).ToNot(HaveOccurred())
//
// pipeline := &protos.Pipeline{
// Id: uuid.New().String(),
// Name: "Test Pipeline",
// Steps: []*protos.PipelineStep{
// {
// Name: "Step 1",
// XWasmId: stringPtr(uuid.New().String()),
// XWasmBytes: wasmData,
// XWasmFunction: stringPtr("f"),
// OnSuccess: make([]protos.PipelineStepCondition, 0),
// OnFailure: []protos.PipelineStepCondition{protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_ABORT},
// Step: &protos.PipelineStep_Detective{
// Detective: &steps.DetectiveStep{
// Path: stringPtr("object.payload"),
// Args: []string{"gmail.com"},
// Negate: boolPtr(false),
// Type: steps.DetectiveType_DETECTIVE_TYPE_STRING_CONTAINS_ANY,
// },
// },
// },
// },
// }
//
// s := &Streamdal{
// serverClient: &serverfakes.FakeIServerClient{},
// functionsMtx: &sync.RWMutex{},
// functions: map[string]*function{},
// audiencesMtx: &sync.RWMutex{},
// audiences: map[string]struct{}{},
// config: &Config{
// ServiceName: "mysvc1",
// Logger: &logger.TinyLogger{},
// StepTimeout: time.Millisecond * 10,
// PipelineTimeout: time.Millisecond * 100,
// },
// metrics: &metricsfakes.FakeIMetrics{},
// tails: map[string]map[string]*Tail{},
// tailsMtx: &sync.RWMutex{},
// pipelinesMtx: &sync.RWMutex{},
// pipelines: map[string]map[string]*protos.Command{
// audToStr(aud): {
// pipeline.Id: {
// Audience: aud,
// Command: &protos.Command_AttachPipeline{
// AttachPipeline: &protos.AttachPipelineCommand{
// Pipeline: pipeline,
// },
// },
// },
// },
// },
// }
//
// resp, err := s.Process(context.Background(), &ProcessRequest{
// ComponentName: aud.ComponentName,
// OperationType: OperationType(aud.OperationType),
// OperationName: aud.OperationName,
// Data: []byte(`{"object":{"payload":"[email protected]"}`),
// })
// Expect(err).ToNot(HaveOccurred())
// Expect(resp.Error).To(BeTrue())
// Expect(resp.Message).To(ContainSubstring("step failed"))
//})
})

Context("Multithreaded test", func() {
It("succeeds with multiple threads", func() {

aud := &protos.Audience{
ServiceName: "mysvc1",
ComponentName: "kafka",
Expand Down Expand Up @@ -350,15 +429,15 @@ var _ = Describe("Streamdal", func() {
functions: map[string]*function{},
audiencesMtx: &sync.RWMutex{},
audiences: map[string]struct{}{},
tails: map[string]map[string]*Tail{},
tailsMtx: &sync.RWMutex{},
config: &Config{
ServiceName: "mysvc1",
Logger: &logger.TinyLogger{},
StepTimeout: time.Millisecond * 10,
PipelineTimeout: time.Millisecond * 100,
},
metrics: &metricsfakes.FakeIMetrics{},
tails: map[string]map[string]*Tail{},
tailsMtx: &sync.RWMutex{},
pipelinesMtx: &sync.RWMutex{},
pipelines: map[string]map[string]*protos.Command{
audToStr(aud): {
Expand All @@ -374,15 +453,28 @@ var _ = Describe("Streamdal", func() {
},
}

resp, err := s.Process(context.Background(), &ProcessRequest{
ComponentName: aud.ComponentName,
OperationType: OperationType(aud.OperationType),
OperationName: aud.OperationName,
Data: []byte(`{"object":{"payload":"[email protected]"}`),
})
Expect(err).ToNot(HaveOccurred())
Expect(resp.Error).To(BeTrue())
Expect(resp.Message).To(ContainSubstring("step failed"))
// Run 1000 requests in parallel
wg := &sync.WaitGroup{}
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer GinkgoRecover()
defer wg.Done()
resp, err := s.Process(context.Background(), &ProcessRequest{
ComponentName: aud.ComponentName,
OperationType: OperationType(aud.OperationType),
OperationName: aud.OperationName,
Data: []byte(`{"object":{"payload":"[email protected]"}`),
})
Expect(err).ToNot(HaveOccurred())
Expect(resp).To(BeAssignableToTypeOf(&ProcessResponse{}))
Expect(resp.Error).To(BeFalse())
Expect(resp.Message).To(Equal(""))

}()
}

wg.Wait()
})
})
})
Expand Down

0 comments on commit 5945780

Please sign in to comment.