From 4190d3ec06282ec2a461a93bf01fdda1af39d846 Mon Sep 17 00:00:00 2001 From: Daniel Selans Date: Mon, 29 Jan 2024 17:36:53 -0800 Subject: [PATCH 1/3] ordered pipelines so far updating tests --- go.mod | 2 +- go.sum | 2 + go_sdk.go | 126 ++++++++++++++---------------- go_sdk_test.go | 20 ++--- register.go | 122 ++--------------------------- register_test.go | 181 +++---------------------------------------- server/client.go | 8 +- validate/validate.go | 12 +++ 8 files changed, 102 insertions(+), 371 deletions(-) diff --git a/go.mod b/go.mod index 54fded1..d570ec5 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/onsi/gomega v1.30.0 github.com/pkg/errors v0.9.1 github.com/relistan/go-director v0.0.0-20200406104025-dbbf5d95248d - github.com/streamdal/streamdal/libs/protos v0.1.16 + github.com/streamdal/streamdal/libs/protos v0.1.17-0.20240129193545-8d8c941ad631 github.com/tetratelabs/wazero v1.6.0 golang.org/x/time v0.5.0 google.golang.org/grpc v1.60.1 diff --git a/go.sum b/go.sum index c943966..c1026a8 100644 --- a/go.sum +++ b/go.sum @@ -37,6 +37,8 @@ github.com/streamdal/streamdal/libs/protos v0.1.14 h1:DK1vPmBrX6fLy6uCmNJsonZdqp github.com/streamdal/streamdal/libs/protos v0.1.14/go.mod h1:WZ6qCqzJu/9Vn+P5EUMN1hcOWlxN2EHgcsxehylhgJQ= github.com/streamdal/streamdal/libs/protos v0.1.16 h1:e/xGg9rGBpr+wfNBAsRVPMGs8MlZLtmzfcjnYLdp2K0= github.com/streamdal/streamdal/libs/protos v0.1.16/go.mod h1:WZ6qCqzJu/9Vn+P5EUMN1hcOWlxN2EHgcsxehylhgJQ= +github.com/streamdal/streamdal/libs/protos v0.1.17-0.20240129193545-8d8c941ad631 h1:Oy+1xjLuheTh/DH9zhJ1xNhfkaxZL1F8ZKhscopzytk= +github.com/streamdal/streamdal/libs/protos v0.1.17-0.20240129193545-8d8c941ad631/go.mod h1:WZ6qCqzJu/9Vn+P5EUMN1hcOWlxN2EHgcsxehylhgJQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= diff --git a/go_sdk.go b/go_sdk.go index 70b2084..e51e4de 100644 --- a/go_sdk.go +++ b/go_sdk.go @@ -98,26 +98,24 @@ type IStreamdal interface { // Streamdal is the main struct for this library type Streamdal struct { - config *Config - functions map[string]*function - pipelines map[string]map[string]*protos.Command // k1: audienceStr k2: pipelineID - pipelinesPaused map[string]map[string]*protos.Command // k1: audienceStr k2: pipelineID - functionsMtx *sync.RWMutex - pipelinesMtx *sync.RWMutex - pipelinesPausedMtx *sync.RWMutex - serverClient server.IServerClient - metrics metrics.IMetrics - audiences map[string]struct{} // k: audienceStr - audiencesMtx *sync.RWMutex - sessionID string - kv kv.IKV - hf *hostfunc.HostFunc - tailsMtx *sync.RWMutex - tails map[string]map[string]*Tail // k1: audienceStr k2: tailID - pausedTailsMtx *sync.RWMutex - pausedTails map[string]map[string]*Tail // k1: audienceStr k2: tailID - schemas map[string]*protos.Schema // k: audienceStr - schemasMtx *sync.RWMutex + config *Config + functions map[string]*function + functionsMtx *sync.RWMutex + pipelines map[string][]*protos.Pipeline // k: audienceStr + pipelinesMtx *sync.RWMutex + serverClient server.IServerClient + metrics metrics.IMetrics + audiences map[string]struct{} // k: audienceStr + audiencesMtx *sync.RWMutex + sessionID string + kv kv.IKV + hf *hostfunc.HostFunc + tailsMtx *sync.RWMutex + tails map[string]map[string]*Tail // k1: audienceStr k2: tailID + pausedTailsMtx *sync.RWMutex + pausedTails map[string]map[string]*Tail // k1: audienceStr k2: tailID + schemas map[string]*protos.Schema // k: audienceStr + schemasMtx *sync.RWMutex } type Config struct { @@ -230,26 +228,24 @@ func New(cfg *Config) (*Streamdal, error) { } s := &Streamdal{ - functions: make(map[string]*function), - functionsMtx: &sync.RWMutex{}, - serverClient: serverClient, - pipelines: make(map[string]map[string]*protos.Command), - pipelinesMtx: &sync.RWMutex{}, - pipelinesPaused: make(map[string]map[string]*protos.Command), - pipelinesPausedMtx: &sync.RWMutex{}, - audiences: map[string]struct{}{}, - audiencesMtx: &sync.RWMutex{}, - config: cfg, - metrics: m, - sessionID: uuid.New().String(), - kv: kvInstance, - hf: hf, - tailsMtx: &sync.RWMutex{}, - tails: make(map[string]map[string]*Tail), - pausedTailsMtx: &sync.RWMutex{}, - pausedTails: make(map[string]map[string]*Tail), - schemasMtx: &sync.RWMutex{}, - schemas: make(map[string]*protos.Schema), + functions: make(map[string]*function), + functionsMtx: &sync.RWMutex{}, + serverClient: serverClient, + pipelines: make(map[string][]*protos.Pipeline), + pipelinesMtx: &sync.RWMutex{}, + audiences: map[string]struct{}{}, + audiencesMtx: &sync.RWMutex{}, + config: cfg, + metrics: m, + sessionID: uuid.New().String(), + kv: kvInstance, + hf: hf, + tailsMtx: &sync.RWMutex{}, + tails: make(map[string]map[string]*Tail), + pausedTailsMtx: &sync.RWMutex{}, + pausedTails: make(map[string]map[string]*Tail), + schemasMtx: &sync.RWMutex{}, + schemas: make(map[string]*protos.Schema), } if cfg.DryRun { @@ -392,36 +388,30 @@ func (s *Streamdal) watchForShutdown() { } func (s *Streamdal) pullInitialPipelines(ctx context.Context) error { - cmds, err := s.serverClient.GetAttachCommandsByService(ctx, s.config.ServiceName) + cmds, err := s.serverClient.GetSetPipelinesCommandByService(ctx, s.config.ServiceName) if err != nil { return errors.Wrap(err, "unable to pull initial pipelines") } - for _, cmd := range cmds.Active { - s.config.Logger.Debugf("Attaching pipeline '%s'", cmd.GetAttachPipeline().Pipeline.Name) + // Commands won't include paused pipelines but we can check just in case + for _, cmd := range cmds.SetPipelineCommands { + for _, p := range cmd.GetSetPipelines().Pipelines { + s.config.Logger.Debugf("saving pipeline '%s' for audience '%s' to internal map", p.Name, audToStr(cmd.Audience)) - // Fill in WASM data from the deduplication map - for _, step := range cmd.GetAttachPipeline().Pipeline.Steps { - wasmData, ok := cmds.WasmModules[step.GetXWasmId()] - if !ok { - return errors.Errorf("BUG: unable to find WASM data for step '%s'", step.Name) - } - - step.XWasmBytes = wasmData.Bytes - } + // Fill in WASM data from the deduplication map + for _, step := range p.Steps { + wasmData, ok := cmds.WasmModules[step.GetXWasmId()] + if !ok { + return errors.Errorf("BUG: unable to find WASM data for step '%s'", step.Name) + } - if err := s.attachPipeline(ctx, cmd); err != nil { - s.config.Logger.Errorf("failed to attach pipeline: %s", err) - } - } + step.XWasmBytes = wasmData.Bytes + } - for _, cmd := range cmds.Paused { - s.config.Logger.Debugf("Pipeline '%s' is paused", cmd.GetAttachPipeline().Pipeline.Name) - if _, ok := s.pipelinesPaused[audToStr(cmd.Audience)]; !ok { - s.pipelinesPaused[audToStr(cmd.Audience)] = make(map[string]*protos.Command) + if err := s.setPipelines(ctx, cmd); err != nil { + s.config.Logger.Errorf("failed to attach pipeline: %s", err) + } } - - s.pipelinesPaused[audToStr(cmd.Audience)][cmd.GetAttachPipeline().Pipeline.Id] = cmd } return nil @@ -512,7 +502,7 @@ func (s *Streamdal) runStep(ctx context.Context, aud *protos.Audience, step *pro return resp, nil } -func (s *Streamdal) getPipelines(ctx context.Context, aud *protos.Audience) map[string]*protos.Command { +func (s *Streamdal) getPipelines(ctx context.Context, aud *protos.Audience) []*protos.Pipeline { s.pipelinesMtx.RLock() defer s.pipelinesMtx.RUnlock() @@ -520,7 +510,7 @@ func (s *Streamdal) getPipelines(ctx context.Context, aud *protos.Audience) map[ pipelines, ok := s.pipelines[audToStr(aud)] if !ok { - return make(map[string]*protos.Command) + return make([]*protos.Pipeline, 0) } return pipelines @@ -630,20 +620,18 @@ func (s *Streamdal) Process(ctx context.Context, req *ProcessRequest) *ProcessRe ) PIPELINE: - for _, p := range pipelines { + for _, pipeline := range pipelines { var isr *protos.InterStepResult pIndex += 1 pipelineTimeoutCtx, pipelineTimeoutCxl := context.WithTimeout(ctx, s.config.PipelineTimeout) pipelineStatus := &protos.PipelineStatus{ - Id: p.GetAttachPipeline().GetPipeline().Id, - Name: p.GetAttachPipeline().GetPipeline().Name, + Id: pipeline.Id, + Name: pipeline.Name, StepStatus: make([]*protos.StepStatus, 0), } - pipeline := p.GetAttachPipeline().GetPipeline() - _ = s.metrics.Incr(ctx, &types.CounterEntry{Name: counterProcessed, Labels: s.getCounterLabels(req, pipeline), Value: 1, Audience: aud}) _ = s.metrics.Incr(ctx, &types.CounterEntry{Name: counterBytes, Labels: s.getCounterLabels(req, pipeline), Value: payloadSize, Audience: aud}) diff --git a/go_sdk_test.go b/go_sdk_test.go index 31402b9..752caab 100644 --- a/go_sdk_test.go +++ b/go_sdk_test.go @@ -34,8 +34,8 @@ type InternalServer struct { protos.UnimplementedInternalServer } -func (i *InternalServer) GetAttachCommandsByService(ctx context.Context, req *protos.GetAttachCommandsByServiceRequest) (*protos.GetAttachCommandsByServiceResponse, error) { - return &protos.GetAttachCommandsByServiceResponse{}, nil +func (i *InternalServer) GetSetPipelinesCommandByService(ctx context.Context, service string) (*protos.GetSetPipelinesCommandsByServiceResponse, error) { + return &protos.GetSetPipelinesCommandsByServiceResponse{}, nil } func (i *InternalServer) SendTail(srv protos.Internal_SendTailServer) error { @@ -153,11 +153,12 @@ var _ = Describe("Streamdal", func() { Context("getPipelines", func() { ctx := context.Background() + // TODO: Re-generate fake fakeClient := &serverfakes.FakeIServerClient{} s := &Streamdal{ pipelinesMtx: &sync.RWMutex{}, - pipelines: map[string]map[string]*protos.Command{}, + pipelines: map[string][]*protos.Pipeline{}, serverClient: fakeClient, audiencesMtx: &sync.RWMutex{}, audiences: map[string]struct{}{}, @@ -536,16 +537,9 @@ func createStreamdalClientFull(serviceName string, aud *protos.Audience, pipelin tails: map[string]map[string]*Tail{}, tailsMtx: &sync.RWMutex{}, pipelinesMtx: &sync.RWMutex{}, - pipelines: map[string]map[string]*protos.Command{ + pipelines: map[string][]*protos.Pipeline{ audToStr(aud): { - pipeline.Id: { - Audience: aud, - Command: &protos.Command_AttachPipeline{ - AttachPipeline: &protos.AttachPipelineCommand{ - Pipeline: pipeline, - }, - }, - }, + pipeline, }, }, } @@ -564,7 +558,7 @@ func createStreamdalClient() (*Streamdal, *kv.KV, error) { return &Streamdal{ pipelinesMtx: &sync.RWMutex{}, - pipelines: map[string]map[string]*protos.Command{}, + pipelines: map[string][]*protos.Pipeline{}, audiencesMtx: &sync.RWMutex{}, audiences: map[string]struct{}{}, kv: kvClient, diff --git a/register.go b/register.go index b6dadd3..530779b 100644 --- a/register.go +++ b/register.go @@ -189,21 +189,12 @@ func (s *Streamdal) handleCommand(ctx context.Context, cmd *protos.Command) erro var err error switch cmd.Command.(type) { + case *protos.Command_SetPipelines: + s.config.Logger.Debug("Received set pipelines command") + err = s.setPipelines(ctx, cmd) case *protos.Command_Kv: s.config.Logger.Debug("Received kv command") err = s.handleKVCommand(ctx, cmd.GetKv()) - case *protos.Command_AttachPipeline: - s.config.Logger.Debug("Received attach pipeline command") - err = s.attachPipeline(ctx, cmd) - case *protos.Command_DetachPipeline: - s.config.Logger.Debug("Received detach pipeline command") - err = s.detachPipeline(ctx, cmd) - case *protos.Command_PausePipeline: - s.config.Logger.Debug("Received pause pipeline command") - err = s.pausePipeline(ctx, cmd) - case *protos.Command_ResumePipeline: - s.config.Logger.Debug("Received resume pipeline command") - err = s.resumePipeline(ctx, cmd) case *protos.Command_Tail: s.config.Logger.Debug("Received tail command") err = s.handleTailCommand(ctx, cmd) @@ -281,118 +272,21 @@ func (s *Streamdal) handleKVCommand(_ context.Context, kv *protos.KVCommand) err return nil } -func (s *Streamdal) attachPipeline(_ context.Context, cmd *protos.Command) error { +func (s *Streamdal) setPipelines(_ context.Context, cmd *protos.Command) error { if cmd == nil { return ErrEmptyCommand } - s.pipelinesMtx.Lock() - defer s.pipelinesMtx.Unlock() - - if _, ok := s.pipelines[audToStr(cmd.Audience)]; !ok { - s.pipelines[audToStr(cmd.Audience)] = make(map[string]*protos.Command) - } - - s.pipelines[audToStr(cmd.Audience)][cmd.GetAttachPipeline().Pipeline.Id] = cmd - - s.config.Logger.Debugf("Attached pipeline %s", cmd.GetAttachPipeline().Pipeline.Id) - - return nil -} - -func (s *Streamdal) detachPipeline(_ context.Context, cmd *protos.Command) error { - if cmd == nil { - return ErrEmptyCommand - } - - s.pipelinesMtx.Lock() - defer s.pipelinesMtx.Unlock() - - audStr := audToStr(cmd.Audience) - - if _, ok := s.pipelines[audStr]; !ok { - return nil - } - - delete(s.pipelines[audStr], cmd.GetDetachPipeline().PipelineId) - - if len(s.pipelines[audStr]) == 0 { - delete(s.pipelines, audStr) - } - - s.config.Logger.Debugf("Detached pipeline %s", cmd.GetDetachPipeline().PipelineId) - - return nil -} - -func (s *Streamdal) pausePipeline(_ context.Context, cmd *protos.Command) error { - if cmd == nil { - return ErrEmptyCommand + if err := validate.SetPipelinesCommand(cmd); err != nil { + return errors.Wrap(err, "failed to validate set pipelines command") } s.pipelinesMtx.Lock() defer s.pipelinesMtx.Unlock() - s.pipelinesPausedMtx.Lock() - defer s.pipelinesPausedMtx.Unlock() - audStr := audToStr(cmd.Audience) + s.pipelines[audToStr(cmd.Audience)] = cmd.GetSetPipelines().Pipelines - if _, ok := s.pipelines[audStr]; !ok { - return ErrPipelineNotActive - } - - pipeline, ok := s.pipelines[audStr][cmd.GetPausePipeline().PipelineId] - if !ok { - return ErrPipelineNotActive - } - - if _, ok := s.pipelinesPaused[audStr]; !ok { - s.pipelinesPaused[audStr] = make(map[string]*protos.Command) - } - - s.pipelinesPaused[audStr][cmd.GetPausePipeline().PipelineId] = pipeline - - delete(s.pipelines[audStr], cmd.GetPausePipeline().PipelineId) - - if len(s.pipelines[audStr]) == 0 { - delete(s.pipelines, audStr) - } - - return nil -} - -func (s *Streamdal) resumePipeline(_ context.Context, cmd *protos.Command) error { - if cmd == nil { - return ErrEmptyCommand - } - - s.pipelinesMtx.Lock() - defer s.pipelinesMtx.Unlock() - s.pipelinesPausedMtx.Lock() - defer s.pipelinesPausedMtx.Unlock() - - audStr := audToStr(cmd.Audience) - - if _, ok := s.pipelinesPaused[audStr]; !ok { - return ErrPipelineNotPaused - } - - pipeline, ok := s.pipelinesPaused[audStr][cmd.GetResumePipeline().PipelineId] - if !ok { - return ErrPipelineNotPaused - } - - if _, ok := s.pipelines[audStr]; !ok { - s.pipelines[audStr] = make(map[string]*protos.Command) - } - - s.pipelines[audStr][cmd.GetResumePipeline().PipelineId] = pipeline - - delete(s.pipelinesPaused[audStr], cmd.GetResumePipeline().PipelineId) - - if len(s.pipelinesPaused[audStr]) == 0 { - delete(s.pipelinesPaused, audStr) - } + s.config.Logger.Debugf("saved '%d' pipelines for audience '%s'", len(cmd.GetSetPipelines().Pipelines), audToStr(cmd.Audience)) return nil } diff --git a/register_test.go b/register_test.go index a1fa0a5..7e32259 100644 --- a/register_test.go +++ b/register_test.go @@ -87,17 +87,17 @@ var _ = Describe("Register", func() { }) }) - Context("attachPipeline", func() { - It("should attach a pipeline", func() { + Context("setPipelines", func() { + It("should save pipelines", func() { s := &Streamdal{ pipelinesMtx: &sync.RWMutex{}, - pipelines: make(map[string]map[string]*protos.Command), + pipelines: make(map[string][]*protos.Pipeline), config: &Config{ Logger: &loggerfakes.FakeLogger{}, }, } - err := s.attachPipeline(context.Background(), nil) + err := s.setPipelines(context.Background(), nil) Expect(err).To(Equal(ErrEmptyCommand)) aud := &protos.Audience{ @@ -109,16 +109,18 @@ var _ = Describe("Register", func() { cmd := &protos.Command{ Audience: aud, - Command: &protos.Command_AttachPipeline{ - AttachPipeline: &protos.AttachPipelineCommand{ - Pipeline: &protos.Pipeline{ - Id: uuid.New().String(), + Command: &protos.Command_SetPipelines{ + SetPipelines: &protos.SetPipelinesCommand{ + Pipelines: []*protos.Pipeline{ + { + Id: uuid.New().String(), + }, }, }, }, } - err = s.attachPipeline(context.Background(), cmd) + err = s.setPipelines(context.Background(), cmd) Expect(err).To(BeNil()) Expect(len(s.pipelines)).To(Equal(1)) @@ -126,167 +128,6 @@ var _ = Describe("Register", func() { }) }) - Context("detachPipeline", func() { - It("should detach a pipeline", func() { - pipelineID := uuid.New().String() - - s := &Streamdal{ - pipelinesMtx: &sync.RWMutex{}, - pipelines: make(map[string]map[string]*protos.Command), - config: &Config{ - Logger: &loggerfakes.FakeLogger{}, - }, - } - - err := s.detachPipeline(context.Background(), nil) - Expect(err).To(Equal(ErrEmptyCommand)) - - aud := &protos.Audience{ - OperationName: "test-operation", - ServiceName: "test-service", - OperationType: protos.OperationType_OPERATION_TYPE_PRODUCER, - ComponentName: "test-component", - } - - cmd := &protos.Command{ - Audience: aud, - Command: &protos.Command_DetachPipeline{ - DetachPipeline: &protos.DetachPipelineCommand{ - PipelineId: pipelineID, - }, - }, - } - - s.pipelines[audToStr(aud)] = make(map[string]*protos.Command) - s.pipelines[audToStr(aud)][pipelineID] = cmd - - err = s.detachPipeline(context.Background(), cmd) - - Expect(err).To(BeNil()) - Expect(len(s.pipelines)).To(Equal(0)) - }) - }) - - Context("pausePipeline", func() { - It("should pause a pipeline", func() { - pipelineID := uuid.New().String() - - s := &Streamdal{ - pipelinesPausedMtx: &sync.RWMutex{}, - pipelinesPaused: make(map[string]map[string]*protos.Command), - pipelinesMtx: &sync.RWMutex{}, - pipelines: make(map[string]map[string]*protos.Command), - config: &Config{ - Logger: &loggerfakes.FakeLogger{}, - }, - } - - err := s.pausePipeline(context.Background(), nil) - Expect(err).To(Equal(ErrEmptyCommand)) - - aud := &protos.Audience{ - OperationName: "test-operation", - ServiceName: "test-service", - OperationType: protos.OperationType_OPERATION_TYPE_PRODUCER, - ComponentName: "test-component", - } - - attachCmd := &protos.Command{ - Audience: aud, - Command: &protos.Command_AttachPipeline{ - AttachPipeline: &protos.AttachPipelineCommand{ - Pipeline: &protos.Pipeline{ - Id: pipelineID, - }, - }, - }, - } - - if err := s.pausePipeline(context.Background(), attachCmd); err != ErrPipelineNotActive { - - } - - s.pipelines[audToStr(aud)] = make(map[string]*protos.Command) - s.pipelines[audToStr(aud)][pipelineID] = attachCmd - - pauseCmd := &protos.Command{ - Audience: aud, - Command: &protos.Command_PausePipeline{ - PausePipeline: &protos.PausePipelineCommand{ - PipelineId: pipelineID, - }, - }, - } - - err = s.pausePipeline(context.Background(), pauseCmd) - Expect(err).To(BeNil()) - - Expect(len(s.pipelinesPaused)).To(Equal(1)) - Expect(len(s.pipelinesPaused[audToStr(aud)])).To(Equal(1)) - // It should no longer be in active pipeline map - Expect(len(s.pipelines)).To(Equal(0)) - }) - }) - - Context("resumePipeline", func() { - It("should resume a pipeline", func() { - pipelineID := uuid.New().String() - - s := &Streamdal{ - pipelinesPausedMtx: &sync.RWMutex{}, - pipelinesPaused: make(map[string]map[string]*protos.Command), - pipelinesMtx: &sync.RWMutex{}, - pipelines: make(map[string]map[string]*protos.Command), - config: &Config{ - Logger: &loggerfakes.FakeLogger{}, - }, - } - - err := s.resumePipeline(context.Background(), nil) - Expect(err).To(Equal(ErrEmptyCommand)) - - aud := &protos.Audience{ - OperationName: "test-operation", - ServiceName: "test-service", - OperationType: protos.OperationType_OPERATION_TYPE_PRODUCER, - ComponentName: "test-component", - } - - resumeCmd := &protos.Command{ - Audience: aud, - Command: &protos.Command_ResumePipeline{ - ResumePipeline: &protos.ResumePipelineCommand{ - PipelineId: pipelineID, - }, - }, - } - - err = s.resumePipeline(context.Background(), resumeCmd) - Expect(err).To(Equal(ErrPipelineNotPaused)) - - attachCmd := &protos.Command{ - Audience: aud, - Command: &protos.Command_AttachPipeline{ - AttachPipeline: &protos.AttachPipelineCommand{ - Pipeline: &protos.Pipeline{ - Id: pipelineID, - }, - }, - }, - } - - s.pipelinesPaused[audToStr(aud)] = make(map[string]*protos.Command) - s.pipelinesPaused[audToStr(aud)][pipelineID] = attachCmd - - err = s.resumePipeline(context.Background(), resumeCmd) - Expect(err).To(BeNil()) - Expect(len(s.pipelines)).To(Equal(1)) - Expect(len(s.pipelines[audToStr(aud)])).To(Equal(1)) - // It should no longer be in paused pipeline map - Expect(len(s.pipelinesPaused)).To(Equal(0)) - }) - }) - Context("handleCommand", func() { var s *Streamdal var fakeLogger *loggerfakes.FakeLogger diff --git a/server/client.go b/server/client.go index 5e850ae..250516c 100644 --- a/server/client.go +++ b/server/client.go @@ -20,9 +20,9 @@ import ( //go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . IServerClient type IServerClient interface { - // GetAttachCommandsByService is called in New() in order to get all AttachCommands in a synchronous manner + // GetSetPipelinesCommandByService is called in New() in order to get all AttachCommands in a synchronous manner // before we allow the client to start processing. - GetAttachCommandsByService(ctx context.Context, service string) (*protos.GetAttachCommandsByServiceResponse, error) + GetSetPipelinesCommandByService(ctx context.Context, service string) (*protos.GetSetPipelinesCommandsByServiceResponse, error) // GetTailStream returns a gRPC client stream used to send TailResponses to the streamdal server GetTailStream(ctx context.Context) (protos.Internal_SendTailClient, error) @@ -196,10 +196,10 @@ func (c *Client) HeartBeat(ctx context.Context, req *protos.HeartbeatRequest) er return err } -func (c *Client) GetAttachCommandsByService(ctx context.Context, service string) (*protos.GetAttachCommandsByServiceResponse, error) { +func (c *Client) GetSetPipelinesCommandByService(ctx context.Context, service string) (*protos.GetSetPipelinesCommandsByServiceResponse, error) { ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("auth-token", c.Token)) - resp, err := c.Server.GetAttachCommandsByService(ctx, &protos.GetAttachCommandsByServiceRequest{ServiceName: service}) + resp, err := c.Server.GetSetPipelinesCommandsByService(ctx, &protos.GetSetPipelinesCommandsByServiceRequest{ServiceName: service}) if err != nil { return nil, errors.Wrap(err, "unable to get attach commands by service") } diff --git a/validate/validate.go b/validate/validate.go index 9552fd8..e762604 100644 --- a/validate/validate.go +++ b/validate/validate.go @@ -47,6 +47,18 @@ func Audience(aud *protos.Audience) error { return nil } +func SetPipelinesCommand(cmd *protos.Command) error { + if cmd == nil { + return ErrNilInput + } + + if cmd.GetSetPipelines() == nil { + return errors.New("not a SetPipelines command") + } + + return nil +} + func KVInstruction(i *protos.KVInstruction) error { if i == nil { return errors.New("KVInstruction cannot be nil") From 3b85f5f0be47f22fda8cc87c43970329fa29537a Mon Sep 17 00:00:00 2001 From: Daniel Selans Date: Mon, 29 Jan 2024 19:21:46 -0800 Subject: [PATCH 2/3] updated to use SetPipelines, removed pause, resume, attach changed pipelines to `map[string][]*protos.Pipeline` (was `map[string]map[string]*protos.Command`. --- benchmarks_test.go | 4 +- go.mod | 4 +- go.sum | 6 ++ go_sdk_test.go | 12 +++- server/client.go | 2 +- server/serverfakes/fake_iserver_client.go | 88 +++++++++++------------ validate/validate_test.go | 4 +- wasm_test.go | 12 ++-- 8 files changed, 73 insertions(+), 59 deletions(-) diff --git a/benchmarks_test.go b/benchmarks_test.go index 2c610be..85ea585 100644 --- a/benchmarks_test.go +++ b/benchmarks_test.go @@ -133,7 +133,7 @@ func benchmarkWASM(wasmFile, testPayloadFile string, step *protos.PipelineStep, s := &Streamdal{ pipelinesMtx: &sync.RWMutex{}, - pipelines: map[string]map[string]*protos.Command{}, + pipelines: map[string][]*protos.Pipeline{}, audiencesMtx: &sync.RWMutex{}, audiences: map[string]struct{}{}, } @@ -204,7 +204,7 @@ func inferSchema(fileName string) (*protos.WASMResponse, error) { s := &Streamdal{ pipelinesMtx: &sync.RWMutex{}, - pipelines: map[string]map[string]*protos.Command{}, + pipelines: map[string][]*protos.Pipeline{}, audiencesMtx: &sync.RWMutex{}, audiences: map[string]struct{}{}, } diff --git a/go.mod b/go.mod index d570ec5..d04a1f0 100644 --- a/go.mod +++ b/go.mod @@ -21,10 +21,12 @@ require ( github.com/golang/protobuf v1.5.3 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/pprof v0.0.0-20231212022811-ec68065c825e // indirect + github.com/maxbrunsfeld/counterfeiter/v6 v6.8.1 // indirect + golang.org/x/mod v0.14.0 // indirect golang.org/x/net v0.20.0 // indirect golang.org/x/sys v0.16.0 // indirect golang.org/x/text v0.14.0 // indirect - golang.org/x/tools v0.16.1 // indirect + golang.org/x/tools v0.17.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index c1026a8..e2fd414 100644 --- a/go.sum +++ b/go.sum @@ -19,6 +19,8 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGa github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/maxbrunsfeld/counterfeiter/v6 v6.8.1 h1:NicmruxkeqHjDv03SfSxqmaLuisddudfP3h5wdXFbhM= +github.com/maxbrunsfeld/counterfeiter/v6 v6.8.1/go.mod h1:eyp4DdUJAKkr9tvxR3jWhw2mDK7CWABMG5r9uyaKC7I= github.com/onsi/ginkgo/v2 v2.13.2 h1:Bi2gGVkfn6gQcjNjZJVO8Gf0FHzMPf2phUei9tejVMs= github.com/onsi/ginkgo/v2 v2.13.2/go.mod h1:XStQ8QcGwLyF4HdfcZB8SFOS/MWCgDuXMSBe6zrvLgM= github.com/onsi/gomega v1.30.0 h1:hvMK7xYz4D3HapigLTeGdId/NcfQx1VHMJc60ew99+8= @@ -45,6 +47,8 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/tetratelabs/wazero v1.6.0 h1:z0H1iikCdP8t+q341xqepY4EWvHEw8Es7tlqiVzlP3g= github.com/tetratelabs/wazero v1.6.0/go.mod h1:0U0G41+ochRKoPKCJlh0jMg1CHkyfK8kDqiirMmKY8A= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0= +golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= @@ -59,6 +63,8 @@ golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.16.1 h1:TLyB3WofjdOEepBHAU20JdNC1Zbg87elYofWYAY5oZA= golang.org/x/tools v0.16.1/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0= +golang.org/x/tools v0.17.0 h1:FvmRgNOcs3kOa+T20R1uhfP9F6HgG2mfxDv1vrx1Htc= +golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac h1:nUQEQmH/csSvFECKYRv6HWEyypysidKl2I6Qpsglq/0= google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac/go.mod h1:daQN87bsDqDoe316QbbvX60nMoJQa4r6Ds0ZuoAe5yA= diff --git a/go_sdk_test.go b/go_sdk_test.go index 752caab..ab5471c 100644 --- a/go_sdk_test.go +++ b/go_sdk_test.go @@ -34,7 +34,11 @@ type InternalServer struct { protos.UnimplementedInternalServer } -func (i *InternalServer) GetSetPipelinesCommandByService(ctx context.Context, service string) (*protos.GetSetPipelinesCommandsByServiceResponse, error) { +// Needed for implementing fake grpc server for testing +func (i *InternalServer) GetSetPipelinesCommandsByService( + _ context.Context, + _ *protos.GetSetPipelinesCommandsByServiceRequest, +) (*protos.GetSetPipelinesCommandsByServiceResponse, error) { return &protos.GetSetPipelinesCommandsByServiceResponse{}, nil } @@ -183,8 +187,10 @@ var _ = Describe("Streamdal", func() { }) It("returns a single pipeline", func() { - s.pipelines[audToStr(aud)] = map[string]*protos.Command{ - uuid.New().String(): {}, + s.pipelines[audToStr(aud)] = []*protos.Pipeline{ + { + Id: uuid.New().String(), + }, } Expect(len(s.getPipelines(ctx, aud))).To(Equal(1)) }) diff --git a/server/client.go b/server/client.go index 250516c..2099510 100644 --- a/server/client.go +++ b/server/client.go @@ -201,7 +201,7 @@ func (c *Client) GetSetPipelinesCommandByService(ctx context.Context, service st resp, err := c.Server.GetSetPipelinesCommandsByService(ctx, &protos.GetSetPipelinesCommandsByServiceRequest{ServiceName: service}) if err != nil { - return nil, errors.Wrap(err, "unable to get attach commands by service") + return nil, errors.Wrap(err, "unable to get set pipeline commands by service") } return resp, nil diff --git a/server/serverfakes/fake_iserver_client.go b/server/serverfakes/fake_iserver_client.go index a22cb75..69c8dfa 100644 --- a/server/serverfakes/fake_iserver_client.go +++ b/server/serverfakes/fake_iserver_client.go @@ -11,18 +11,18 @@ import ( ) type FakeIServerClient struct { - GetAttachCommandsByServiceStub func(context.Context, string) (*protos.GetAttachCommandsByServiceResponse, error) - getAttachCommandsByServiceMutex sync.RWMutex - getAttachCommandsByServiceArgsForCall []struct { + GetSetPipelinesCommandByServiceStub func(context.Context, string) (*protos.GetSetPipelinesCommandsByServiceResponse, error) + getSetPipelinesCommandByServiceMutex sync.RWMutex + getSetPipelinesCommandByServiceArgsForCall []struct { arg1 context.Context arg2 string } - getAttachCommandsByServiceReturns struct { - result1 *protos.GetAttachCommandsByServiceResponse + getSetPipelinesCommandByServiceReturns struct { + result1 *protos.GetSetPipelinesCommandsByServiceResponse result2 error } - getAttachCommandsByServiceReturnsOnCall map[int]struct { - result1 *protos.GetAttachCommandsByServiceResponse + getSetPipelinesCommandByServiceReturnsOnCall map[int]struct { + result1 *protos.GetSetPipelinesCommandsByServiceResponse result2 error } GetTailStreamStub func(context.Context) (protos.Internal_SendTailClient, error) @@ -130,17 +130,17 @@ type FakeIServerClient struct { invocationsMutex sync.RWMutex } -func (fake *FakeIServerClient) GetAttachCommandsByService(arg1 context.Context, arg2 string) (*protos.GetAttachCommandsByServiceResponse, error) { - fake.getAttachCommandsByServiceMutex.Lock() - ret, specificReturn := fake.getAttachCommandsByServiceReturnsOnCall[len(fake.getAttachCommandsByServiceArgsForCall)] - fake.getAttachCommandsByServiceArgsForCall = append(fake.getAttachCommandsByServiceArgsForCall, struct { +func (fake *FakeIServerClient) GetSetPipelinesCommandByService(arg1 context.Context, arg2 string) (*protos.GetSetPipelinesCommandsByServiceResponse, error) { + fake.getSetPipelinesCommandByServiceMutex.Lock() + ret, specificReturn := fake.getSetPipelinesCommandByServiceReturnsOnCall[len(fake.getSetPipelinesCommandByServiceArgsForCall)] + fake.getSetPipelinesCommandByServiceArgsForCall = append(fake.getSetPipelinesCommandByServiceArgsForCall, struct { arg1 context.Context arg2 string }{arg1, arg2}) - stub := fake.GetAttachCommandsByServiceStub - fakeReturns := fake.getAttachCommandsByServiceReturns - fake.recordInvocation("GetAttachCommandsByService", []interface{}{arg1, arg2}) - fake.getAttachCommandsByServiceMutex.Unlock() + stub := fake.GetSetPipelinesCommandByServiceStub + fakeReturns := fake.getSetPipelinesCommandByServiceReturns + fake.recordInvocation("GetSetPipelinesCommandByService", []interface{}{arg1, arg2}) + fake.getSetPipelinesCommandByServiceMutex.Unlock() if stub != nil { return stub(arg1, arg2) } @@ -150,47 +150,47 @@ func (fake *FakeIServerClient) GetAttachCommandsByService(arg1 context.Context, return fakeReturns.result1, fakeReturns.result2 } -func (fake *FakeIServerClient) GetAttachCommandsByServiceCallCount() int { - fake.getAttachCommandsByServiceMutex.RLock() - defer fake.getAttachCommandsByServiceMutex.RUnlock() - return len(fake.getAttachCommandsByServiceArgsForCall) +func (fake *FakeIServerClient) GetSetPipelinesCommandByServiceCallCount() int { + fake.getSetPipelinesCommandByServiceMutex.RLock() + defer fake.getSetPipelinesCommandByServiceMutex.RUnlock() + return len(fake.getSetPipelinesCommandByServiceArgsForCall) } -func (fake *FakeIServerClient) GetAttachCommandsByServiceCalls(stub func(context.Context, string) (*protos.GetAttachCommandsByServiceResponse, error)) { - fake.getAttachCommandsByServiceMutex.Lock() - defer fake.getAttachCommandsByServiceMutex.Unlock() - fake.GetAttachCommandsByServiceStub = stub +func (fake *FakeIServerClient) GetSetPipelinesCommandByServiceCalls(stub func(context.Context, string) (*protos.GetSetPipelinesCommandsByServiceResponse, error)) { + fake.getSetPipelinesCommandByServiceMutex.Lock() + defer fake.getSetPipelinesCommandByServiceMutex.Unlock() + fake.GetSetPipelinesCommandByServiceStub = stub } -func (fake *FakeIServerClient) GetAttachCommandsByServiceArgsForCall(i int) (context.Context, string) { - fake.getAttachCommandsByServiceMutex.RLock() - defer fake.getAttachCommandsByServiceMutex.RUnlock() - argsForCall := fake.getAttachCommandsByServiceArgsForCall[i] +func (fake *FakeIServerClient) GetSetPipelinesCommandByServiceArgsForCall(i int) (context.Context, string) { + fake.getSetPipelinesCommandByServiceMutex.RLock() + defer fake.getSetPipelinesCommandByServiceMutex.RUnlock() + argsForCall := fake.getSetPipelinesCommandByServiceArgsForCall[i] return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeIServerClient) GetAttachCommandsByServiceReturns(result1 *protos.GetAttachCommandsByServiceResponse, result2 error) { - fake.getAttachCommandsByServiceMutex.Lock() - defer fake.getAttachCommandsByServiceMutex.Unlock() - fake.GetAttachCommandsByServiceStub = nil - fake.getAttachCommandsByServiceReturns = struct { - result1 *protos.GetAttachCommandsByServiceResponse +func (fake *FakeIServerClient) GetSetPipelinesCommandByServiceReturns(result1 *protos.GetSetPipelinesCommandsByServiceResponse, result2 error) { + fake.getSetPipelinesCommandByServiceMutex.Lock() + defer fake.getSetPipelinesCommandByServiceMutex.Unlock() + fake.GetSetPipelinesCommandByServiceStub = nil + fake.getSetPipelinesCommandByServiceReturns = struct { + result1 *protos.GetSetPipelinesCommandsByServiceResponse result2 error }{result1, result2} } -func (fake *FakeIServerClient) GetAttachCommandsByServiceReturnsOnCall(i int, result1 *protos.GetAttachCommandsByServiceResponse, result2 error) { - fake.getAttachCommandsByServiceMutex.Lock() - defer fake.getAttachCommandsByServiceMutex.Unlock() - fake.GetAttachCommandsByServiceStub = nil - if fake.getAttachCommandsByServiceReturnsOnCall == nil { - fake.getAttachCommandsByServiceReturnsOnCall = make(map[int]struct { - result1 *protos.GetAttachCommandsByServiceResponse +func (fake *FakeIServerClient) GetSetPipelinesCommandByServiceReturnsOnCall(i int, result1 *protos.GetSetPipelinesCommandsByServiceResponse, result2 error) { + fake.getSetPipelinesCommandByServiceMutex.Lock() + defer fake.getSetPipelinesCommandByServiceMutex.Unlock() + fake.GetSetPipelinesCommandByServiceStub = nil + if fake.getSetPipelinesCommandByServiceReturnsOnCall == nil { + fake.getSetPipelinesCommandByServiceReturnsOnCall = make(map[int]struct { + result1 *protos.GetSetPipelinesCommandsByServiceResponse result2 error }) } - fake.getAttachCommandsByServiceReturnsOnCall[i] = struct { - result1 *protos.GetAttachCommandsByServiceResponse + fake.getSetPipelinesCommandByServiceReturnsOnCall[i] = struct { + result1 *protos.GetSetPipelinesCommandsByServiceResponse result2 error }{result1, result2} } @@ -704,8 +704,8 @@ func (fake *FakeIServerClient) SendSchemaReturnsOnCall(i int, result1 error) { func (fake *FakeIServerClient) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() - fake.getAttachCommandsByServiceMutex.RLock() - defer fake.getAttachCommandsByServiceMutex.RUnlock() + fake.getSetPipelinesCommandByServiceMutex.RLock() + defer fake.getSetPipelinesCommandByServiceMutex.RUnlock() fake.getTailStreamMutex.RLock() defer fake.getTailStreamMutex.RUnlock() fake.heartBeatMutex.RLock() diff --git a/validate/validate_test.go b/validate/validate_test.go index fba0057..2bef6be 100644 --- a/validate/validate_test.go +++ b/validate/validate_test.go @@ -104,7 +104,7 @@ var _ = Describe("Validate", func() { It("cannot have unset tail request", func() { err := TailRequestStartCommand(&protos.Command{ - Command: &protos.Command_AttachPipeline{}, + Command: &protos.Command_SetPipelines{}, }) Expect(err.Error()).To(Equal(ErrNilField("Tail").Error())) }) @@ -159,7 +159,7 @@ var _ = Describe("Validate", func() { It("cannot have unset tail request", func() { err := TailRequestStopCommand(&protos.Command{ - Command: &protos.Command_AttachPipeline{}, + Command: &protos.Command_SetPipelines{}, }) Expect(err.Error()).To(Equal(ErrNilField("Tail").Error())) }) diff --git a/wasm_test.go b/wasm_test.go index 3bb93f8..d7bb1d2 100644 --- a/wasm_test.go +++ b/wasm_test.go @@ -41,7 +41,7 @@ var _ = Describe("WASM Modules", func() { s = &Streamdal{ pipelinesMtx: &sync.RWMutex{}, - pipelines: map[string]map[string]*protos.Command{}, + pipelines: map[string][]*protos.Pipeline{}, audiencesMtx: &sync.RWMutex{}, audiences: map[string]struct{}{}, } @@ -110,7 +110,7 @@ var _ = Describe("WASM Modules", func() { s := &Streamdal{ pipelinesMtx: &sync.RWMutex{}, - pipelines: map[string]map[string]*protos.Command{}, + pipelines: map[string][]*protos.Pipeline{}, audiencesMtx: &sync.RWMutex{}, audiences: map[string]struct{}{}, } @@ -172,7 +172,7 @@ var _ = Describe("WASM Modules", func() { s = &Streamdal{ pipelinesMtx: &sync.RWMutex{}, - pipelines: map[string]map[string]*protos.Command{}, + pipelines: map[string][]*protos.Pipeline{}, audiencesMtx: &sync.RWMutex{}, audiences: map[string]struct{}{}, } @@ -263,7 +263,7 @@ var _ = Describe("WASM Modules", func() { BeforeEach(func() { s = &Streamdal{ pipelinesMtx: &sync.RWMutex{}, - pipelines: map[string]map[string]*protos.Command{}, + pipelines: map[string][]*protos.Pipeline{}, audiencesMtx: &sync.RWMutex{}, audiences: map[string]struct{}{}, } @@ -497,7 +497,7 @@ var _ = Describe("WASM Modules", func() { s = &Streamdal{ pipelinesMtx: &sync.RWMutex{}, - pipelines: map[string]map[string]*protos.Command{}, + pipelines: map[string][]*protos.Pipeline{}, audiencesMtx: &sync.RWMutex{}, audiences: map[string]struct{}{}, } @@ -581,7 +581,7 @@ var _ = Describe("WASM Modules", func() { s = &Streamdal{ pipelinesMtx: &sync.RWMutex{}, - pipelines: map[string]map[string]*protos.Command{}, + pipelines: map[string][]*protos.Pipeline{}, audiencesMtx: &sync.RWMutex{}, audiences: map[string]struct{}{}, } From 131758e71abdb400f5443fd38bbaa09ea914c483 Mon Sep 17 00:00:00 2001 From: Daniel Selans Date: Fri, 2 Feb 2024 19:05:12 -0800 Subject: [PATCH 3/3] updated to use protos v0.1.17 --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index d04a1f0..d554a2c 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/onsi/gomega v1.30.0 github.com/pkg/errors v0.9.1 github.com/relistan/go-director v0.0.0-20200406104025-dbbf5d95248d - github.com/streamdal/streamdal/libs/protos v0.1.17-0.20240129193545-8d8c941ad631 + github.com/streamdal/streamdal/libs/protos v0.1.17 github.com/tetratelabs/wazero v1.6.0 golang.org/x/time v0.5.0 google.golang.org/grpc v1.60.1 diff --git a/go.sum b/go.sum index e2fd414..029c402 100644 --- a/go.sum +++ b/go.sum @@ -41,6 +41,8 @@ github.com/streamdal/streamdal/libs/protos v0.1.16 h1:e/xGg9rGBpr+wfNBAsRVPMGs8M github.com/streamdal/streamdal/libs/protos v0.1.16/go.mod h1:WZ6qCqzJu/9Vn+P5EUMN1hcOWlxN2EHgcsxehylhgJQ= github.com/streamdal/streamdal/libs/protos v0.1.17-0.20240129193545-8d8c941ad631 h1:Oy+1xjLuheTh/DH9zhJ1xNhfkaxZL1F8ZKhscopzytk= github.com/streamdal/streamdal/libs/protos v0.1.17-0.20240129193545-8d8c941ad631/go.mod h1:WZ6qCqzJu/9Vn+P5EUMN1hcOWlxN2EHgcsxehylhgJQ= +github.com/streamdal/streamdal/libs/protos v0.1.17 h1:VbG0AQ2qIGVrXwz3/8W6630P2RFtQApZ+fDdyfVuSu8= +github.com/streamdal/streamdal/libs/protos v0.1.17/go.mod h1:WZ6qCqzJu/9Vn+P5EUMN1hcOWlxN2EHgcsxehylhgJQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=