Skip to content

Commit

Permalink
fix:Use the wrp.ErrNotHandled. Add fn to get stream id.
Browse files Browse the repository at this point in the history
  • Loading branch information
schmidtw committed Jan 21, 2025
1 parent dfbfb3e commit 8c74632
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 14 deletions.
21 changes: 15 additions & 6 deletions assembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,13 @@ package wrpssp

import (
"context"
"errors"
"io"
"strings"
"sync"

"github.com/xmidt-org/wrp-go/v3"
)

var (
errNotHandled = errors.New("not handled")
)

// Assembler is a struct that reads from a stream of WRP messages and assembles
// them into a single stream.
//
Expand Down Expand Up @@ -114,7 +109,7 @@ func (a *Assembler) close() {
// The context is not used, but is required by the wrp.Processor interface.
func (a *Assembler) ProcessWRP(_ context.Context, msg wrp.Message) error {
if !isSSP(&msg) {
return errNotHandled
return wrp.ErrNotHandled
}

h, err := get(&msg)
Expand Down Expand Up @@ -150,3 +145,17 @@ func (a *Assembler) ProcessWRP(_ context.Context, msg wrp.Message) error {

return nil
}

// GetStreamID returns the stream ID of the message if it is an SSP message.
func GetStreamID(msg wrp.Message) (string, error) {
if !isSSP(&msg) {
return "", wrp.ErrNotHandled
}

h, err := get(&msg)
if err != nil {
return "", err
}

return h.id, nil
}
65 changes: 64 additions & 1 deletion assembler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package wrpssp

import (
"context"
"errors"
"fmt"
"io"
"testing"

Expand Down Expand Up @@ -157,7 +159,7 @@ func TestAssembler_ProcessWRP(t *testing.T) {
Payload: []byte("Hello"),
},
expected: map[uint64]block{},
err: errNotHandled,
err: wrp.ErrNotHandled,
}, {
name: "no message number",
assembler: &Assembler{
Expand Down Expand Up @@ -280,3 +282,64 @@ func TestAssembler_ProcessWRP(t *testing.T) {
})
}
}

func TestGetStreamID(t *testing.T) {
someErr := fmt.Errorf("some error")
tests := []struct {
name string
msg wrp.Message
wantID string
wantErr error
}{
{
name: "Valid SSP Message",
msg: wrp.Message{
Type: wrp.SimpleEventMessageType,
Headers: []string{
"stream-id:Test-Stream-Id",
"stream-packet-number:0",
},
},
wantID: "Test-Stream-Id",
}, {
name: "Non-SSP Message",
msg: wrp.Message{
Type: wrp.SimpleRequestResponseMessageType,
},
wantErr: wrp.ErrNotHandled,
}, {
name: "A SSP message without the stream-pack-number",
msg: wrp.Message{
Type: wrp.SimpleEventMessageType,
Headers: []string{
"stream-id:Test-Stream-Id",
},
},
wantErr: someErr,
}, {
name: "SSP Message Without Stream ID",
msg: wrp.Message{
Type: wrp.SimpleEventMessageType,
},
wantErr: wrp.ErrNotHandled,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotID, err := GetStreamID(tt.msg)

if tt.wantErr != nil {
assert.Error(t, err)
if !errors.Is(tt.wantErr, someErr) {
assert.ErrorIs(t, err, tt.wantErr)
}
assert.Empty(t, gotID)
return
}

assert.NoError(t, err)
assert.Equal(t, tt.wantID, gotID)
})
}
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ module github.com/xmidt-org/wrpssp
go 1.23.1

require (
github.com/stretchr/testify v1.9.0
github.com/xmidt-org/wrp-go/v3 v3.6.0
github.com/stretchr/testify v1.10.0
github.com/xmidt-org/wrp-go/v3 v3.7.0
)

require (
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
github.com/xmidt-org/wrp-go/v3 v3.6.0 h1:g8qk4Xtzm7f9AslSlhv46syb9FJpDdyOSqVlUa45L7g=
github.com/xmidt-org/wrp-go/v3 v3.6.0/go.mod h1:eyMj+q/7LQ4SU6Z3s6VOwuTVSh6/DJBb2soBGBFSung=
github.com/xmidt-org/wrp-go/v3 v3.7.0 h1:m9ghdq79Zzb0WjomUJ02rzFpI0RK8KTjArYpNIwx1fc=
github.com/xmidt-org/wrp-go/v3 v3.7.0/go.mod h1:eyMj+q/7LQ4SU6Z3s6VOwuTVSh6/DJBb2soBGBFSung=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
2 changes: 1 addition & 1 deletion options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (f optionFunc) apply(file *Packetizer) error {
}

// ID sets the ID of the stream. The ID must be a non-empty string containing
// only [A-Za-z0-9_-]. This is a required field.
// only [A-Za-z0-9 !#$&'()*+,./:;=?@[\]~_-]. This is a required field.
func ID(id string) Option {
return optionFunc(func(s *Packetizer) error {
s.headers.id = id
Expand Down

0 comments on commit 8c74632

Please sign in to comment.