Skip to content

Commit

Permalink
Delete client and handler stream constructors (#193)
Browse files Browse the repository at this point in the history
  • Loading branch information
akshayjshah authored Apr 9, 2022
1 parent f77be89 commit 28baade
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 37 deletions.
4 changes: 4 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ linters-settings:
struct-patterns:
# No zero values for param structs.
- 'connectrpc.com/connect.*[pP]arams'
# No zero values for ClientStream, ServerStream, and friends.
- 'connectrpc.com/connect.ClientStream*'
- 'connectrpc.com/connect.ServerStream*'
- 'connectrpc.com/connect.BidiStream*'
forbidigo:
forbid:
- '^fmt\.Print'
Expand Down
6 changes: 3 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (c *Client[Req, Res]) CallUnary(
// CallClientStream calls a client streaming procedure.
func (c *Client[Req, Res]) CallClientStream(ctx context.Context) *ClientStreamForClient[Req, Res] {
sender, receiver := c.newStream(ctx, StreamTypeClient)
return NewClientStreamForClient[Req, Res](sender, receiver)
return &ClientStreamForClient[Req, Res]{sender: sender, receiver: receiver}
}

// CallServerStream calls a server streaming procedure.
Expand All @@ -139,13 +139,13 @@ func (c *Client[Req, Res]) CallServerStream(
if err := sender.Close(nil); err != nil {
return nil, err
}
return NewServerStreamForClient[Res](receiver), nil
return &ServerStreamForClient[Res]{receiver: receiver}, nil
}

// CallBidiStream calls a bidirectional streaming procedure.
func (c *Client[Req, Res]) CallBidiStream(ctx context.Context) *BidiStreamForClient[Req, Res] {
sender, receiver := c.newStream(ctx, StreamTypeBidi)
return NewBidiStreamForClient[Req, Res](sender, receiver)
return &BidiStreamForClient[Req, Res]{sender: sender, receiver: receiver}
}

func (c *Client[Req, Res]) newStream(ctx context.Context, streamType StreamType) (Sender, Receiver) {
Expand Down
25 changes: 9 additions & 16 deletions client_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,14 @@ import (
)

// ClientStreamForClient is the client's view of a client streaming RPC.
//
// It's returned from Client.CallClientStream, but doesn't currently have an
// exported constructor function.
type ClientStreamForClient[Req, Res any] struct {
sender Sender
receiver Receiver
}

// NewClientStreamForClient constructs the client's view of a client streaming RPC.
func NewClientStreamForClient[Req, Res any](s Sender, r Receiver) *ClientStreamForClient[Req, Res] {
return &ClientStreamForClient[Req, Res]{sender: s, receiver: r}
}

// RequestHeader returns the request headers. Headers are sent to the server with the
// first call to Send.
func (c *ClientStreamForClient[Req, Res]) RequestHeader() http.Header {
Expand Down Expand Up @@ -65,18 +63,15 @@ func (c *ClientStreamForClient[Req, Res]) CloseAndReceive() (*Response[Res], err
}

// ServerStreamForClient is the client's view of a server streaming RPC.
//
// It's returned from Client.CallServerStream, but doesn't currently have an
// exported constructor function.
type ServerStreamForClient[Res any] struct {
receiver Receiver
msg Res
err error
}

// NewServerStreamForClient constructs the client's view of a server streaming
// RPC.
func NewServerStreamForClient[Res any](r Receiver) *ServerStreamForClient[Res] {
return &ServerStreamForClient[Res]{receiver: r}
}

// Receive advances the stream to the next message, which will then be
// available through the Msg method. It returns false when the stream stops,
// either by reaching the end or by encountering an unexpected error. After
Expand Down Expand Up @@ -123,16 +118,14 @@ func (s *ServerStreamForClient[Res]) Close() error {
}

// BidiStreamForClient is the client's view of a bidirectional streaming RPC.
//
// It's returned from Client.CallBidiStream, but doesn't currently have an
// exported constructor function.
type BidiStreamForClient[Req, Res any] struct {
sender Sender
receiver Receiver
}

// NewBidiStreamForClient constructs the client's view of a bidirectional streaming RPC.
func NewBidiStreamForClient[Req, Res any](s Sender, r Receiver) *BidiStreamForClient[Req, Res] {
return &BidiStreamForClient[Req, Res]{sender: s, receiver: r}
}

// RequestHeader returns the request headers. Headers are sent with the first
// call to Send.
func (b *BidiStreamForClient[Req, Res]) RequestHeader() http.Header {
Expand Down
6 changes: 3 additions & 3 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func NewClientStreamHandler[Req, Res any](
procedure,
StreamTypeClient,
func(ctx context.Context, sender Sender, receiver Receiver) {
stream := NewClientStream[Req, Res](sender, receiver)
stream := &ClientStream[Req, Res]{sender: sender, receiver: receiver}
err := implementation(ctx, stream)
_ = receiver.Close()
_ = sender.Close(err)
Expand All @@ -133,7 +133,7 @@ func NewServerStreamHandler[Req, Res any](
procedure,
StreamTypeServer,
func(ctx context.Context, sender Sender, receiver Receiver) {
stream := NewServerStream[Res](sender)
stream := &ServerStream[Res]{sender: sender}
req, err := receiveUnaryRequest[Req](receiver)
if err != nil {
_ = receiver.Close()
Expand Down Expand Up @@ -161,7 +161,7 @@ func NewBidiStreamHandler[Req, Res any](
procedure,
StreamTypeBidi,
func(ctx context.Context, sender Sender, receiver Receiver) {
stream := NewBidiStream[Req, Res](sender, receiver)
stream := &BidiStream[Req, Res]{sender: sender, receiver: receiver}
err := implementation(ctx, stream)
_ = receiver.Close()
_ = sender.Close(err)
Expand Down
24 changes: 9 additions & 15 deletions handler_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,16 @@ import (
)

// ClientStream is the handler's view of a client streaming RPC.
//
// It's constructed as part of Handler invocation, but doesn't currently have
// an exported constructor.
type ClientStream[Req, Res any] struct {
sender Sender
receiver Receiver
msg Req
err error
}

// NewClientStream constructs the handler's view of a client streaming RPC.
func NewClientStream[Req, Res any](s Sender, r Receiver) *ClientStream[Req, Res] {
return &ClientStream[Req, Res]{sender: s, receiver: r}
}

// RequestHeader returns the headers received from the client.
func (c *ClientStream[Req, Res]) RequestHeader() http.Header {
return c.receiver.Header()
Expand Down Expand Up @@ -78,15 +76,13 @@ func (c *ClientStream[Req, Res]) SendAndClose(envelope *Response[Res]) error {
}

// ServerStream is the handler's view of a server streaming RPC.
//
// It's constructed as part of Handler invocation, but doesn't currently have
// an exported constructor.
type ServerStream[Res any] struct {
sender Sender
}

// NewServerStream constructs the handler's view of a server streaming RPC.
func NewServerStream[Res any](s Sender) *ServerStream[Res] {
return &ServerStream[Res]{sender: s}
}

// ResponseHeader returns the response headers. Headers are sent with the first
// call to Send.
func (s *ServerStream[Res]) ResponseHeader() http.Header {
Expand All @@ -106,16 +102,14 @@ func (s *ServerStream[Res]) Send(msg *Res) error {
}

// BidiStream is the handler's view of a bidirectional streaming RPC.
//
// It's constructed as part of Handler invocation, but doesn't currently have
// an exported constructor.
type BidiStream[Req, Res any] struct {
sender Sender
receiver Receiver
}

// NewBidiStream constructs the handler's view of a bidirectional streaming RPC.
func NewBidiStream[Req, Res any](s Sender, r Receiver) *BidiStream[Req, Res] {
return &BidiStream[Req, Res]{sender: s, receiver: r}
}

// RequestHeader returns the headers received from the client.
func (b *BidiStream[Req, Res]) RequestHeader() http.Header {
return b.receiver.Header()
Expand Down

0 comments on commit 28baade

Please sign in to comment.