Skip to content

Commit

Permalink
more lint
Browse files Browse the repository at this point in the history
  • Loading branch information
philandstuff committed Oct 11, 2024
1 parent 35f0a42 commit abaf8f3
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 7 deletions.
2 changes: 2 additions & 0 deletions internal/sse/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func (s *Streamer) connect(ctx context.Context) error {
}
s.attempt++
reconnectDelay := time.NewTimer(delay)
// once we only support go 1.23+, we can use time.After() here and simplify
defer reconnectDelay.Stop()
select {
case <-ctx.Done():
Expand All @@ -70,6 +71,7 @@ func (s *Streamer) connect(ctx context.Context) error {
req.Header.Set("Last-Event-ID", s.lastEventID)
}

//nolint:bodyclose
resp, err := s.c.Do(req)
if err != nil {
// try again
Expand Down
7 changes: 1 addition & 6 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,16 +298,11 @@ func (f *fileStreamer) Close() error {
return f.s.Close()
}

type FileStreamer interface {
io.Closer
NextFile(ctx context.Context) (streaming.File, error)
}

// StreamPredictionFiles streams prediction file output via the replicate
// streaming api. It is the caller's responsibility to close the returned
// FileStreamer to ensure connections and associated resources are cleaned up
// appropriately.
func (r *Client) StreamPredictionFiles(ctx context.Context, prediction *Prediction) (FileStreamer, error) {
func (r *Client) StreamPredictionFiles(prediction *Prediction) (streaming.FileStreamer, error) {
url := prediction.URLs["stream"]
if url == "" {
return nil, errors.New("streaming not supported or not enabled for this prediction")
Expand Down
2 changes: 1 addition & 1 deletion stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ event: done
c, err := replicate.NewClient(replicate.WithToken("test-token"))
require.NoError(t, err)

files, err := c.StreamPredictionFiles(ctx, p)
files, err := c.StreamPredictionFiles(p)

require.NoError(t, err)

Expand Down
6 changes: 6 additions & 0 deletions streaming/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ import (
"io"
)

// FileStreamer represents a stream of output files from a model.
type FileStreamer interface {
io.Closer
NextFile(ctx context.Context) (File, error)
}

// File represents a file output from a model over an SSE stream. On the wire,
// it might be a data URL or a regular http URL. File abstracts over this and
// provides a way to get the data regardless of the implementation.
Expand Down

0 comments on commit abaf8f3

Please sign in to comment.