Skip to content
This repository has been archived by the owner on Feb 9, 2024. It is now read-only.

Commit

Permalink
updated to use SetPipelines, removed pause, resume, attach
Browse files Browse the repository at this point in the history
changed pipelines to `map[string][]*protos.Pipeline` (was
`map[string]map[string]*protos.Command`.
  • Loading branch information
dselans committed Jan 30, 2024
1 parent 4190d3e commit 3b85f5f
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 59 deletions.
4 changes: 2 additions & 2 deletions benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func benchmarkWASM(wasmFile, testPayloadFile string, step *protos.PipelineStep,

s := &Streamdal{
pipelinesMtx: &sync.RWMutex{},
pipelines: map[string]map[string]*protos.Command{},
pipelines: map[string][]*protos.Pipeline{},
audiencesMtx: &sync.RWMutex{},
audiences: map[string]struct{}{},
}
Expand Down Expand Up @@ -204,7 +204,7 @@ func inferSchema(fileName string) (*protos.WASMResponse, error) {

s := &Streamdal{
pipelinesMtx: &sync.RWMutex{},
pipelines: map[string]map[string]*protos.Command{},
pipelines: map[string][]*protos.Pipeline{},
audiencesMtx: &sync.RWMutex{},
audiences: map[string]struct{}{},
}
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/pprof v0.0.0-20231212022811-ec68065c825e // indirect
github.com/maxbrunsfeld/counterfeiter/v6 v6.8.1 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.16.1 // indirect
golang.org/x/tools v0.17.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGa
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/maxbrunsfeld/counterfeiter/v6 v6.8.1 h1:NicmruxkeqHjDv03SfSxqmaLuisddudfP3h5wdXFbhM=
github.com/maxbrunsfeld/counterfeiter/v6 v6.8.1/go.mod h1:eyp4DdUJAKkr9tvxR3jWhw2mDK7CWABMG5r9uyaKC7I=
github.com/onsi/ginkgo/v2 v2.13.2 h1:Bi2gGVkfn6gQcjNjZJVO8Gf0FHzMPf2phUei9tejVMs=
github.com/onsi/ginkgo/v2 v2.13.2/go.mod h1:XStQ8QcGwLyF4HdfcZB8SFOS/MWCgDuXMSBe6zrvLgM=
github.com/onsi/gomega v1.30.0 h1:hvMK7xYz4D3HapigLTeGdId/NcfQx1VHMJc60ew99+8=
Expand All @@ -45,6 +47,8 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/tetratelabs/wazero v1.6.0 h1:z0H1iikCdP8t+q341xqepY4EWvHEw8Es7tlqiVzlP3g=
github.com/tetratelabs/wazero v1.6.0/go.mod h1:0U0G41+ochRKoPKCJlh0jMg1CHkyfK8kDqiirMmKY8A=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0=
golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
Expand All @@ -59,6 +63,8 @@ golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.16.1 h1:TLyB3WofjdOEepBHAU20JdNC1Zbg87elYofWYAY5oZA=
golang.org/x/tools v0.16.1/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0=
golang.org/x/tools v0.17.0 h1:FvmRgNOcs3kOa+T20R1uhfP9F6HgG2mfxDv1vrx1Htc=
golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac h1:nUQEQmH/csSvFECKYRv6HWEyypysidKl2I6Qpsglq/0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac/go.mod h1:daQN87bsDqDoe316QbbvX60nMoJQa4r6Ds0ZuoAe5yA=
Expand Down
12 changes: 9 additions & 3 deletions go_sdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ type InternalServer struct {
protos.UnimplementedInternalServer
}

func (i *InternalServer) GetSetPipelinesCommandByService(ctx context.Context, service string) (*protos.GetSetPipelinesCommandsByServiceResponse, error) {
// Needed for implementing fake grpc server for testing
func (i *InternalServer) GetSetPipelinesCommandsByService(
_ context.Context,
_ *protos.GetSetPipelinesCommandsByServiceRequest,
) (*protos.GetSetPipelinesCommandsByServiceResponse, error) {
return &protos.GetSetPipelinesCommandsByServiceResponse{}, nil
}

Expand Down Expand Up @@ -183,8 +187,10 @@ var _ = Describe("Streamdal", func() {
})

It("returns a single pipeline", func() {
s.pipelines[audToStr(aud)] = map[string]*protos.Command{
uuid.New().String(): {},
s.pipelines[audToStr(aud)] = []*protos.Pipeline{
{
Id: uuid.New().String(),
},
}
Expect(len(s.getPipelines(ctx, aud))).To(Equal(1))
})
Expand Down
2 changes: 1 addition & 1 deletion server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (c *Client) GetSetPipelinesCommandByService(ctx context.Context, service st

resp, err := c.Server.GetSetPipelinesCommandsByService(ctx, &protos.GetSetPipelinesCommandsByServiceRequest{ServiceName: service})
if err != nil {
return nil, errors.Wrap(err, "unable to get attach commands by service")
return nil, errors.Wrap(err, "unable to get set pipeline commands by service")
}

return resp, nil
Expand Down
88 changes: 44 additions & 44 deletions server/serverfakes/fake_iserver_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions validate/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ var _ = Describe("Validate", func() {

It("cannot have unset tail request", func() {
err := TailRequestStartCommand(&protos.Command{
Command: &protos.Command_AttachPipeline{},
Command: &protos.Command_SetPipelines{},
})
Expect(err.Error()).To(Equal(ErrNilField("Tail").Error()))
})
Expand Down Expand Up @@ -159,7 +159,7 @@ var _ = Describe("Validate", func() {

It("cannot have unset tail request", func() {
err := TailRequestStopCommand(&protos.Command{
Command: &protos.Command_AttachPipeline{},
Command: &protos.Command_SetPipelines{},
})
Expect(err.Error()).To(Equal(ErrNilField("Tail").Error()))
})
Expand Down
12 changes: 6 additions & 6 deletions wasm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var _ = Describe("WASM Modules", func() {

s = &Streamdal{
pipelinesMtx: &sync.RWMutex{},
pipelines: map[string]map[string]*protos.Command{},
pipelines: map[string][]*protos.Pipeline{},
audiencesMtx: &sync.RWMutex{},
audiences: map[string]struct{}{},
}
Expand Down Expand Up @@ -110,7 +110,7 @@ var _ = Describe("WASM Modules", func() {

s := &Streamdal{
pipelinesMtx: &sync.RWMutex{},
pipelines: map[string]map[string]*protos.Command{},
pipelines: map[string][]*protos.Pipeline{},
audiencesMtx: &sync.RWMutex{},
audiences: map[string]struct{}{},
}
Expand Down Expand Up @@ -172,7 +172,7 @@ var _ = Describe("WASM Modules", func() {

s = &Streamdal{
pipelinesMtx: &sync.RWMutex{},
pipelines: map[string]map[string]*protos.Command{},
pipelines: map[string][]*protos.Pipeline{},
audiencesMtx: &sync.RWMutex{},
audiences: map[string]struct{}{},
}
Expand Down Expand Up @@ -263,7 +263,7 @@ var _ = Describe("WASM Modules", func() {
BeforeEach(func() {
s = &Streamdal{
pipelinesMtx: &sync.RWMutex{},
pipelines: map[string]map[string]*protos.Command{},
pipelines: map[string][]*protos.Pipeline{},
audiencesMtx: &sync.RWMutex{},
audiences: map[string]struct{}{},
}
Expand Down Expand Up @@ -497,7 +497,7 @@ var _ = Describe("WASM Modules", func() {

s = &Streamdal{
pipelinesMtx: &sync.RWMutex{},
pipelines: map[string]map[string]*protos.Command{},
pipelines: map[string][]*protos.Pipeline{},
audiencesMtx: &sync.RWMutex{},
audiences: map[string]struct{}{},
}
Expand Down Expand Up @@ -581,7 +581,7 @@ var _ = Describe("WASM Modules", func() {

s = &Streamdal{
pipelinesMtx: &sync.RWMutex{},
pipelines: map[string]map[string]*protos.Command{},
pipelines: map[string][]*protos.Pipeline{},
audiencesMtx: &sync.RWMutex{},
audiences: map[string]struct{}{},
}
Expand Down

0 comments on commit 3b85f5f

Please sign in to comment.