Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New streaming api #91

Merged
merged 19 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ toolchain go1.22.0

require (
github.com/stretchr/testify v1.8.4
github.com/vincent-petithory/dataurl v1.0.0
golang.org/x/sync v0.6.0
)

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/vincent-petithory/dataurl v1.0.0 h1:cXw+kPto8NLuJtlMsI152irrVw9fRDX8AbShPRpg2CI=
github.com/vincent-petithory/dataurl v1.0.0/go.mod h1:FHafX5vmDzyP+1CQATJn7WFKc9CvnvxyvZy6I1MrG/U=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
70 changes: 70 additions & 0 deletions internal/sse/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package sse

import (
"bufio"
"bytes"
"io"
"strings"
)

type Event struct {
Type string
ID string
Data string
}

type Decoder struct {
r *bufio.Reader
}

func NewDecoder(r io.Reader) *Decoder {
return &Decoder{r: bufio.NewReader(r)}
}

var (
eventField = []byte("event:")
dataField = []byte("data:")
idField = []byte("id:")
retryField = []byte("retry:")
space = []byte{' '}
)

func buildEvent(t, id string, data *strings.Builder) Event {
return Event{
Type: t,
ID: id,
Data: data.String(),
}
}

func (d *Decoder) Next() (Event, error) {
var t, id string
var data strings.Builder
for {
line, err := d.r.ReadBytes('\n')
if err == io.EOF {
return buildEvent(t, id, &data), io.ErrUnexpectedEOF
}
if err != nil {
return buildEvent(t, id, &data), err
}

switch {
case line[0] == '\n':
// a blank line finishes the event, so we return it
return buildEvent(t, id, &data), nil
case bytes.HasPrefix(line, eventField):
t = string(bytes.TrimPrefix(line[6:len(line)-1], space))
case bytes.HasPrefix(line, dataField):
// strings.Builder.Write() always returns nil error, so we don't
// need to handle it
data.Write(bytes.TrimPrefix(line[5:], space))
case bytes.HasPrefix(line, idField):
id = string(bytes.TrimPrefix(line[3:len(line)-1], space))
case bytes.HasPrefix(line, retryField):
default:
// ignore the line
}

}
}
132 changes: 132 additions & 0 deletions internal/sse/decoder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package sse_test

import (
"fmt"
"io"
"strings"
"testing"

"github.com/replicate/replicate-go/internal/sse"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestDecodeOneEventNoSpace(t *testing.T) {
input := `event:output
id:123abc
data:giraffe

`
d := sse.NewDecoder(strings.NewReader(input))

e, err := d.Next()

require.NoError(t, err)

assert.Equal(t, "output", e.Type)
assert.Equal(t, "123abc", e.ID)
assert.Equal(t, "giraffe\n", e.Data)
}

func TestDecodeOneEventWithSpace(t *testing.T) {
input := `event: output
id: 123abc
data: giraffe

`
d := sse.NewDecoder(strings.NewReader(input))

e, err := d.Next()

require.NoError(t, err)

assert.Equal(t, "output", e.Type)
assert.Equal(t, "123abc", e.ID)
// only one space should be trimmed
assert.Equal(t, " giraffe\n", e.Data)
}

func TestDecodeOneEventMultipleData(t *testing.T) {
input := `event:output
data:giraffe
data:rhino
data:wombat

`
d := sse.NewDecoder(strings.NewReader(input))

e, err := d.Next()

require.NoError(t, err)

assert.Equal(t, "output", e.Type)
assert.Equal(t, "giraffe\nrhino\nwombat\n", e.Data)
}

func TestDecodeOneEventHugeData(t *testing.T) {
// this test is mainly to make sure we're not constrained by the
// bufio.Reader buffer size
input := fmt.Sprintf(`event:output
data:%s

`, strings.Repeat("0123456789abcdef", 1_000_000))
d := sse.NewDecoder(strings.NewReader(input))

e, err := d.Next()

require.NoError(t, err)

assert.Equal(t, "output", e.Type)
// 16_000_000 data bytes and the terminal LF character
assert.Equal(t, 16_000_001, len(e.Data))
}

func TestDecodeManyEvents(t *testing.T) {
input := `event:output
id:alpha1
data:giraffe

event:output
id:bravo2
data:rhino

event:output
id:gamma3
data:pine marten

`
d := sse.NewDecoder(strings.NewReader(input))

e, err := d.Next()

require.NoError(t, err)

assert.Equal(t, "output", e.Type)
assert.Equal(t, "alpha1", e.ID)
assert.Equal(t, "giraffe\n", e.Data)

e, err = d.Next()

require.NoError(t, err)

assert.Equal(t, "output", e.Type)
assert.Equal(t, "bravo2", e.ID)
assert.Equal(t, "rhino\n", e.Data)

e, err = d.Next()

require.NoError(t, err)

assert.Equal(t, "output", e.Type)
assert.Equal(t, "gamma3", e.ID)
assert.Equal(t, "pine marten\n", e.Data)
}

func TestDecodeEarlyEOF(t *testing.T) {
input := ``
d := sse.NewDecoder(strings.NewReader(input))

_, err := d.Next()

assert.ErrorIs(t, err, io.ErrUnexpectedEOF)
}
Loading
Loading