Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to latest go-amqp #72

Merged
merged 2 commits into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Change Log

## `v4.1.0`
- Update to the latest go-amqp
[PR#72](https://github.com/Azure/azure-amqp-common-go/pull/72)

## `v4.0.0`
- Updated to the latest go-amqp which includes a few minor changes in public surface area.
[PR#68](https://github.com/Azure/azure-amqp-common-go/pull/68)

## `v3.2.0`
- Change the default credits for the RPC link to be more reasonable (1000)
[PR#54](https://github.com/Azure/azure-amqp-common-go/pull/54)
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ module github.com/Azure/azure-amqp-common-go/v4
go 1.18

require (
github.com/Azure/go-amqp v0.18.0
github.com/Azure/go-amqp v0.19.0
github.com/Azure/go-autorest/autorest v0.11.18
github.com/Azure/go-autorest/autorest/adal v0.9.13
github.com/devigned/tab v0.1.1
github.com/stretchr/testify v1.7.1
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0
golang.org/x/crypto v0.7.0
)

require (
Expand Down
7 changes: 4 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/Azure/go-amqp v0.18.0 h1:95bTiJq0oxjK1RUlt5T3HF/THj6jWTRZpSXMPSOJLz8=
github.com/Azure/go-amqp v0.18.0/go.mod h1:+bg0x3ce5+Q3ahCEXnCsGG3ETpDQe3MEVnOuT2ywPwc=
github.com/Azure/go-amqp v0.19.0 h1:LSkPtyUxv/hJWxWXFd52SqRaAMuwOXEOzKrXS9JM5U0=
github.com/Azure/go-amqp v0.19.0/go.mod h1:+bg0x3ce5+Q3ahCEXnCsGG3ETpDQe3MEVnOuT2ywPwc=
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest/autorest v0.11.18 h1:90Y4srNYrwOtAgVo3ndrQkTYn6kf1Eg/AjTFJ8Is2aM=
Expand Down Expand Up @@ -29,8 +29,9 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A=
golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
2 changes: 1 addition & 1 deletion internal/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ package common

const (
// Version is the semantic version of the library
Version = "3.1.1"
Version = "4.1.0"
)
14 changes: 7 additions & 7 deletions rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,12 @@ type (

// Actually: *amqp.Receiver
amqpReceiver interface {
Receive(ctx context.Context) (*amqp.Message, error)
Receive(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error)
Close(ctx context.Context) error
}

amqpSender interface {
Send(ctx context.Context, msg *amqp.Message) error
Send(ctx context.Context, msg *amqp.Message, o *amqp.SendOptions) error
Close(ctx context.Context) error
}
)
Expand Down Expand Up @@ -217,7 +217,7 @@ func (l *Link) RetryableRPC(ctx context.Context, times int, delay time.Duration,
// original `RPC` call.
func (l *Link) startResponseRouter() {
for {
res, err := l.receiver.Receive(context.Background())
res, err := l.receiver.Receive(context.Background(), nil)

// You'll see this when the link is shutting down (either
// service-initiated via 'detach' or a user-initiated shutdown)
Expand Down Expand Up @@ -287,10 +287,10 @@ func (l *Link) RPC(ctx context.Context, msg *amqp.Message) (*Response, error) {
responseCh := l.addChannelToMap(messageID)

if responseCh == nil {
return nil, &amqp.DetachError{}
return nil, &amqp.LinkError{}
}

err = l.sender.Send(ctx, msg)
err = l.sender.Send(ctx, msg, nil)

if err != nil {
l.deleteChannelFromMap(messageID)
Expand Down Expand Up @@ -498,9 +498,9 @@ func addMessageID(message *amqp.Message, uuidNewV4 func() (uuid.UUID, error)) (*
func isClosedError(err error) bool {
var connError *amqp.ConnError
var sessionError *amqp.SessionError
var detachError *amqp.DetachError
var linkError *amqp.LinkError

return (errors.As(err, &detachError) && detachError.RemoteErr == nil) ||
return (errors.As(err, &linkError) && linkError.RemoteErr == nil) ||
errors.As(err, &sessionError) ||
errors.As(err, &connError)
}
18 changes: 9 additions & 9 deletions rpc/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestResponseRouterBasic(t *testing.T) {
receiver := &fakeReceiver{
Responses: []rpcResponse{
{amqpMessageWithCorrelationId("my message id"), nil},
{nil, &amqp.DetachError{}},
{nil, &amqp.LinkError{}},
},
}

Expand All @@ -41,7 +41,7 @@ func TestResponseRouterMissingMessageID(t *testing.T) {
receiver := &fakeReceiver{
Responses: []rpcResponse{
{amqpMessageWithCorrelationId("my message id"), nil},
{nil, &amqp.DetachError{}},
{nil, &amqp.LinkError{}},
},
}

Expand All @@ -64,7 +64,7 @@ func TestResponseRouterBadCorrelationID(t *testing.T) {
receiver := &fakeReceiver{
Responses: []rpcResponse{
{messageWithBadCorrelationID, nil},
{nil, &amqp.DetachError{}},
{nil, &amqp.LinkError{}},
},
}

Expand All @@ -79,7 +79,7 @@ func TestResponseRouterBadCorrelationID(t *testing.T) {

func TestResponseRouterFatalErrors(t *testing.T) {
fatalErrors := []error{
&amqp.DetachError{},
&amqp.LinkError{},
&amqp.ConnError{},
&amqp.SessionError{},
}
Expand Down Expand Up @@ -246,7 +246,7 @@ func TestRPCNilMessageMap(t *testing.T) {
Responses: []rpcResponse{
// this should let us see what deleteChannelFromMap does
{amqpMessageWithCorrelationId("hello"), nil},
{nil, &amqp.DetachError{}},
{nil, &amqp.LinkError{}},
},
}

Expand Down Expand Up @@ -275,8 +275,8 @@ func TestRPCNilMessageMap(t *testing.T) {

// now check that sending can handle it.
resp, err := link.RPC(context.Background(), &amqp.Message{})
var detachErr *amqp.DetachError
require.ErrorAs(t, err, &detachErr)
var linkErr *amqp.LinkError
require.ErrorAs(t, err, &linkErr)
require.Nil(t, resp)
}

Expand All @@ -294,7 +294,7 @@ type fakeReceiver struct {
ch <-chan struct{}
}

func (fr *fakeReceiver) Receive(ctx context.Context) (*amqp.Message, error) {
func (fr *fakeReceiver) Receive(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error) {
// wait until the actual send if we're simulating request/response
if fr.ch != nil {
<-fr.ch
Expand All @@ -314,7 +314,7 @@ type fakeSender struct {
ch chan<- struct{}
}

func (s *fakeSender) Send(ctx context.Context, msg *amqp.Message) error {
func (s *fakeSender) Send(ctx context.Context, msg *amqp.Message, o *amqp.SendOptions) error {
s.Sent = append(s.Sent, msg)

if s.ch != nil {
Expand Down