Skip to content

Commit

Permalink
Merge pull request #5 from xmidt-org/add-compression
Browse files Browse the repository at this point in the history
Add compression
  • Loading branch information
schmidtw authored Jan 22, 2025
2 parents c3e0dc3 + b6ad85e commit 7de0e63
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 18 deletions.
21 changes: 15 additions & 6 deletions assembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,13 @@ package wrpssp

import (
"context"
"errors"
"io"
"strings"
"sync"

"github.com/xmidt-org/wrp-go/v3"
)

var (
errNotHandled = errors.New("not handled")
)

// Assembler is a struct that reads from a stream of WRP messages and assembles
// them into a single stream.
//
Expand Down Expand Up @@ -114,7 +109,7 @@ func (a *Assembler) close() {
// The context is not used, but is required by the wrp.Processor interface.
func (a *Assembler) ProcessWRP(_ context.Context, msg wrp.Message) error {
if !isSSP(&msg) {
return errNotHandled
return wrp.ErrNotHandled
}

h, err := get(&msg)
Expand Down Expand Up @@ -150,3 +145,17 @@ func (a *Assembler) ProcessWRP(_ context.Context, msg wrp.Message) error {

return nil
}

// GetStreamID returns the stream ID of the message if it is an SSP message.
func GetStreamID(msg wrp.Message) (string, error) {
if !isSSP(&msg) {
return "", wrp.ErrNotHandled
}

h, err := get(&msg)
if err != nil {
return "", err
}

return h.id, nil
}
65 changes: 64 additions & 1 deletion assembler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package wrpssp

import (
"context"
"errors"
"fmt"
"io"
"testing"

Expand Down Expand Up @@ -157,7 +159,7 @@ func TestAssembler_ProcessWRP(t *testing.T) {
Payload: []byte("Hello"),
},
expected: map[uint64]block{},
err: errNotHandled,
err: wrp.ErrNotHandled,
}, {
name: "no message number",
assembler: &Assembler{
Expand Down Expand Up @@ -280,3 +282,64 @@ func TestAssembler_ProcessWRP(t *testing.T) {
})
}
}

func TestGetStreamID(t *testing.T) {
someErr := fmt.Errorf("some error")
tests := []struct {
name string
msg wrp.Message
wantID string
wantErr error
}{
{
name: "Valid SSP Message",
msg: wrp.Message{
Type: wrp.SimpleEventMessageType,
Headers: []string{
"stream-id:Test-Stream-Id",
"stream-packet-number:0",
},
},
wantID: "Test-Stream-Id",
}, {
name: "Non-SSP Message",
msg: wrp.Message{
Type: wrp.SimpleRequestResponseMessageType,
},
wantErr: wrp.ErrNotHandled,
}, {
name: "A SSP message without the stream-pack-number",
msg: wrp.Message{
Type: wrp.SimpleEventMessageType,
Headers: []string{
"stream-id:Test-Stream-Id",
},
},
wantErr: someErr,
}, {
name: "SSP Message Without Stream ID",
msg: wrp.Message{
Type: wrp.SimpleEventMessageType,
},
wantErr: wrp.ErrNotHandled,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotID, err := GetStreamID(tt.msg)

if tt.wantErr != nil {
assert.Error(t, err)
if !errors.Is(tt.wantErr, someErr) {
assert.ErrorIs(t, err, tt.wantErr)
}
assert.Empty(t, gotID)
return
}

assert.NoError(t, err)
assert.Equal(t, tt.wantID, gotID)
})
}
}
2 changes: 1 addition & 1 deletion options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (f optionFunc) apply(file *Packetizer) error {
}

// ID sets the ID of the stream. The ID must be a non-empty string containing
// only [A-Za-z0-9_-]. This is a required field.
// only [A-Za-z0-9 !#$&'()*+,./:;=?@[\]~_-]. This is a required field.
func ID(id string) Option {
return optionFunc(func(s *Packetizer) error {
s.headers.id = id
Expand Down
26 changes: 16 additions & 10 deletions protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,29 +38,29 @@ is mainly designed for sending files from a CPE or streaming data from the CPE,
and request response handling will need to be handled via events instead of the
simpler "Simple Request-Response" (msg_type = 3) message type.

The destination (dst) field of the Simple Event message must be in the following format:
```bnf
event:<application-name>/<stream-id>
<application-name> ::= <string>
<stream-id> ::= <string>
```
- `application-id`: A unique application identifier.
- `stream-id`: The unique stream identifier.

The wrp message headers field should contain the following control headers:

```bnf
<stream-id> ::= <string>
<stream-packet-number> ::= [1-9][0-9]*
<stream-final-packet> ::= <string>
<stream-encoding> ::= 'gzip' | 'deflate' | 'identity'
<stream-estimated-total-length> ::= [1-9][0-9]*
```

Any whitespace found is ignored as well as the case of the labels.

- `stream-id`: The unique stream identifier.
- `stream-packet-number`: **Required** The 0-index based packet reassembly order.
- `stream-final-packet`: **Required** Marks the final packet in the stream and
end of stream reason. Only present in the final packet.
- `stream-encoding`: **Optional** The encoding used to create the payload.
- `gzip`: The original payload was gzipped and the compressed version was
sent.
- `defalte`: The original payload was deflated and the compressed version
was sent.
- `identity`: The payload is a raw stream of bytes. If the header is
omitted, `identity` is the default value.
- `stream-estimated-total-length`: **Optional** Indicates the estimated total
length if the stream is a known size. The value is informative only.

Expand Down Expand Up @@ -91,6 +91,12 @@ the assembly buffer.

Packets MAY be delivered out of order and this MUST be tolerated.

The encoding for each packet MAY be different for each packet.

Each packet is optionally compressed in isolation. This allows the producer of
the stream to compress as they go instead of needing to have an entire buffer of
the complete stream in memory.

## 4. Duplicate Packets

Packets MAY be duplicated by any part of the system. Duplicate packets MUST
Expand Down

0 comments on commit 7de0e63

Please sign in to comment.