Skip to content
This repository has been archived by the owner on Feb 9, 2024. It is now read-only.

Commit

Permalink
Merge branch 'main' into blinktag/error_handling
Browse files Browse the repository at this point in the history
  • Loading branch information
blinktag committed Nov 15, 2023
2 parents 51cec52 + b42b034 commit 4f2b715
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 21 deletions.
2 changes: 1 addition & 1 deletion audience.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (s *Streamdal) addAudiences(ctx context.Context) {
s.audiencesMtx.RLock()
defer s.audiencesMtx.RUnlock()

for audStr, _ := range s.audiences {
for audStr := range s.audiences {
aud := strToAud(audStr)

if aud == nil {
Expand Down
6 changes: 6 additions & 0 deletions function.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"sync"

"github.com/pkg/errors"
"github.com/tetratelabs/wazero"
Expand All @@ -19,9 +20,13 @@ type function struct {
entry api.Function
alloc api.Function
dealloc api.Function
mtx *sync.Mutex
}

func (f *function) Exec(ctx context.Context, req []byte) ([]byte, error) {
f.mtx.Lock()
defer f.mtx.Unlock()

ptrLen := uint64(len(req))

inputPtr, err := f.alloc.Call(ctx, ptrLen)
Expand Down Expand Up @@ -129,6 +134,7 @@ func (s *Streamdal) createFunction(step *protos.PipelineStep) (*function, error)
entry: f,
alloc: alloc,
dealloc: dealloc,
mtx: &sync.Mutex{},
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/onsi/gomega v1.28.0
github.com/pkg/errors v0.9.1
github.com/relistan/go-director v0.0.0-20200406104025-dbbf5d95248d
github.com/streamdal/protos v0.0.115
github.com/streamdal/protos v0.0.119
github.com/tetratelabs/wazero v1.5.0
google.golang.org/grpc v1.56.3
google.golang.org/protobuf v1.31.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykE
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/streamdal/protos v0.0.115 h1:wsLiKEA4oG2yAj2Tm2cl8yirE34oHur4iVjxRDk8Y5o=
github.com/streamdal/protos v0.0.115/go.mod h1:ciPOA0/x5PG4yxOdwBtLFPwyAKwecMkuEitO3csbB7A=
github.com/streamdal/protos v0.0.119 h1:mgnRZa9KxEolIfMJtDQjo3YgTOdFo1fwrI9aQQBT2aU=
github.com/streamdal/protos v0.0.119/go.mod h1:ciPOA0/x5PG4yxOdwBtLFPwyAKwecMkuEitO3csbB7A=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand Down
20 changes: 10 additions & 10 deletions go_sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func New(cfg *Config) (*Streamdal, error) {
return nil, err
}

errCh := make(chan error, 0)
errCh := make(chan error)

// Start register
go func() {
Expand Down Expand Up @@ -585,6 +585,7 @@ func (s *Streamdal) Process(ctx context.Context, req *ProcessRequest) (*ProcessR

originalData := data // Used for tail request

PIPELINE:
for _, p := range pipelines {
pipeline := p.GetAttachPipeline().GetPipeline()

Expand All @@ -599,7 +600,8 @@ func (s *Streamdal) Process(ctx context.Context, req *ProcessRequest) (*ProcessR
select {
case <-timeoutCtx.Done():
timeoutCxl()
return nil, ErrPipelineTimeout
s.config.Logger.Errorf("pipeline '%s' timeout exceeded", pipeline.Name)
continue PIPELINE
default:
// NOOP
}
Expand All @@ -611,8 +613,7 @@ func (s *Streamdal) Process(ctx context.Context, req *ProcessRequest) (*ProcessR
shouldContinue := s.handleConditions(ctx, step.OnFailure, pipeline, step, aud, req)
if !shouldContinue {
timeoutCxl()
s.sendTail(aud, pipeline.Id, originalData, wasmResp.OutputPayload)
return nil, err
continue PIPELINE
}

// wasmResp will be nil, so don't allow code below to execute
Expand All @@ -627,10 +628,9 @@ func (s *Streamdal) Process(ctx context.Context, req *ProcessRequest) (*ProcessR
shouldContinue := s.handleConditions(ctx, step.OnSuccess, pipeline, step, aud, req)
if !shouldContinue {
timeoutCxl()
s.sendTail(aud, pipeline.Id, originalData, wasmResp.OutputPayload)
return &ProcessResponse{
Data: wasmResp.OutputPayload,
}, nil
s.config.Logger.Debugf("Step '%s' returned exit code success but step "+
"condition failed, aborting pipeline", step.Name)
continue PIPELINE
}
case protos.WASMExitCode_WASM_EXIT_CODE_FAILURE:
fallthrough
Expand All @@ -642,8 +642,8 @@ func (s *Streamdal) Process(ctx context.Context, req *ProcessRequest) (*ProcessR
shouldContinue := s.handleConditions(ctx, step.OnFailure, pipeline, step, aud, req)
if !shouldContinue {
timeoutCxl()
s.sendTail(aud, pipeline.Id, originalData, wasmResp.OutputPayload)
return nil, fmt.Errorf("step failed: %s", wasmResp.ExitMsg)
s.config.Logger.Debugf("Step '%s' returned exit code failure, aborting pipeline", step.Name)
continue PIPELINE
}
default:
_ = s.metrics.Incr(ctx, &types.CounterEntry{Name: counterError, Labels: s.getCounterLabels(req, pipeline), Value: 1, Audience: aud})
Expand Down
86 changes: 83 additions & 3 deletions go_sdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,86 @@ var _ = Describe("Streamdal", func() {
Expect(err).ToNot(HaveOccurred())
})

It("fails on a detective match and aborts", func() {
// TODO: how to test with multipipeline
//It("fails on a detective match and aborts", func() {
// aud := &protos.Audience{
// ServiceName: "mysvc1",
// ComponentName: "kafka",
// OperationType: protos.OperationType_OPERATION_TYPE_PRODUCER,
// OperationName: "mytopic",
// }
//
// wasmData, err := os.ReadFile("test-assets/wasm/detective.wasm")
// Expect(err).ToNot(HaveOccurred())
//
// pipeline := &protos.Pipeline{
// Id: uuid.New().String(),
// Name: "Test Pipeline",
// Steps: []*protos.PipelineStep{
// {
// Name: "Step 1",
// XWasmId: stringPtr(uuid.New().String()),
// XWasmBytes: wasmData,
// XWasmFunction: stringPtr("f"),
// OnSuccess: make([]protos.PipelineStepCondition, 0),
// OnFailure: []protos.PipelineStepCondition{protos.PipelineStepCondition_PIPELINE_STEP_CONDITION_ABORT},
// Step: &protos.PipelineStep_Detective{
// Detective: &steps.DetectiveStep{
// Path: stringPtr("object.payload"),
// Args: []string{"gmail.com"},
// Negate: boolPtr(false),
// Type: steps.DetectiveType_DETECTIVE_TYPE_STRING_CONTAINS_ANY,
// },
// },
// },
// },
// }
//
// s := &Streamdal{
// serverClient: &serverfakes.FakeIServerClient{},
// functionsMtx: &sync.RWMutex{},
// functions: map[string]*function{},
// audiencesMtx: &sync.RWMutex{},
// audiences: map[string]struct{}{},
// config: &Config{
// ServiceName: "mysvc1",
// Logger: &logger.TinyLogger{},
// StepTimeout: time.Millisecond * 10,
// PipelineTimeout: time.Millisecond * 100,
// },
// metrics: &metricsfakes.FakeIMetrics{},
// tails: map[string]map[string]*Tail{},
// tailsMtx: &sync.RWMutex{},
// pipelinesMtx: &sync.RWMutex{},
// pipelines: map[string]map[string]*protos.Command{
// audToStr(aud): {
// pipeline.Id: {
// Audience: aud,
// Command: &protos.Command_AttachPipeline{
// AttachPipeline: &protos.AttachPipelineCommand{
// Pipeline: pipeline,
// },
// },
// },
// },
// },
// }
//
// resp, err := s.Process(context.Background(), &ProcessRequest{
// ComponentName: aud.ComponentName,
// OperationType: OperationType(aud.OperationType),
// OperationName: aud.OperationName,
// Data: []byte(`{"object":{"payload":"[email protected]"}`),
// })
// Expect(err).ToNot(HaveOccurred())
// Expect(resp.Error).To(BeTrue())
// Expect(resp.Message).To(ContainSubstring("step failed"))
//})
})

Context("Multithreaded test", func() {
It("succeeds with multiple threads", func() {

aud := &protos.Audience{
ServiceName: "mysvc1",
ComponentName: "kafka",
Expand Down Expand Up @@ -348,15 +427,15 @@ var _ = Describe("Streamdal", func() {
functions: map[string]*function{},
audiencesMtx: &sync.RWMutex{},
audiences: map[string]struct{}{},
tails: map[string]map[string]*Tail{},
tailsMtx: &sync.RWMutex{},
config: &Config{
ServiceName: "mysvc1",
Logger: &logger.TinyLogger{},
StepTimeout: time.Millisecond * 10,
PipelineTimeout: time.Millisecond * 100,
},
metrics: &metricsfakes.FakeIMetrics{},
tails: map[string]map[string]*Tail{},
tailsMtx: &sync.RWMutex{},
pipelinesMtx: &sync.RWMutex{},
pipelines: map[string]map[string]*protos.Command{
audToStr(aud): {
Expand All @@ -380,6 +459,7 @@ var _ = Describe("Streamdal", func() {
})
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("step failed"))

})
})
})
Expand Down
2 changes: 1 addition & 1 deletion kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (k *KV) Keys() []string {

i := 0

for key, _ := range k.kvs {
for key := range k.kvs {
keys[i] = key
i++
}
Expand Down
2 changes: 1 addition & 1 deletion register.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (s *Streamdal) genClientInfo() *protos.ClientInfo {
return &protos.ClientInfo{
ClientType: protos.ClientType(s.config.ClientType),
LibraryName: "go-sdk",
LibraryVersion: "v0.0.71",
LibraryVersion: "v0.0.74",
Language: "go",
Arch: runtime.GOARCH,
Os: runtime.GOOS,
Expand Down
2 changes: 1 addition & 1 deletion schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var _ = Describe("Schema", func() {

Context("getSchema", func() {
It("returns empty when no schema found", func() {
schema := s.getSchema(nil, nil)
schema := s.getSchema(context.Background(), nil)
Expect(schema).To(Equal([]byte(``)))
})

Expand Down
3 changes: 2 additions & 1 deletion server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"

Expand Down Expand Up @@ -97,7 +98,7 @@ func dialServer(serverAddr string) (*grpc.ClientConn, error) {
Timeout: 30 * time.Second,
}),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxGRPCMessageRecvSize)),
grpc.WithInsecure(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)

conn, err := grpc.DialContext(dialCtx, serverAddr, opts...)
Expand Down

0 comments on commit 4f2b715

Please sign in to comment.