Skip to content
This repository has been archived by the owner on Feb 9, 2024. It is now read-only.

Commit

Permalink
Merge pull request #66 from streamdal/dselans/ordered_pipelines
Browse files Browse the repository at this point in the history
Dselans/ordered pipelines
  • Loading branch information
dselans authored Feb 3, 2024
2 parents 1c15a1a + 131758e commit 65f0bc1
Show file tree
Hide file tree
Showing 12 changed files with 176 additions and 429 deletions.
4 changes: 2 additions & 2 deletions benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func benchmarkWASM(wasmFile, testPayloadFile string, step *protos.PipelineStep,

s := &Streamdal{
pipelinesMtx: &sync.RWMutex{},
pipelines: map[string]map[string]*protos.Command{},
pipelines: map[string][]*protos.Pipeline{},
audiencesMtx: &sync.RWMutex{},
audiences: map[string]struct{}{},
}
Expand Down Expand Up @@ -204,7 +204,7 @@ func inferSchema(fileName string) (*protos.WASMResponse, error) {

s := &Streamdal{
pipelinesMtx: &sync.RWMutex{},
pipelines: map[string]map[string]*protos.Command{},
pipelines: map[string][]*protos.Pipeline{},
audiencesMtx: &sync.RWMutex{},
audiences: map[string]struct{}{},
}
Expand Down
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/onsi/gomega v1.30.0
github.com/pkg/errors v0.9.1
github.com/relistan/go-director v0.0.0-20200406104025-dbbf5d95248d
github.com/streamdal/streamdal/libs/protos v0.1.16
github.com/streamdal/streamdal/libs/protos v0.1.17
github.com/tetratelabs/wazero v1.6.0
golang.org/x/time v0.5.0
google.golang.org/grpc v1.60.1
Expand All @@ -21,10 +21,12 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/pprof v0.0.0-20231212022811-ec68065c825e // indirect
github.com/maxbrunsfeld/counterfeiter/v6 v6.8.1 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.16.1 // indirect
golang.org/x/tools v0.17.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGa
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/maxbrunsfeld/counterfeiter/v6 v6.8.1 h1:NicmruxkeqHjDv03SfSxqmaLuisddudfP3h5wdXFbhM=
github.com/maxbrunsfeld/counterfeiter/v6 v6.8.1/go.mod h1:eyp4DdUJAKkr9tvxR3jWhw2mDK7CWABMG5r9uyaKC7I=
github.com/onsi/ginkgo/v2 v2.13.2 h1:Bi2gGVkfn6gQcjNjZJVO8Gf0FHzMPf2phUei9tejVMs=
github.com/onsi/ginkgo/v2 v2.13.2/go.mod h1:XStQ8QcGwLyF4HdfcZB8SFOS/MWCgDuXMSBe6zrvLgM=
github.com/onsi/gomega v1.30.0 h1:hvMK7xYz4D3HapigLTeGdId/NcfQx1VHMJc60ew99+8=
Expand All @@ -37,12 +39,18 @@ github.com/streamdal/streamdal/libs/protos v0.1.14 h1:DK1vPmBrX6fLy6uCmNJsonZdqp
github.com/streamdal/streamdal/libs/protos v0.1.14/go.mod h1:WZ6qCqzJu/9Vn+P5EUMN1hcOWlxN2EHgcsxehylhgJQ=
github.com/streamdal/streamdal/libs/protos v0.1.16 h1:e/xGg9rGBpr+wfNBAsRVPMGs8MlZLtmzfcjnYLdp2K0=
github.com/streamdal/streamdal/libs/protos v0.1.16/go.mod h1:WZ6qCqzJu/9Vn+P5EUMN1hcOWlxN2EHgcsxehylhgJQ=
github.com/streamdal/streamdal/libs/protos v0.1.17-0.20240129193545-8d8c941ad631 h1:Oy+1xjLuheTh/DH9zhJ1xNhfkaxZL1F8ZKhscopzytk=
github.com/streamdal/streamdal/libs/protos v0.1.17-0.20240129193545-8d8c941ad631/go.mod h1:WZ6qCqzJu/9Vn+P5EUMN1hcOWlxN2EHgcsxehylhgJQ=
github.com/streamdal/streamdal/libs/protos v0.1.17 h1:VbG0AQ2qIGVrXwz3/8W6630P2RFtQApZ+fDdyfVuSu8=
github.com/streamdal/streamdal/libs/protos v0.1.17/go.mod h1:WZ6qCqzJu/9Vn+P5EUMN1hcOWlxN2EHgcsxehylhgJQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tetratelabs/wazero v1.6.0 h1:z0H1iikCdP8t+q341xqepY4EWvHEw8Es7tlqiVzlP3g=
github.com/tetratelabs/wazero v1.6.0/go.mod h1:0U0G41+ochRKoPKCJlh0jMg1CHkyfK8kDqiirMmKY8A=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0=
golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
Expand All @@ -57,6 +65,8 @@ golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.16.1 h1:TLyB3WofjdOEepBHAU20JdNC1Zbg87elYofWYAY5oZA=
golang.org/x/tools v0.16.1/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0=
golang.org/x/tools v0.17.0 h1:FvmRgNOcs3kOa+T20R1uhfP9F6HgG2mfxDv1vrx1Htc=
golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac h1:nUQEQmH/csSvFECKYRv6HWEyypysidKl2I6Qpsglq/0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac/go.mod h1:daQN87bsDqDoe316QbbvX60nMoJQa4r6Ds0ZuoAe5yA=
Expand Down
126 changes: 57 additions & 69 deletions go_sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,26 +104,24 @@ type IStreamdal interface {

// Streamdal is the main struct for this library
type Streamdal struct {
config *Config
functions map[string]*function
pipelines map[string]map[string]*protos.Command // k1: audienceStr k2: pipelineID
pipelinesPaused map[string]map[string]*protos.Command // k1: audienceStr k2: pipelineID
functionsMtx *sync.RWMutex
pipelinesMtx *sync.RWMutex
pipelinesPausedMtx *sync.RWMutex
serverClient server.IServerClient
metrics metrics.IMetrics
audiences map[string]struct{} // k: audienceStr
audiencesMtx *sync.RWMutex
sessionID string
kv kv.IKV
hf *hostfunc.HostFunc
tailsMtx *sync.RWMutex
tails map[string]map[string]*Tail // k1: audienceStr k2: tailID
pausedTailsMtx *sync.RWMutex
pausedTails map[string]map[string]*Tail // k1: audienceStr k2: tailID
schemas map[string]*protos.Schema // k: audienceStr
schemasMtx *sync.RWMutex
config *Config
functions map[string]*function
functionsMtx *sync.RWMutex
pipelines map[string][]*protos.Pipeline // k: audienceStr
pipelinesMtx *sync.RWMutex
serverClient server.IServerClient
metrics metrics.IMetrics
audiences map[string]struct{} // k: audienceStr
audiencesMtx *sync.RWMutex
sessionID string
kv kv.IKV
hf *hostfunc.HostFunc
tailsMtx *sync.RWMutex
tails map[string]map[string]*Tail // k1: audienceStr k2: tailID
pausedTailsMtx *sync.RWMutex
pausedTails map[string]map[string]*Tail // k1: audienceStr k2: tailID
schemas map[string]*protos.Schema // k: audienceStr
schemasMtx *sync.RWMutex
}

type Config struct {
Expand Down Expand Up @@ -236,26 +234,24 @@ func New(cfg *Config) (*Streamdal, error) {
}

s := &Streamdal{
functions: make(map[string]*function),
functionsMtx: &sync.RWMutex{},
serverClient: serverClient,
pipelines: make(map[string]map[string]*protos.Command),
pipelinesMtx: &sync.RWMutex{},
pipelinesPaused: make(map[string]map[string]*protos.Command),
pipelinesPausedMtx: &sync.RWMutex{},
audiences: map[string]struct{}{},
audiencesMtx: &sync.RWMutex{},
config: cfg,
metrics: m,
sessionID: uuid.New().String(),
kv: kvInstance,
hf: hf,
tailsMtx: &sync.RWMutex{},
tails: make(map[string]map[string]*Tail),
pausedTailsMtx: &sync.RWMutex{},
pausedTails: make(map[string]map[string]*Tail),
schemasMtx: &sync.RWMutex{},
schemas: make(map[string]*protos.Schema),
functions: make(map[string]*function),
functionsMtx: &sync.RWMutex{},
serverClient: serverClient,
pipelines: make(map[string][]*protos.Pipeline),
pipelinesMtx: &sync.RWMutex{},
audiences: map[string]struct{}{},
audiencesMtx: &sync.RWMutex{},
config: cfg,
metrics: m,
sessionID: uuid.New().String(),
kv: kvInstance,
hf: hf,
tailsMtx: &sync.RWMutex{},
tails: make(map[string]map[string]*Tail),
pausedTailsMtx: &sync.RWMutex{},
pausedTails: make(map[string]map[string]*Tail),
schemasMtx: &sync.RWMutex{},
schemas: make(map[string]*protos.Schema),
}

if cfg.DryRun {
Expand Down Expand Up @@ -398,36 +394,30 @@ func (s *Streamdal) watchForShutdown() {
}

func (s *Streamdal) pullInitialPipelines(ctx context.Context) error {
cmds, err := s.serverClient.GetAttachCommandsByService(ctx, s.config.ServiceName)
cmds, err := s.serverClient.GetSetPipelinesCommandByService(ctx, s.config.ServiceName)
if err != nil {
return errors.Wrap(err, "unable to pull initial pipelines")
}

for _, cmd := range cmds.Active {
s.config.Logger.Debugf("Attaching pipeline '%s'", cmd.GetAttachPipeline().Pipeline.Name)
// Commands won't include paused pipelines but we can check just in case
for _, cmd := range cmds.SetPipelineCommands {
for _, p := range cmd.GetSetPipelines().Pipelines {
s.config.Logger.Debugf("saving pipeline '%s' for audience '%s' to internal map", p.Name, audToStr(cmd.Audience))

// Fill in WASM data from the deduplication map
for _, step := range cmd.GetAttachPipeline().Pipeline.Steps {
wasmData, ok := cmds.WasmModules[step.GetXWasmId()]
if !ok {
return errors.Errorf("BUG: unable to find WASM data for step '%s'", step.Name)
}

step.XWasmBytes = wasmData.Bytes
}
// Fill in WASM data from the deduplication map
for _, step := range p.Steps {
wasmData, ok := cmds.WasmModules[step.GetXWasmId()]
if !ok {
return errors.Errorf("BUG: unable to find WASM data for step '%s'", step.Name)
}

if err := s.attachPipeline(ctx, cmd); err != nil {
s.config.Logger.Errorf("failed to attach pipeline: %s", err)
}
}
step.XWasmBytes = wasmData.Bytes
}

for _, cmd := range cmds.Paused {
s.config.Logger.Debugf("Pipeline '%s' is paused", cmd.GetAttachPipeline().Pipeline.Name)
if _, ok := s.pipelinesPaused[audToStr(cmd.Audience)]; !ok {
s.pipelinesPaused[audToStr(cmd.Audience)] = make(map[string]*protos.Command)
if err := s.setPipelines(ctx, cmd); err != nil {
s.config.Logger.Errorf("failed to attach pipeline: %s", err)
}
}

s.pipelinesPaused[audToStr(cmd.Audience)][cmd.GetAttachPipeline().Pipeline.Id] = cmd
}

return nil
Expand Down Expand Up @@ -518,15 +508,15 @@ func (s *Streamdal) runStep(ctx context.Context, aud *protos.Audience, step *pro
return resp, nil
}

func (s *Streamdal) getPipelines(ctx context.Context, aud *protos.Audience) map[string]*protos.Command {
func (s *Streamdal) getPipelines(ctx context.Context, aud *protos.Audience) []*protos.Pipeline {
s.pipelinesMtx.RLock()
defer s.pipelinesMtx.RUnlock()

s.addAudience(ctx, aud)

pipelines, ok := s.pipelines[audToStr(aud)]
if !ok {
return make(map[string]*protos.Command)
return make([]*protos.Pipeline, 0)
}

return pipelines
Expand Down Expand Up @@ -636,20 +626,18 @@ func (s *Streamdal) Process(ctx context.Context, req *ProcessRequest) *ProcessRe
)

PIPELINE:
for _, p := range pipelines {
for _, pipeline := range pipelines {
var isr *protos.InterStepResult
pIndex += 1

pipelineTimeoutCtx, pipelineTimeoutCxl := context.WithTimeout(ctx, s.config.PipelineTimeout)

pipelineStatus := &protos.PipelineStatus{
Id: p.GetAttachPipeline().GetPipeline().Id,
Name: p.GetAttachPipeline().GetPipeline().Name,
Id: pipeline.Id,
Name: pipeline.Name,
StepStatus: make([]*protos.StepStatus, 0),
}

pipeline := p.GetAttachPipeline().GetPipeline()

_ = s.metrics.Incr(ctx, &types.CounterEntry{Name: counterProcessed, Labels: s.getCounterLabels(req, pipeline), Value: 1, Audience: aud})
_ = s.metrics.Incr(ctx, &types.CounterEntry{Name: counterBytes, Labels: s.getCounterLabels(req, pipeline), Value: payloadSize, Audience: aud})

Expand Down
30 changes: 15 additions & 15 deletions go_sdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,12 @@ type InternalServer struct {
protos.UnimplementedInternalServer
}

func (i *InternalServer) GetAttachCommandsByService(ctx context.Context, req *protos.GetAttachCommandsByServiceRequest) (*protos.GetAttachCommandsByServiceResponse, error) {
return &protos.GetAttachCommandsByServiceResponse{}, nil
// Needed for implementing fake grpc server for testing
func (i *InternalServer) GetSetPipelinesCommandsByService(
_ context.Context,
_ *protos.GetSetPipelinesCommandsByServiceRequest,
) (*protos.GetSetPipelinesCommandsByServiceResponse, error) {
return &protos.GetSetPipelinesCommandsByServiceResponse{}, nil
}

func (i *InternalServer) SendTail(srv protos.Internal_SendTailServer) error {
Expand Down Expand Up @@ -153,11 +157,12 @@ var _ = Describe("Streamdal", func() {
Context("getPipelines", func() {
ctx := context.Background()

// TODO: Re-generate fake
fakeClient := &serverfakes.FakeIServerClient{}

s := &Streamdal{
pipelinesMtx: &sync.RWMutex{},
pipelines: map[string]map[string]*protos.Command{},
pipelines: map[string][]*protos.Pipeline{},
serverClient: fakeClient,
audiencesMtx: &sync.RWMutex{},
audiences: map[string]struct{}{},
Expand All @@ -182,8 +187,10 @@ var _ = Describe("Streamdal", func() {
})

It("returns a single pipeline", func() {
s.pipelines[audToStr(aud)] = map[string]*protos.Command{
uuid.New().String(): {},
s.pipelines[audToStr(aud)] = []*protos.Pipeline{
{
Id: uuid.New().String(),
},
}
Expect(len(s.getPipelines(ctx, aud))).To(Equal(1))
})
Expand Down Expand Up @@ -536,16 +543,9 @@ func createStreamdalClientFull(serviceName string, aud *protos.Audience, pipelin
tails: map[string]map[string]*Tail{},
tailsMtx: &sync.RWMutex{},
pipelinesMtx: &sync.RWMutex{},
pipelines: map[string]map[string]*protos.Command{
pipelines: map[string][]*protos.Pipeline{
audToStr(aud): {
pipeline.Id: {
Audience: aud,
Command: &protos.Command_AttachPipeline{
AttachPipeline: &protos.AttachPipelineCommand{
Pipeline: pipeline,
},
},
},
pipeline,
},
},
}
Expand All @@ -564,7 +564,7 @@ func createStreamdalClient() (*Streamdal, *kv.KV, error) {

return &Streamdal{
pipelinesMtx: &sync.RWMutex{},
pipelines: map[string]map[string]*protos.Command{},
pipelines: map[string][]*protos.Pipeline{},
audiencesMtx: &sync.RWMutex{},
audiences: map[string]struct{}{},
kv: kvClient,
Expand Down
Loading

0 comments on commit 65f0bc1

Please sign in to comment.