diff --git a/assembler.go b/assembler.go index 4bd1ab3..c54a3c8 100644 --- a/assembler.go +++ b/assembler.go @@ -5,7 +5,6 @@ package wrpssp import ( "context" - "errors" "io" "strings" "sync" @@ -13,10 +12,6 @@ import ( "github.com/xmidt-org/wrp-go/v3" ) -var ( - errNotHandled = errors.New("not handled") -) - // Assembler is a struct that reads from a stream of WRP messages and assembles // them into a single stream. // @@ -114,7 +109,7 @@ func (a *Assembler) close() { // The context is not used, but is required by the wrp.Processor interface. func (a *Assembler) ProcessWRP(_ context.Context, msg wrp.Message) error { if !isSSP(&msg) { - return errNotHandled + return wrp.ErrNotHandled } h, err := get(&msg) @@ -150,3 +145,17 @@ func (a *Assembler) ProcessWRP(_ context.Context, msg wrp.Message) error { return nil } + +// GetStreamID returns the stream ID of the message if it is an SSP message. +func GetStreamID(msg wrp.Message) (string, error) { + if !isSSP(&msg) { + return "", wrp.ErrNotHandled + } + + h, err := get(&msg) + if err != nil { + return "", err + } + + return h.id, nil +} diff --git a/assembler_test.go b/assembler_test.go index 462b224..cdd2fb0 100644 --- a/assembler_test.go +++ b/assembler_test.go @@ -5,6 +5,8 @@ package wrpssp import ( "context" + "errors" + "fmt" "io" "testing" @@ -157,7 +159,7 @@ func TestAssembler_ProcessWRP(t *testing.T) { Payload: []byte("Hello"), }, expected: map[uint64]block{}, - err: errNotHandled, + err: wrp.ErrNotHandled, }, { name: "no message number", assembler: &Assembler{ @@ -280,3 +282,64 @@ func TestAssembler_ProcessWRP(t *testing.T) { }) } } + +func TestGetStreamID(t *testing.T) { + someErr := fmt.Errorf("some error") + tests := []struct { + name string + msg wrp.Message + wantID string + wantErr error + }{ + { + name: "Valid SSP Message", + msg: wrp.Message{ + Type: wrp.SimpleEventMessageType, + Headers: []string{ + "stream-id:Test-Stream-Id", + "stream-packet-number:0", + }, + }, + wantID: "Test-Stream-Id", + }, { + name: "Non-SSP Message", + msg: wrp.Message{ + Type: wrp.SimpleRequestResponseMessageType, + }, + wantErr: wrp.ErrNotHandled, + }, { + name: "A SSP message without the stream-pack-number", + msg: wrp.Message{ + Type: wrp.SimpleEventMessageType, + Headers: []string{ + "stream-id:Test-Stream-Id", + }, + }, + wantErr: someErr, + }, { + name: "SSP Message Without Stream ID", + msg: wrp.Message{ + Type: wrp.SimpleEventMessageType, + }, + wantErr: wrp.ErrNotHandled, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotID, err := GetStreamID(tt.msg) + + if tt.wantErr != nil { + assert.Error(t, err) + if !errors.Is(tt.wantErr, someErr) { + assert.ErrorIs(t, err, tt.wantErr) + } + assert.Empty(t, gotID) + return + } + + assert.NoError(t, err) + assert.Equal(t, tt.wantID, gotID) + }) + } +} diff --git a/go.mod b/go.mod index 84ac4d3..2278828 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,8 @@ module github.com/xmidt-org/wrpssp go 1.23.1 require ( - github.com/stretchr/testify v1.9.0 - github.com/xmidt-org/wrp-go/v3 v3.6.0 + github.com/stretchr/testify v1.10.0 + github.com/xmidt-org/wrp-go/v3 v3.7.0 ) require ( diff --git a/go.sum b/go.sum index 290a982..9e9776f 100644 --- a/go.sum +++ b/go.sum @@ -4,12 +4,12 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE= github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= -github.com/xmidt-org/wrp-go/v3 v3.6.0 h1:g8qk4Xtzm7f9AslSlhv46syb9FJpDdyOSqVlUa45L7g= -github.com/xmidt-org/wrp-go/v3 v3.6.0/go.mod h1:eyMj+q/7LQ4SU6Z3s6VOwuTVSh6/DJBb2soBGBFSung= +github.com/xmidt-org/wrp-go/v3 v3.7.0 h1:m9ghdq79Zzb0WjomUJ02rzFpI0RK8KTjArYpNIwx1fc= +github.com/xmidt-org/wrp-go/v3 v3.7.0/go.mod h1:eyMj+q/7LQ4SU6Z3s6VOwuTVSh6/DJBb2soBGBFSung= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/options.go b/options.go index 9e74a15..c7e6b81 100644 --- a/options.go +++ b/options.go @@ -21,7 +21,7 @@ func (f optionFunc) apply(file *Packetizer) error { } // ID sets the ID of the stream. The ID must be a non-empty string containing -// only [A-Za-z0-9_-]. This is a required field. +// only [A-Za-z0-9 !#$&'()*+,./:;=?@[\]~_-]. This is a required field. func ID(id string) Option { return optionFunc(func(s *Packetizer) error { s.headers.id = id