Skip to content
This repository has been archived by the owner on Feb 9, 2024. It is now read-only.

Dselans/ordered pipelines #66

Merged
merged 3 commits into from
Feb 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func benchmarkWASM(wasmFile, testPayloadFile string, step *protos.PipelineStep,

s := &Streamdal{
pipelinesMtx: &sync.RWMutex{},
pipelines: map[string]map[string]*protos.Command{},
pipelines: map[string][]*protos.Pipeline{},
audiencesMtx: &sync.RWMutex{},
audiences: map[string]struct{}{},
}
Expand Down Expand Up @@ -204,7 +204,7 @@ func inferSchema(fileName string) (*protos.WASMResponse, error) {

s := &Streamdal{
pipelinesMtx: &sync.RWMutex{},
pipelines: map[string]map[string]*protos.Command{},
pipelines: map[string][]*protos.Pipeline{},
audiencesMtx: &sync.RWMutex{},
audiences: map[string]struct{}{},
}
Expand Down
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/onsi/gomega v1.30.0
github.com/pkg/errors v0.9.1
github.com/relistan/go-director v0.0.0-20200406104025-dbbf5d95248d
github.com/streamdal/streamdal/libs/protos v0.1.16
github.com/streamdal/streamdal/libs/protos v0.1.17
github.com/tetratelabs/wazero v1.6.0
golang.org/x/time v0.5.0
google.golang.org/grpc v1.60.1
Expand All @@ -21,10 +21,12 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/pprof v0.0.0-20231212022811-ec68065c825e // indirect
github.com/maxbrunsfeld/counterfeiter/v6 v6.8.1 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.16.1 // indirect
golang.org/x/tools v0.17.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGa
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/maxbrunsfeld/counterfeiter/v6 v6.8.1 h1:NicmruxkeqHjDv03SfSxqmaLuisddudfP3h5wdXFbhM=
github.com/maxbrunsfeld/counterfeiter/v6 v6.8.1/go.mod h1:eyp4DdUJAKkr9tvxR3jWhw2mDK7CWABMG5r9uyaKC7I=
github.com/onsi/ginkgo/v2 v2.13.2 h1:Bi2gGVkfn6gQcjNjZJVO8Gf0FHzMPf2phUei9tejVMs=
github.com/onsi/ginkgo/v2 v2.13.2/go.mod h1:XStQ8QcGwLyF4HdfcZB8SFOS/MWCgDuXMSBe6zrvLgM=
github.com/onsi/gomega v1.30.0 h1:hvMK7xYz4D3HapigLTeGdId/NcfQx1VHMJc60ew99+8=
Expand All @@ -37,12 +39,18 @@ github.com/streamdal/streamdal/libs/protos v0.1.14 h1:DK1vPmBrX6fLy6uCmNJsonZdqp
github.com/streamdal/streamdal/libs/protos v0.1.14/go.mod h1:WZ6qCqzJu/9Vn+P5EUMN1hcOWlxN2EHgcsxehylhgJQ=
github.com/streamdal/streamdal/libs/protos v0.1.16 h1:e/xGg9rGBpr+wfNBAsRVPMGs8MlZLtmzfcjnYLdp2K0=
github.com/streamdal/streamdal/libs/protos v0.1.16/go.mod h1:WZ6qCqzJu/9Vn+P5EUMN1hcOWlxN2EHgcsxehylhgJQ=
github.com/streamdal/streamdal/libs/protos v0.1.17-0.20240129193545-8d8c941ad631 h1:Oy+1xjLuheTh/DH9zhJ1xNhfkaxZL1F8ZKhscopzytk=
github.com/streamdal/streamdal/libs/protos v0.1.17-0.20240129193545-8d8c941ad631/go.mod h1:WZ6qCqzJu/9Vn+P5EUMN1hcOWlxN2EHgcsxehylhgJQ=
github.com/streamdal/streamdal/libs/protos v0.1.17 h1:VbG0AQ2qIGVrXwz3/8W6630P2RFtQApZ+fDdyfVuSu8=
github.com/streamdal/streamdal/libs/protos v0.1.17/go.mod h1:WZ6qCqzJu/9Vn+P5EUMN1hcOWlxN2EHgcsxehylhgJQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tetratelabs/wazero v1.6.0 h1:z0H1iikCdP8t+q341xqepY4EWvHEw8Es7tlqiVzlP3g=
github.com/tetratelabs/wazero v1.6.0/go.mod h1:0U0G41+ochRKoPKCJlh0jMg1CHkyfK8kDqiirMmKY8A=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0=
golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
Expand All @@ -57,6 +65,8 @@ golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.16.1 h1:TLyB3WofjdOEepBHAU20JdNC1Zbg87elYofWYAY5oZA=
golang.org/x/tools v0.16.1/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0=
golang.org/x/tools v0.17.0 h1:FvmRgNOcs3kOa+T20R1uhfP9F6HgG2mfxDv1vrx1Htc=
golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac h1:nUQEQmH/csSvFECKYRv6HWEyypysidKl2I6Qpsglq/0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac/go.mod h1:daQN87bsDqDoe316QbbvX60nMoJQa4r6Ds0ZuoAe5yA=
Expand Down
126 changes: 57 additions & 69 deletions go_sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,26 +98,24 @@ type IStreamdal interface {

// Streamdal is the main struct for this library
type Streamdal struct {
config *Config
functions map[string]*function
pipelines map[string]map[string]*protos.Command // k1: audienceStr k2: pipelineID
pipelinesPaused map[string]map[string]*protos.Command // k1: audienceStr k2: pipelineID
functionsMtx *sync.RWMutex
pipelinesMtx *sync.RWMutex
pipelinesPausedMtx *sync.RWMutex
serverClient server.IServerClient
metrics metrics.IMetrics
audiences map[string]struct{} // k: audienceStr
audiencesMtx *sync.RWMutex
sessionID string
kv kv.IKV
hf *hostfunc.HostFunc
tailsMtx *sync.RWMutex
tails map[string]map[string]*Tail // k1: audienceStr k2: tailID
pausedTailsMtx *sync.RWMutex
pausedTails map[string]map[string]*Tail // k1: audienceStr k2: tailID
schemas map[string]*protos.Schema // k: audienceStr
schemasMtx *sync.RWMutex
config *Config
functions map[string]*function
functionsMtx *sync.RWMutex
pipelines map[string][]*protos.Pipeline // k: audienceStr
pipelinesMtx *sync.RWMutex
serverClient server.IServerClient
metrics metrics.IMetrics
audiences map[string]struct{} // k: audienceStr
audiencesMtx *sync.RWMutex
sessionID string
kv kv.IKV
hf *hostfunc.HostFunc
tailsMtx *sync.RWMutex
tails map[string]map[string]*Tail // k1: audienceStr k2: tailID
pausedTailsMtx *sync.RWMutex
pausedTails map[string]map[string]*Tail // k1: audienceStr k2: tailID
schemas map[string]*protos.Schema // k: audienceStr
schemasMtx *sync.RWMutex
}

type Config struct {
Expand Down Expand Up @@ -230,26 +228,24 @@ func New(cfg *Config) (*Streamdal, error) {
}

s := &Streamdal{
functions: make(map[string]*function),
functionsMtx: &sync.RWMutex{},
serverClient: serverClient,
pipelines: make(map[string]map[string]*protos.Command),
pipelinesMtx: &sync.RWMutex{},
pipelinesPaused: make(map[string]map[string]*protos.Command),
pipelinesPausedMtx: &sync.RWMutex{},
audiences: map[string]struct{}{},
audiencesMtx: &sync.RWMutex{},
config: cfg,
metrics: m,
sessionID: uuid.New().String(),
kv: kvInstance,
hf: hf,
tailsMtx: &sync.RWMutex{},
tails: make(map[string]map[string]*Tail),
pausedTailsMtx: &sync.RWMutex{},
pausedTails: make(map[string]map[string]*Tail),
schemasMtx: &sync.RWMutex{},
schemas: make(map[string]*protos.Schema),
functions: make(map[string]*function),
functionsMtx: &sync.RWMutex{},
serverClient: serverClient,
pipelines: make(map[string][]*protos.Pipeline),
pipelinesMtx: &sync.RWMutex{},
audiences: map[string]struct{}{},
audiencesMtx: &sync.RWMutex{},
config: cfg,
metrics: m,
sessionID: uuid.New().String(),
kv: kvInstance,
hf: hf,
tailsMtx: &sync.RWMutex{},
tails: make(map[string]map[string]*Tail),
pausedTailsMtx: &sync.RWMutex{},
pausedTails: make(map[string]map[string]*Tail),
schemasMtx: &sync.RWMutex{},
schemas: make(map[string]*protos.Schema),
}

if cfg.DryRun {
Expand Down Expand Up @@ -392,36 +388,30 @@ func (s *Streamdal) watchForShutdown() {
}

func (s *Streamdal) pullInitialPipelines(ctx context.Context) error {
cmds, err := s.serverClient.GetAttachCommandsByService(ctx, s.config.ServiceName)
cmds, err := s.serverClient.GetSetPipelinesCommandByService(ctx, s.config.ServiceName)
if err != nil {
return errors.Wrap(err, "unable to pull initial pipelines")
}

for _, cmd := range cmds.Active {
s.config.Logger.Debugf("Attaching pipeline '%s'", cmd.GetAttachPipeline().Pipeline.Name)
// Commands won't include paused pipelines but we can check just in case
for _, cmd := range cmds.SetPipelineCommands {
for _, p := range cmd.GetSetPipelines().Pipelines {
s.config.Logger.Debugf("saving pipeline '%s' for audience '%s' to internal map", p.Name, audToStr(cmd.Audience))

// Fill in WASM data from the deduplication map
for _, step := range cmd.GetAttachPipeline().Pipeline.Steps {
wasmData, ok := cmds.WasmModules[step.GetXWasmId()]
if !ok {
return errors.Errorf("BUG: unable to find WASM data for step '%s'", step.Name)
}

step.XWasmBytes = wasmData.Bytes
}
// Fill in WASM data from the deduplication map
for _, step := range p.Steps {
wasmData, ok := cmds.WasmModules[step.GetXWasmId()]
if !ok {
return errors.Errorf("BUG: unable to find WASM data for step '%s'", step.Name)
}

if err := s.attachPipeline(ctx, cmd); err != nil {
s.config.Logger.Errorf("failed to attach pipeline: %s", err)
}
}
step.XWasmBytes = wasmData.Bytes
}

for _, cmd := range cmds.Paused {
s.config.Logger.Debugf("Pipeline '%s' is paused", cmd.GetAttachPipeline().Pipeline.Name)
if _, ok := s.pipelinesPaused[audToStr(cmd.Audience)]; !ok {
s.pipelinesPaused[audToStr(cmd.Audience)] = make(map[string]*protos.Command)
if err := s.setPipelines(ctx, cmd); err != nil {
s.config.Logger.Errorf("failed to attach pipeline: %s", err)
}
}

s.pipelinesPaused[audToStr(cmd.Audience)][cmd.GetAttachPipeline().Pipeline.Id] = cmd
}

return nil
Expand Down Expand Up @@ -512,15 +502,15 @@ func (s *Streamdal) runStep(ctx context.Context, aud *protos.Audience, step *pro
return resp, nil
}

func (s *Streamdal) getPipelines(ctx context.Context, aud *protos.Audience) map[string]*protos.Command {
func (s *Streamdal) getPipelines(ctx context.Context, aud *protos.Audience) []*protos.Pipeline {
s.pipelinesMtx.RLock()
defer s.pipelinesMtx.RUnlock()

s.addAudience(ctx, aud)

pipelines, ok := s.pipelines[audToStr(aud)]
if !ok {
return make(map[string]*protos.Command)
return make([]*protos.Pipeline, 0)
}

return pipelines
Expand Down Expand Up @@ -630,20 +620,18 @@ func (s *Streamdal) Process(ctx context.Context, req *ProcessRequest) *ProcessRe
)

PIPELINE:
for _, p := range pipelines {
for _, pipeline := range pipelines {
var isr *protos.InterStepResult
pIndex += 1

pipelineTimeoutCtx, pipelineTimeoutCxl := context.WithTimeout(ctx, s.config.PipelineTimeout)

pipelineStatus := &protos.PipelineStatus{
Id: p.GetAttachPipeline().GetPipeline().Id,
Name: p.GetAttachPipeline().GetPipeline().Name,
Id: pipeline.Id,
Name: pipeline.Name,
StepStatus: make([]*protos.StepStatus, 0),
}

pipeline := p.GetAttachPipeline().GetPipeline()

_ = s.metrics.Incr(ctx, &types.CounterEntry{Name: counterProcessed, Labels: s.getCounterLabels(req, pipeline), Value: 1, Audience: aud})
_ = s.metrics.Incr(ctx, &types.CounterEntry{Name: counterBytes, Labels: s.getCounterLabels(req, pipeline), Value: payloadSize, Audience: aud})

Expand Down
30 changes: 15 additions & 15 deletions go_sdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,12 @@ type InternalServer struct {
protos.UnimplementedInternalServer
}

func (i *InternalServer) GetAttachCommandsByService(ctx context.Context, req *protos.GetAttachCommandsByServiceRequest) (*protos.GetAttachCommandsByServiceResponse, error) {
return &protos.GetAttachCommandsByServiceResponse{}, nil
// Needed for implementing fake grpc server for testing
func (i *InternalServer) GetSetPipelinesCommandsByService(
_ context.Context,
_ *protos.GetSetPipelinesCommandsByServiceRequest,
) (*protos.GetSetPipelinesCommandsByServiceResponse, error) {
return &protos.GetSetPipelinesCommandsByServiceResponse{}, nil
}

func (i *InternalServer) SendTail(srv protos.Internal_SendTailServer) error {
Expand Down Expand Up @@ -153,11 +157,12 @@ var _ = Describe("Streamdal", func() {
Context("getPipelines", func() {
ctx := context.Background()

// TODO: Re-generate fake
fakeClient := &serverfakes.FakeIServerClient{}

s := &Streamdal{
pipelinesMtx: &sync.RWMutex{},
pipelines: map[string]map[string]*protos.Command{},
pipelines: map[string][]*protos.Pipeline{},
serverClient: fakeClient,
audiencesMtx: &sync.RWMutex{},
audiences: map[string]struct{}{},
Expand All @@ -182,8 +187,10 @@ var _ = Describe("Streamdal", func() {
})

It("returns a single pipeline", func() {
s.pipelines[audToStr(aud)] = map[string]*protos.Command{
uuid.New().String(): {},
s.pipelines[audToStr(aud)] = []*protos.Pipeline{
{
Id: uuid.New().String(),
},
}
Expect(len(s.getPipelines(ctx, aud))).To(Equal(1))
})
Expand Down Expand Up @@ -536,16 +543,9 @@ func createStreamdalClientFull(serviceName string, aud *protos.Audience, pipelin
tails: map[string]map[string]*Tail{},
tailsMtx: &sync.RWMutex{},
pipelinesMtx: &sync.RWMutex{},
pipelines: map[string]map[string]*protos.Command{
pipelines: map[string][]*protos.Pipeline{
audToStr(aud): {
pipeline.Id: {
Audience: aud,
Command: &protos.Command_AttachPipeline{
AttachPipeline: &protos.AttachPipelineCommand{
Pipeline: pipeline,
},
},
},
pipeline,
},
},
}
Expand All @@ -564,7 +564,7 @@ func createStreamdalClient() (*Streamdal, *kv.KV, error) {

return &Streamdal{
pipelinesMtx: &sync.RWMutex{},
pipelines: map[string]map[string]*protos.Command{},
pipelines: map[string][]*protos.Pipeline{},
audiencesMtx: &sync.RWMutex{},
audiences: map[string]struct{}{},
kv: kvClient,
Expand Down
Loading
Loading