diff --git a/benchmarks_test.go b/benchmarks_test.go index 2c610be..85ea585 100644 --- a/benchmarks_test.go +++ b/benchmarks_test.go @@ -133,7 +133,7 @@ func benchmarkWASM(wasmFile, testPayloadFile string, step *protos.PipelineStep, s := &Streamdal{ pipelinesMtx: &sync.RWMutex{}, - pipelines: map[string]map[string]*protos.Command{}, + pipelines: map[string][]*protos.Pipeline{}, audiencesMtx: &sync.RWMutex{}, audiences: map[string]struct{}{}, } @@ -204,7 +204,7 @@ func inferSchema(fileName string) (*protos.WASMResponse, error) { s := &Streamdal{ pipelinesMtx: &sync.RWMutex{}, - pipelines: map[string]map[string]*protos.Command{}, + pipelines: map[string][]*protos.Pipeline{}, audiencesMtx: &sync.RWMutex{}, audiences: map[string]struct{}{}, } diff --git a/go.mod b/go.mod index d570ec5..d04a1f0 100644 --- a/go.mod +++ b/go.mod @@ -21,10 +21,12 @@ require ( github.com/golang/protobuf v1.5.3 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/pprof v0.0.0-20231212022811-ec68065c825e // indirect + github.com/maxbrunsfeld/counterfeiter/v6 v6.8.1 // indirect + golang.org/x/mod v0.14.0 // indirect golang.org/x/net v0.20.0 // indirect golang.org/x/sys v0.16.0 // indirect golang.org/x/text v0.14.0 // indirect - golang.org/x/tools v0.16.1 // indirect + golang.org/x/tools v0.17.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index c1026a8..e2fd414 100644 --- a/go.sum +++ b/go.sum @@ -19,6 +19,8 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGa github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/maxbrunsfeld/counterfeiter/v6 v6.8.1 h1:NicmruxkeqHjDv03SfSxqmaLuisddudfP3h5wdXFbhM= +github.com/maxbrunsfeld/counterfeiter/v6 v6.8.1/go.mod h1:eyp4DdUJAKkr9tvxR3jWhw2mDK7CWABMG5r9uyaKC7I= github.com/onsi/ginkgo/v2 v2.13.2 h1:Bi2gGVkfn6gQcjNjZJVO8Gf0FHzMPf2phUei9tejVMs= github.com/onsi/ginkgo/v2 v2.13.2/go.mod h1:XStQ8QcGwLyF4HdfcZB8SFOS/MWCgDuXMSBe6zrvLgM= github.com/onsi/gomega v1.30.0 h1:hvMK7xYz4D3HapigLTeGdId/NcfQx1VHMJc60ew99+8= @@ -45,6 +47,8 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/tetratelabs/wazero v1.6.0 h1:z0H1iikCdP8t+q341xqepY4EWvHEw8Es7tlqiVzlP3g= github.com/tetratelabs/wazero v1.6.0/go.mod h1:0U0G41+ochRKoPKCJlh0jMg1CHkyfK8kDqiirMmKY8A= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0= +golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= @@ -59,6 +63,8 @@ golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.16.1 h1:TLyB3WofjdOEepBHAU20JdNC1Zbg87elYofWYAY5oZA= golang.org/x/tools v0.16.1/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0= +golang.org/x/tools v0.17.0 h1:FvmRgNOcs3kOa+T20R1uhfP9F6HgG2mfxDv1vrx1Htc= +golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac h1:nUQEQmH/csSvFECKYRv6HWEyypysidKl2I6Qpsglq/0= google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac/go.mod h1:daQN87bsDqDoe316QbbvX60nMoJQa4r6Ds0ZuoAe5yA= diff --git a/go_sdk_test.go b/go_sdk_test.go index 752caab..ab5471c 100644 --- a/go_sdk_test.go +++ b/go_sdk_test.go @@ -34,7 +34,11 @@ type InternalServer struct { protos.UnimplementedInternalServer } -func (i *InternalServer) GetSetPipelinesCommandByService(ctx context.Context, service string) (*protos.GetSetPipelinesCommandsByServiceResponse, error) { +// Needed for implementing fake grpc server for testing +func (i *InternalServer) GetSetPipelinesCommandsByService( + _ context.Context, + _ *protos.GetSetPipelinesCommandsByServiceRequest, +) (*protos.GetSetPipelinesCommandsByServiceResponse, error) { return &protos.GetSetPipelinesCommandsByServiceResponse{}, nil } @@ -183,8 +187,10 @@ var _ = Describe("Streamdal", func() { }) It("returns a single pipeline", func() { - s.pipelines[audToStr(aud)] = map[string]*protos.Command{ - uuid.New().String(): {}, + s.pipelines[audToStr(aud)] = []*protos.Pipeline{ + { + Id: uuid.New().String(), + }, } Expect(len(s.getPipelines(ctx, aud))).To(Equal(1)) }) diff --git a/server/client.go b/server/client.go index 250516c..2099510 100644 --- a/server/client.go +++ b/server/client.go @@ -201,7 +201,7 @@ func (c *Client) GetSetPipelinesCommandByService(ctx context.Context, service st resp, err := c.Server.GetSetPipelinesCommandsByService(ctx, &protos.GetSetPipelinesCommandsByServiceRequest{ServiceName: service}) if err != nil { - return nil, errors.Wrap(err, "unable to get attach commands by service") + return nil, errors.Wrap(err, "unable to get set pipeline commands by service") } return resp, nil diff --git a/server/serverfakes/fake_iserver_client.go b/server/serverfakes/fake_iserver_client.go index a22cb75..69c8dfa 100644 --- a/server/serverfakes/fake_iserver_client.go +++ b/server/serverfakes/fake_iserver_client.go @@ -11,18 +11,18 @@ import ( ) type FakeIServerClient struct { - GetAttachCommandsByServiceStub func(context.Context, string) (*protos.GetAttachCommandsByServiceResponse, error) - getAttachCommandsByServiceMutex sync.RWMutex - getAttachCommandsByServiceArgsForCall []struct { + GetSetPipelinesCommandByServiceStub func(context.Context, string) (*protos.GetSetPipelinesCommandsByServiceResponse, error) + getSetPipelinesCommandByServiceMutex sync.RWMutex + getSetPipelinesCommandByServiceArgsForCall []struct { arg1 context.Context arg2 string } - getAttachCommandsByServiceReturns struct { - result1 *protos.GetAttachCommandsByServiceResponse + getSetPipelinesCommandByServiceReturns struct { + result1 *protos.GetSetPipelinesCommandsByServiceResponse result2 error } - getAttachCommandsByServiceReturnsOnCall map[int]struct { - result1 *protos.GetAttachCommandsByServiceResponse + getSetPipelinesCommandByServiceReturnsOnCall map[int]struct { + result1 *protos.GetSetPipelinesCommandsByServiceResponse result2 error } GetTailStreamStub func(context.Context) (protos.Internal_SendTailClient, error) @@ -130,17 +130,17 @@ type FakeIServerClient struct { invocationsMutex sync.RWMutex } -func (fake *FakeIServerClient) GetAttachCommandsByService(arg1 context.Context, arg2 string) (*protos.GetAttachCommandsByServiceResponse, error) { - fake.getAttachCommandsByServiceMutex.Lock() - ret, specificReturn := fake.getAttachCommandsByServiceReturnsOnCall[len(fake.getAttachCommandsByServiceArgsForCall)] - fake.getAttachCommandsByServiceArgsForCall = append(fake.getAttachCommandsByServiceArgsForCall, struct { +func (fake *FakeIServerClient) GetSetPipelinesCommandByService(arg1 context.Context, arg2 string) (*protos.GetSetPipelinesCommandsByServiceResponse, error) { + fake.getSetPipelinesCommandByServiceMutex.Lock() + ret, specificReturn := fake.getSetPipelinesCommandByServiceReturnsOnCall[len(fake.getSetPipelinesCommandByServiceArgsForCall)] + fake.getSetPipelinesCommandByServiceArgsForCall = append(fake.getSetPipelinesCommandByServiceArgsForCall, struct { arg1 context.Context arg2 string }{arg1, arg2}) - stub := fake.GetAttachCommandsByServiceStub - fakeReturns := fake.getAttachCommandsByServiceReturns - fake.recordInvocation("GetAttachCommandsByService", []interface{}{arg1, arg2}) - fake.getAttachCommandsByServiceMutex.Unlock() + stub := fake.GetSetPipelinesCommandByServiceStub + fakeReturns := fake.getSetPipelinesCommandByServiceReturns + fake.recordInvocation("GetSetPipelinesCommandByService", []interface{}{arg1, arg2}) + fake.getSetPipelinesCommandByServiceMutex.Unlock() if stub != nil { return stub(arg1, arg2) } @@ -150,47 +150,47 @@ func (fake *FakeIServerClient) GetAttachCommandsByService(arg1 context.Context, return fakeReturns.result1, fakeReturns.result2 } -func (fake *FakeIServerClient) GetAttachCommandsByServiceCallCount() int { - fake.getAttachCommandsByServiceMutex.RLock() - defer fake.getAttachCommandsByServiceMutex.RUnlock() - return len(fake.getAttachCommandsByServiceArgsForCall) +func (fake *FakeIServerClient) GetSetPipelinesCommandByServiceCallCount() int { + fake.getSetPipelinesCommandByServiceMutex.RLock() + defer fake.getSetPipelinesCommandByServiceMutex.RUnlock() + return len(fake.getSetPipelinesCommandByServiceArgsForCall) } -func (fake *FakeIServerClient) GetAttachCommandsByServiceCalls(stub func(context.Context, string) (*protos.GetAttachCommandsByServiceResponse, error)) { - fake.getAttachCommandsByServiceMutex.Lock() - defer fake.getAttachCommandsByServiceMutex.Unlock() - fake.GetAttachCommandsByServiceStub = stub +func (fake *FakeIServerClient) GetSetPipelinesCommandByServiceCalls(stub func(context.Context, string) (*protos.GetSetPipelinesCommandsByServiceResponse, error)) { + fake.getSetPipelinesCommandByServiceMutex.Lock() + defer fake.getSetPipelinesCommandByServiceMutex.Unlock() + fake.GetSetPipelinesCommandByServiceStub = stub } -func (fake *FakeIServerClient) GetAttachCommandsByServiceArgsForCall(i int) (context.Context, string) { - fake.getAttachCommandsByServiceMutex.RLock() - defer fake.getAttachCommandsByServiceMutex.RUnlock() - argsForCall := fake.getAttachCommandsByServiceArgsForCall[i] +func (fake *FakeIServerClient) GetSetPipelinesCommandByServiceArgsForCall(i int) (context.Context, string) { + fake.getSetPipelinesCommandByServiceMutex.RLock() + defer fake.getSetPipelinesCommandByServiceMutex.RUnlock() + argsForCall := fake.getSetPipelinesCommandByServiceArgsForCall[i] return argsForCall.arg1, argsForCall.arg2 } -func (fake *FakeIServerClient) GetAttachCommandsByServiceReturns(result1 *protos.GetAttachCommandsByServiceResponse, result2 error) { - fake.getAttachCommandsByServiceMutex.Lock() - defer fake.getAttachCommandsByServiceMutex.Unlock() - fake.GetAttachCommandsByServiceStub = nil - fake.getAttachCommandsByServiceReturns = struct { - result1 *protos.GetAttachCommandsByServiceResponse +func (fake *FakeIServerClient) GetSetPipelinesCommandByServiceReturns(result1 *protos.GetSetPipelinesCommandsByServiceResponse, result2 error) { + fake.getSetPipelinesCommandByServiceMutex.Lock() + defer fake.getSetPipelinesCommandByServiceMutex.Unlock() + fake.GetSetPipelinesCommandByServiceStub = nil + fake.getSetPipelinesCommandByServiceReturns = struct { + result1 *protos.GetSetPipelinesCommandsByServiceResponse result2 error }{result1, result2} } -func (fake *FakeIServerClient) GetAttachCommandsByServiceReturnsOnCall(i int, result1 *protos.GetAttachCommandsByServiceResponse, result2 error) { - fake.getAttachCommandsByServiceMutex.Lock() - defer fake.getAttachCommandsByServiceMutex.Unlock() - fake.GetAttachCommandsByServiceStub = nil - if fake.getAttachCommandsByServiceReturnsOnCall == nil { - fake.getAttachCommandsByServiceReturnsOnCall = make(map[int]struct { - result1 *protos.GetAttachCommandsByServiceResponse +func (fake *FakeIServerClient) GetSetPipelinesCommandByServiceReturnsOnCall(i int, result1 *protos.GetSetPipelinesCommandsByServiceResponse, result2 error) { + fake.getSetPipelinesCommandByServiceMutex.Lock() + defer fake.getSetPipelinesCommandByServiceMutex.Unlock() + fake.GetSetPipelinesCommandByServiceStub = nil + if fake.getSetPipelinesCommandByServiceReturnsOnCall == nil { + fake.getSetPipelinesCommandByServiceReturnsOnCall = make(map[int]struct { + result1 *protos.GetSetPipelinesCommandsByServiceResponse result2 error }) } - fake.getAttachCommandsByServiceReturnsOnCall[i] = struct { - result1 *protos.GetAttachCommandsByServiceResponse + fake.getSetPipelinesCommandByServiceReturnsOnCall[i] = struct { + result1 *protos.GetSetPipelinesCommandsByServiceResponse result2 error }{result1, result2} } @@ -704,8 +704,8 @@ func (fake *FakeIServerClient) SendSchemaReturnsOnCall(i int, result1 error) { func (fake *FakeIServerClient) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() - fake.getAttachCommandsByServiceMutex.RLock() - defer fake.getAttachCommandsByServiceMutex.RUnlock() + fake.getSetPipelinesCommandByServiceMutex.RLock() + defer fake.getSetPipelinesCommandByServiceMutex.RUnlock() fake.getTailStreamMutex.RLock() defer fake.getTailStreamMutex.RUnlock() fake.heartBeatMutex.RLock() diff --git a/validate/validate_test.go b/validate/validate_test.go index fba0057..2bef6be 100644 --- a/validate/validate_test.go +++ b/validate/validate_test.go @@ -104,7 +104,7 @@ var _ = Describe("Validate", func() { It("cannot have unset tail request", func() { err := TailRequestStartCommand(&protos.Command{ - Command: &protos.Command_AttachPipeline{}, + Command: &protos.Command_SetPipelines{}, }) Expect(err.Error()).To(Equal(ErrNilField("Tail").Error())) }) @@ -159,7 +159,7 @@ var _ = Describe("Validate", func() { It("cannot have unset tail request", func() { err := TailRequestStopCommand(&protos.Command{ - Command: &protos.Command_AttachPipeline{}, + Command: &protos.Command_SetPipelines{}, }) Expect(err.Error()).To(Equal(ErrNilField("Tail").Error())) }) diff --git a/wasm_test.go b/wasm_test.go index 3bb93f8..d7bb1d2 100644 --- a/wasm_test.go +++ b/wasm_test.go @@ -41,7 +41,7 @@ var _ = Describe("WASM Modules", func() { s = &Streamdal{ pipelinesMtx: &sync.RWMutex{}, - pipelines: map[string]map[string]*protos.Command{}, + pipelines: map[string][]*protos.Pipeline{}, audiencesMtx: &sync.RWMutex{}, audiences: map[string]struct{}{}, } @@ -110,7 +110,7 @@ var _ = Describe("WASM Modules", func() { s := &Streamdal{ pipelinesMtx: &sync.RWMutex{}, - pipelines: map[string]map[string]*protos.Command{}, + pipelines: map[string][]*protos.Pipeline{}, audiencesMtx: &sync.RWMutex{}, audiences: map[string]struct{}{}, } @@ -172,7 +172,7 @@ var _ = Describe("WASM Modules", func() { s = &Streamdal{ pipelinesMtx: &sync.RWMutex{}, - pipelines: map[string]map[string]*protos.Command{}, + pipelines: map[string][]*protos.Pipeline{}, audiencesMtx: &sync.RWMutex{}, audiences: map[string]struct{}{}, } @@ -263,7 +263,7 @@ var _ = Describe("WASM Modules", func() { BeforeEach(func() { s = &Streamdal{ pipelinesMtx: &sync.RWMutex{}, - pipelines: map[string]map[string]*protos.Command{}, + pipelines: map[string][]*protos.Pipeline{}, audiencesMtx: &sync.RWMutex{}, audiences: map[string]struct{}{}, } @@ -497,7 +497,7 @@ var _ = Describe("WASM Modules", func() { s = &Streamdal{ pipelinesMtx: &sync.RWMutex{}, - pipelines: map[string]map[string]*protos.Command{}, + pipelines: map[string][]*protos.Pipeline{}, audiencesMtx: &sync.RWMutex{}, audiences: map[string]struct{}{}, } @@ -581,7 +581,7 @@ var _ = Describe("WASM Modules", func() { s = &Streamdal{ pipelinesMtx: &sync.RWMutex{}, - pipelines: map[string]map[string]*protos.Command{}, + pipelines: map[string][]*protos.Pipeline{}, audiencesMtx: &sync.RWMutex{}, audiences: map[string]struct{}{}, }